From a59c42e5caa76a5ea1d5e79b7820a1a012f9a3a4 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 14 Nov 2020 15:21:53 +0100 Subject: MQTT backend skeleton --- backends/mqtt.c | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 backends/mqtt.c (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c new file mode 100644 index 0000000..4f56aa6 --- /dev/null +++ b/backends/mqtt.c @@ -0,0 +1,108 @@ +#define BACKEND_NAME "mqtt" + +#include +#include "mqtt.h" + +MM_PLUGIN_API int init(){ + backend mqtt = { + .name = BACKEND_NAME, + .conf = mqtt_configure, + .create = mqtt_instance, + .conf_instance = mqtt_configure_instance, + .channel = mqtt_channel, + .handle = mqtt_set, + .process = mqtt_handle, + .start = mqtt_start, + .shutdown = mqtt_shutdown + }; + + //register backend + if(mm_backend_register(mqtt)){ + LOG("Failed to register backend"); + return 1; + } + return 0; +} + +static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ + //TODO + return 0; +} + +static int mqtt_configure(char* option, char* value){ + LOG("This backend does not take global configuration"); + return 1; +} + +static int mqtt_configure_instance(instance* inst, char* option, char* value){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char* token = value; + + if(!strcmp(option, "user")){ + free(data->user); + data->user = strdup(value); + return 0; + } + else if(!strcmp(option, "password")){ + free(data->password); + data->user = strdup(value); + return 0; + } + else if(!strcmp(option, "host")){ + //mqtt url may be of the form + //mqtt[s]://[username][:password]@host.domain[:port] + token = strchr(value, ':'); + //TODO + } + + LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); + return 1; +} + +static int mqtt_instance(instance* inst){ + //TODO + return 0; +} + +static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ + //TODO + return NULL; +} + +static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ + //TODO + return 0; +} + +static int mqtt_handle(size_t num, managed_fd* fds){ + //TODO + return 0; +} + +static int mqtt_start(size_t n, instance** inst){ + //TODO + return 0; +} + +static int mqtt_shutdown(size_t n, instance** inst){ + size_t u, p; + mqtt_instance_data* data = NULL; + + for(u = 0; u < n; u++){ + data = (mqtt_instance_data*) inst[u]->impl; + for(p = 0; p < data->nchannels; p++){ + free(data->channel[p]); + } + free(data->channel); + free(data->host); + free(data->port); + free(data->user); + free(data->password); + + free(inst[u]->impl); + inst[u]->impl = NULL; + } + + LOG("Backend shut down"); + return 0; +} -- cgit v1.2.3 From fdb0bf2e86693f99550f6e21de85023188ac5b03 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 5 Dec 2020 14:27:23 +0100 Subject: Implement basic MQTT connection startup --- backends/mqtt.c | 213 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 201 insertions(+), 12 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 4f56aa6..d9cf2d8 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -1,8 +1,14 @@ #define BACKEND_NAME "mqtt" +#define DEBUG #include + +#include "libmmbackend.h" #include "mqtt.h" +//TODO +// * Periodic connection retries + MM_PLUGIN_API int init(){ backend mqtt = { .name = BACKEND_NAME, @@ -24,8 +30,100 @@ MM_PLUGIN_API int init(){ return 0; } +static int mqtt_parse_hostspec(instance* inst, char* hostspec){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char* host = strchr(hostspec, '@'), *password = NULL, *port = NULL; + + //mqtt[s]://[username][:password]@host.domain[:port] + if(!strncmp(hostspec, "mqtt://", 7)){ + hostspec += 7; + } + else if(!strncmp(hostspec, "mqtts://", 8)){ + data->tls = 1; + hostspec += 8; + } + + if(host){ + //parse credentials, separate out host spec + *host = 0; + host++; + + password = strchr(hostspec, ':'); + if(password){ + //password supplied, store + *password = 0; + password++; + mmbackend_strdup(&(data->password), password); + } + + //store username + mmbackend_strdup(&(data->user), hostspec); + } + else{ + host = hostspec; + } + + //parse port if supplied + port = strchr(host, ':'); + if(port){ + *port = 0; + port++; + mmbackend_strdup(&(data->port), port); + } + + mmbackend_strdup(&(data->host), host); + return 0; +} + static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ - //TODO + size_t value = 0, offset = 0; + do { + value |= (buffer[offset] & 0x7F) << (7 * offset); + offset++; + } while(buffer[offset - 1] & 0x80); + return 0; +} + +static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){ + //implementation conforming to spec 1.5.5 + size_t offset = 0; + do { + buffer[offset] = value % 128; + value = value / 128; + if(value){ + buffer[offset] |= 0x80; + } + offset++; + } while(value); + return offset; +} + +static void mqtt_disconnect(instance* inst){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + + //unmanage the fd + mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); + + close(data->fd); + data->fd = -1; +} + +static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t* vh, size_t payload_length, uint8_t* payload){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + uint8_t fixed_header[5]; + size_t offset = 0; + + fixed_header[offset++] = type; + offset += mqtt_varint_encode(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); + + if(mmbackend_send(data->fd, fixed_header, offset) + || mmbackend_send(data->fd, vh, vh_length) + || mmbackend_send(data->fd, payload, payload_length)){ + LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name); + mqtt_disconnect(inst); + return 1; + } + return 0; } @@ -34,25 +132,91 @@ static int mqtt_configure(char* option, char* value){ return 1; } +static int mqtt_reconnect(instance* inst){ + uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x05, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; + uint8_t payload[MQTT_BUFFER_LENGTH]; + size_t vh_offset = 10, payload_offset = 0; + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + + if(!data->host){ + LOGPF("No host specified for instance %s", inst->name); + return 2; + } + + if(data->fd >= 0){ + mqtt_disconnect(inst); + } + + LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s)", + inst->name, data->host, + data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), + data->tls ? "yes " : "no", + (data->user || data->password) ? "yes" : "no"); + + data->fd = mmbackend_socket(data->host, + data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), + SOCK_STREAM, 0, 0, 1); + + if(data->fd < 0){ + //retry later + return 1; + } + + //prepare CONNECT message + variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); + //TODO set session expiry interval option + //TODO re-use previos session on reconnect + + //push number of option bytes (as a varint, no less) before actually pushing the option data. + //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. + variable_header[vh_offset++] = 7; + //push maximum packet size option + variable_header[vh_offset++] = 0x27; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; + //push topic alias maximum option + variable_header[vh_offset++] = 0x22; + variable_header[vh_offset++] = 0xFF; + variable_header[vh_offset++] = 0xFF; + + //push client_id as utf8 + //payload_offset += mqtt_push_utf8(); + if(data->user){ + //push user name as utf8 + } + if(data->password){ + //push password as binary + } + + mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload); + + //register the fd + if(mm_manage_fd(data->fd, BACKEND_NAME, 1, (void*) inst)){ + LOG("Failed to register FD"); + return 2; + } + + return 0; +} + static int mqtt_configure_instance(instance* inst, char* option, char* value){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; - char* token = value; if(!strcmp(option, "user")){ - free(data->user); - data->user = strdup(value); + mmbackend_strdup(&(data->user), value); return 0; } else if(!strcmp(option, "password")){ - free(data->password); - data->user = strdup(value); + mmbackend_strdup(&(data->password), value); return 0; } else if(!strcmp(option, "host")){ - //mqtt url may be of the form - //mqtt[s]://[username][:password]@host.domain[:port] - token = strchr(value, ':'); - //TODO + if(mqtt_parse_hostspec(inst, value)){ + return 1; + } + return 0; } LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); @@ -60,7 +224,15 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ } static int mqtt_instance(instance* inst){ - //TODO + mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); + + if(!data){ + LOG("Failed to allocate memory"); + return 1; + } + + data->fd = -1; + inst->impl = data; return 0; } @@ -75,12 +247,27 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ } static int mqtt_handle(size_t num, managed_fd* fds){ + LOG("Handling"); //TODO return 0; } static int mqtt_start(size_t n, instance** inst){ - //TODO + size_t u = 0; + + for(u = 0; u < n; u++){ + switch(mqtt_reconnect(inst[u])){ + case 1: + LOGPF("Failed to connect to host for instance %s, will be retried", inst[u]->name); + break; + case 2: + LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name); + return 1; + default: + break; + } + } + return 0; } @@ -90,6 +277,8 @@ static int mqtt_shutdown(size_t n, instance** inst){ for(u = 0; u < n; u++){ data = (mqtt_instance_data*) inst[u]->impl; + mqtt_disconnect(inst[u]); + for(p = 0; p < data->nchannels; p++){ free(data->channel[p]); } -- cgit v1.2.3 From e248f0c5778f73988477ba73174c91b7085c232c Mon Sep 17 00:00:00 2001 From: cbdev Date: Fri, 18 Dec 2020 22:31:14 +0100 Subject: Implement MQTT connect --- backends/mqtt.c | 75 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 10 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index d9cf2d8..8c4a9fd 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -2,10 +2,13 @@ #define DEBUG #include +#include #include "libmmbackend.h" #include "mqtt.h" +static uint64_t last_maintenance = 0; + //TODO // * Periodic connection retries @@ -75,6 +78,14 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){ return 0; } +static int mqtt_generate_instanceid(instance* inst){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char clientid[23] = ""; + + snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name); + return mmbackend_strdup(&(data->client_id), clientid); +} + static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ size_t value = 0, offset = 0; do { @@ -98,6 +109,24 @@ static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){ return offset; } +static size_t mqtt_push_binary(uint8_t* buffer, size_t buffer_length, uint8_t* content, size_t length){ + if(buffer_length < length + 2 || length > 65535){ + LOG("Failed to push length-prefixed data blob, buffer size exceeded"); + return 0; + } + + buffer[0] = (length >> 8) & 0xFF; + buffer[1] = length & 0xFF; + + memcpy(buffer + 2, content, length); + return length + 2; +} + +static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* content){ + //FIXME might want to validate the string for valid UTF-8 + return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content)); +} + static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -133,7 +162,7 @@ static int mqtt_configure(char* option, char* value){ } static int mqtt_reconnect(instance* inst){ - uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x05, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; + uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; uint8_t payload[MQTT_BUFFER_LENGTH]; size_t vh_offset = 10, payload_offset = 0; mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -162,14 +191,15 @@ static int mqtt_reconnect(instance* inst){ return 1; } - //prepare CONNECT message + //prepare CONNECT message flags variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); + //TODO set session expiry interval option //TODO re-use previos session on reconnect - + //push number of option bytes (as a varint, no less) before actually pushing the option data. //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. - variable_header[vh_offset++] = 7; + variable_header[vh_offset++] = 8; //push maximum packet size option variable_header[vh_offset++] = 0x27; variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; @@ -181,13 +211,14 @@ static int mqtt_reconnect(instance* inst){ variable_header[vh_offset++] = 0xFF; variable_header[vh_offset++] = 0xFF; - //push client_id as utf8 - //payload_offset += mqtt_push_utf8(); + //prepare CONNECT payload + //push client id + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->client_id); if(data->user){ - //push user name as utf8 + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->user); } if(data->password){ - //push password as binary + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->password); } mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload); @@ -218,6 +249,15 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ } return 0; } + else if(!strcmp(option, "clientid")){ + if(strlen(value)){ + mmbackend_strdup(&(data->client_id), value); + return 0; + } + else{ + return mqtt_generate_instanceid(inst); + } + } LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); return 1; @@ -233,6 +273,10 @@ static int mqtt_instance(instance* inst){ data->fd = -1; inst->impl = data; + + if(mqtt_generate_instanceid(inst)){ + return 1; + } return 0; } @@ -247,8 +291,17 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ } static int mqtt_handle(size_t num, managed_fd* fds){ - LOG("Handling"); - //TODO + size_t n = 0; + + //for(n = 0; n < num; n++){ + //} + + //keepalive/reconnect processing + if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ + //TODO run reconnects + last_maintenance = mm_timestamp(); + } + return 0; } @@ -268,6 +321,8 @@ static int mqtt_start(size_t n, instance** inst){ } } + //initialize maintenance timer + last_maintenance = mm_timestamp(); return 0; } -- cgit v1.2.3 From 2b0aea70c275e08c1e312db91653eef880b4f725 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 19 Dec 2020 17:25:19 +0100 Subject: Reconnect MQTT instances during maintenance --- backends/mqtt.c | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 3 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 8c4a9fd..72046df 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -290,15 +290,69 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ return 0; } +static int mqtt_maintenance(){ + size_t n, u; + instance** inst = NULL; + mqtt_instance_data* data = NULL; + + if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ + LOG("Failed to fetch instance list"); + return 1; + } + + DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); + for(u = 0; u < n; u++){ + data = (mqtt_instance_data*) inst[u]->impl; + if(data->fd <= 0){ + if(mqtt_reconnect(inst[u]) >= 2){ + LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); + free(inst); + return 1; + } + } + } + + free(inst); + return 0; +} + +static int mqtt_handle_fd(instance* inst){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; + + bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0); + if(bytes_read < 0){ + LOGPF("Failed to receive data on instance %s: %s", inst->name, mmbackend_socket_strerror(errno)); + return 1; + } + else if(bytes_read == 0){ + //disconnected, try to reconnect + LOGPF("Instance %s disconnected, reconnection queued", inst->name); + mqtt_disconnect(inst); + return 1; + } + + DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); + + return 0; +} + static int mqtt_handle(size_t num, managed_fd* fds){ size_t n = 0; + int rv = 0; - //for(n = 0; n < num; n++){ - //} + for(n = 0; n < num; n++){ + if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ + //propagate critical failures + return 1; + } + } //keepalive/reconnect processing if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ - //TODO run reconnects + if(mqtt_maintenance()){ + return 1; + } last_maintenance = mm_timestamp(); } -- cgit v1.2.3 From 0175c84ad866e8f33a90d571e6207c6cc120075c Mon Sep 17 00:00:00 2001 From: cbdev Date: Sun, 20 Dec 2020 13:30:11 +0100 Subject: Basic MQTT message parsing --- backends/mqtt.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 4 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 72046df..d42d1f5 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -86,16 +86,22 @@ static int mqtt_generate_instanceid(instance* inst){ return mmbackend_strdup(&(data->client_id), clientid); } -static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ +static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ size_t value = 0, offset = 0; do { + if(offset >= len){ + return 0; + } + value |= (buffer[offset] & 0x7F) << (7 * offset); offset++; } while(buffer[offset - 1] & 0x80); - return 0; + + *result = value; + return offset; } -static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){ +static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){ //implementation conforming to spec 1.5.5 size_t offset = 0; do { @@ -142,8 +148,9 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t uint8_t fixed_header[5]; size_t offset = 0; + //how in the world is it a _fixed_ header if it contains a variable length integer? eh... fixed_header[offset++] = type; - offset += mqtt_varint_encode(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); + offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); if(mmbackend_send(data->fd, fixed_header, offset) || mmbackend_send(data->fd, vh, vh_length) @@ -316,9 +323,28 @@ static int mqtt_maintenance(){ return 0; } +static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + switch(type){ + case MSG_CONNACK: + if(length >= 2){ + if(variable_header[1]){ + LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); + } + else{ + LOGPF("Connection on %s established", inst->name); + } + } + break; + default: + LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); + } + return 0; +} + static int mqtt_handle_fd(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; + uint32_t message_length = 0, header_length = 0; bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0); if(bytes_read < 0){ @@ -333,7 +359,27 @@ static int mqtt_handle_fd(instance* inst){ } DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); + data->receive_offset += bytes_read; + + //TODO loop this while at least one unhandled message is in the buffer + //check for complete message + if(data->receive_offset >= 2){ + header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length); + if(header_length && data->receive_offset >= message_length + header_length + 1){ + DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]); + if(mqtt_handle_message(inst, data->receive_buffer[0], data->receive_buffer + header_length + 1, message_length)){ + //TODO handle failures properly + } + + //remove handled message + if(data->receive_offset > message_length + header_length + 1){ + memmove(data->receive_buffer, data->receive_buffer + message_length + header_length + 1, data->receive_offset - (message_length + header_length + 1)); + } + data->receive_offset -= message_length + header_length + 1; + } + } + data->receive_offset += bytes_read; return 0; } -- cgit v1.2.3 From cbef6a61a92453afba5005c287873001354f5090 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 9 Jan 2021 11:36:09 +0100 Subject: Implement keepalive and protocol fallback --- backends/mqtt.c | 56 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 19 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index d42d1f5..e9493a9 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -135,6 +135,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + data->last_control = 0; //unmanage the fd mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); @@ -153,13 +154,14 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); if(mmbackend_send(data->fd, fixed_header, offset) - || mmbackend_send(data->fd, vh, vh_length) - || mmbackend_send(data->fd, payload, payload_length)){ + || (vh && vh_length && mmbackend_send(data->fd, vh, vh_length)) + || (payload && payload_length && mmbackend_send(data->fd, payload, payload_length))){ LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name); mqtt_disconnect(inst); return 1; } + data->last_control = mm_timestamp(); return 0; } @@ -169,7 +171,7 @@ static int mqtt_configure(char* option, char* value){ } static int mqtt_reconnect(instance* inst){ - uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; + uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION_DEFAULT, 0x00 /*flags*/, ((MQTT_KEEPALIVE * 2) >> 8) & 0xFF, (MQTT_KEEPALIVE * 2) & 0xFF}; uint8_t payload[MQTT_BUFFER_LENGTH]; size_t vh_offset = 10, payload_offset = 0; mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -198,25 +200,28 @@ static int mqtt_reconnect(instance* inst){ return 1; } - //prepare CONNECT message flags + //prepare CONNECT message header + variable_header[6] = data->mqtt_version; variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); //TODO set session expiry interval option //TODO re-use previos session on reconnect - //push number of option bytes (as a varint, no less) before actually pushing the option data. - //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. - variable_header[vh_offset++] = 8; - //push maximum packet size option - variable_header[vh_offset++] = 0x27; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; - //push topic alias maximum option - variable_header[vh_offset++] = 0x22; - variable_header[vh_offset++] = 0xFF; - variable_header[vh_offset++] = 0xFF; + if(data->mqtt_version == 0x05){ //mqtt v5 has additional options + //push number of option bytes (as a varint, no less) before actually pushing the option data. + //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. + variable_header[vh_offset++] = 8; + //push maximum packet size option + variable_header[vh_offset++] = 0x27; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; + //push topic alias maximum option + variable_header[vh_offset++] = 0x22; + variable_header[vh_offset++] = 0xFF; + variable_header[vh_offset++] = 0xFF; + } //prepare CONNECT payload //push client id @@ -279,6 +284,7 @@ static int mqtt_instance(instance* inst){ } data->fd = -1; + data->mqtt_version = MQTT_VERSION_DEFAULT; inst->impl = data; if(mqtt_generate_instanceid(inst)){ @@ -317,6 +323,10 @@ static int mqtt_maintenance(){ return 1; } } + else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ + //send keepalive ping requests + mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); + } } free(inst); @@ -324,10 +334,17 @@ static int mqtt_maintenance(){ } static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + switch(type){ case MSG_CONNACK: if(length >= 2){ if(variable_header[1]){ + if(variable_header[1] == 1 && data->mqtt_version == 0x05){ + LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); + data->mqtt_version = 0x04; + return 0; + } LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); } else{ @@ -335,6 +352,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h } } break; + case MSG_PINGRESP: + //ignore ping responses + break; default: LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); } @@ -379,13 +399,11 @@ static int mqtt_handle_fd(instance* inst){ } } - data->receive_offset += bytes_read; return 0; } static int mqtt_handle(size_t num, managed_fd* fds){ size_t n = 0; - int rv = 0; for(n = 0; n < num; n++){ if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ -- cgit v1.2.3 From 6334a78d0c475ebfa76a739577a561bded135086 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sun, 10 Jan 2021 09:42:54 +0100 Subject: Subscribe to input channels --- backends/mqtt.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 70 insertions(+), 9 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index e9493a9..d3e516f 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -9,9 +9,6 @@ static uint64_t last_maintenance = 0; -//TODO -// * Periodic connection retries - MM_PLUGIN_API int init(){ backend mqtt = { .name = BACKEND_NAME, @@ -80,7 +77,7 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){ static int mqtt_generate_instanceid(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; - char clientid[23] = ""; + char clientid[24] = ""; snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name); return mmbackend_strdup(&(data->client_id), clientid); @@ -275,6 +272,32 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ return 1; } +static int mqtt_push_subscriptions(instance* inst){ + mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); + uint8_t variable_header[3] = {0}; + uint8_t payload[MQTT_BUFFER_LENGTH]; + size_t u, subs = 0, payload_offset = 0; + + //FIXME might want to aggregate multiple subscribes into one packet + for(u = 0; u < data->nchannels; u++){ + payload_offset = 0; + if(data->channel[u].flags & mmchannel_input){ + variable_header[0] = (data->packet_identifier >> 8) & 0xFF; + variable_header[1] = (data->packet_identifier) & 0xFF; + + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->channel[u].topic); + payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0; + + data->packet_identifier++; + mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload); + subs++; + } + } + + LOGPF("Subscribed %" PRIsize_t " channels on %s", subs, inst->name); + return 0; +} + static int mqtt_instance(instance* inst){ mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); @@ -294,12 +317,47 @@ static int mqtt_instance(instance* inst){ } static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ - //TODO - return NULL; + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + size_t u; + + //check spec for compliance + if(strchr(spec, '+') || strchr(spec, '#')){ + LOGPF("Invalid character in channel specification %s", spec); + return NULL; + } + + //find matching channel + for(u = 0; u < data->nchannels; u++){ + if(!strcmp(spec, data->channel[u].topic)){ + data->channel[u].flags |= flags; + break; + } + } + + //allocate new channel + if(u == data->nchannels){ + data->channel = realloc(data->channel, (data->nchannels + 1) * sizeof(mqtt_channel_data)); + if(!data->channel){ + LOG("Failed to allocate memory"); + return NULL; + } + + data->channel[u].topic = strdup(spec); + data->channel[u].topic_alias = 0; + data->channel[u].flags = flags; + + if(!data->channel[u].topic){ + LOG("Failed to allocate memory"); + return NULL; + } + data->nchannels++; + } + + return mm_channel(inst, u, 1); } static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ - //TODO + //TODO mqtt v3.1.1 local filtering return 0; } @@ -349,6 +407,7 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h } else{ LOGPF("Connection on %s established", inst->name); + return mqtt_push_subscriptions(inst); } } break; @@ -424,7 +483,7 @@ static int mqtt_handle(size_t num, managed_fd* fds){ } static int mqtt_start(size_t n, instance** inst){ - size_t u = 0; + size_t u = 0, fds = 0; for(u = 0; u < n; u++){ switch(mqtt_reconnect(inst[u])){ @@ -435,9 +494,11 @@ static int mqtt_start(size_t n, instance** inst){ LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name); return 1; default: + fds++; break; } } + LOGPF("Registered %" PRIsize_t " descriptors to core", fds); //initialize maintenance timer last_maintenance = mm_timestamp(); @@ -453,7 +514,7 @@ static int mqtt_shutdown(size_t n, instance** inst){ mqtt_disconnect(inst[u]); for(p = 0; p < data->nchannels; p++){ - free(data->channel[p]); + free(data->channel[p].topic); } free(data->channel); free(data->host); -- cgit v1.2.3 From 983313fe782047a5fe84314fafa2eb0b56e0e3a1 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 6 Feb 2021 22:51:54 +0100 Subject: Implement MQTT output --- backends/mqtt.c | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 4 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index d3e516f..00045c0 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -9,6 +9,14 @@ static uint64_t last_maintenance = 0; +/* + * TODO + * * proper RETAIN handling + * * use topic aliases if possible + * * mqtt v3.1.1 local filtering + * * modifiable output mappings + */ + MM_PLUGIN_API int init(){ backend mqtt = { .name = BACKEND_NAME, @@ -267,13 +275,20 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ return mqtt_generate_instanceid(inst); } } + else if(!strcmp(option, "protocol")){ + data->mqtt_version = MQTT_VERSION_DEFAULT; + if(!strcmp(value, "3.1.1")){ + data->mqtt_version = 4; + } + return 0; + } LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); return 1; } static int mqtt_push_subscriptions(instance* inst){ - mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; uint8_t variable_header[3] = {0}; uint8_t payload[MQTT_BUFFER_LENGTH]; size_t u, subs = 0, payload_offset = 0; @@ -282,6 +297,7 @@ static int mqtt_push_subscriptions(instance* inst){ for(u = 0; u < data->nchannels; u++){ payload_offset = 0; if(data->channel[u].flags & mmchannel_input){ + DBGPF("Subscribing %s.%s, channel %" PRIsize_t ", flags %d", inst->name, data->channel[u].topic, u, data->channel[u].flags); variable_header[0] = (data->packet_identifier >> 8) & 0xFF; variable_header[1] = (data->packet_identifier) & 0xFF; @@ -330,6 +346,7 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ for(u = 0; u < data->nchannels; u++){ if(!strcmp(spec, data->channel[u].topic)){ data->channel[u].flags |= flags; + DBGPF("Reusing existing channel %" PRIsize_t " for spec %s.%s, flags are now %02X", u, inst->name, spec, data->channel[u].flags); break; } } @@ -343,13 +360,16 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ } data->channel[u].topic = strdup(spec); - data->channel[u].topic_alias = 0; + data->channel[u].topic_alias_sent = 0; + data->channel[u].topic_alias_rcvd = 0; data->channel[u].flags = flags; if(!data->channel[u].topic){ LOG("Failed to allocate memory"); return NULL; } + + DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags); data->nchannels++; } @@ -357,7 +377,52 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ } static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ - //TODO mqtt v3.1.1 local filtering + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + uint8_t variable_header[MQTT_BUFFER_LENGTH]; + uint8_t payload[MQTT_BUFFER_LENGTH]; + size_t vh_length = 0, payload_length = 0; + size_t u; + + for(u = 0; u < num; u++){ + vh_length = payload_length = 0; + + if(data->mqtt_version == 0x05 && data->channel[c[u]->ident].topic_alias_sent){ + //push zero-length topic + variable_header[vh_length++] = 0; + variable_header[vh_length++] = 0; + + //push property length + variable_header[vh_length++] = 5; + + //push payload type (0x01) + variable_header[vh_length++] = 0x01; + variable_header[vh_length++] = 1; + + //push topic alias (0x23) + variable_header[vh_length++] = 0x23; + variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; + variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; + } + else{ + //push topic + vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic); + if(data->mqtt_version == 0x05){ + //push property length + variable_header[vh_length++] = 2; + + //push payload type (0x01) + variable_header[vh_length++] = 0x01; + variable_header[vh_length++] = 1; + } + } + payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); + //payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); + //payload[0] = (payload_length >> 8) & 0xFF; + //payload[1] = payload_length & 0xFF; + //payload_length += 2; + mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); + } + return 0; } @@ -412,7 +477,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h } break; case MSG_PINGRESP: - //ignore ping responses + case MSG_SUBACK: + //ignore most responses + //FIXME error check SUBACK break; default: LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); -- cgit v1.2.3 From fdd8f8075fbcb22d349135bd87bf95b8ce88c8e1 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 13 Feb 2021 16:58:12 +0100 Subject: Implement basic MQTT reception --- backends/mqtt.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 1 deletion(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 00045c0..8bff531 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -8,6 +8,40 @@ #include "mqtt.h" static uint64_t last_maintenance = 0; +/* according to spec 2.2.2.2 */ +static struct { + uint8_t property; + uint8_t storage; +} property_lengths[] = { + {0x01, STORAGE_U8}, + {0x02, STORAGE_U32}, + {0x03, STORAGE_PREFIXED}, + {0x08, STORAGE_PREFIXED}, + {0x09, STORAGE_PREFIXED}, + {0x0B, STORAGE_VARINT}, + {0x11, STORAGE_U32}, + + {0x12, STORAGE_PREFIXED}, + {0x13, STORAGE_U16}, + {0x15, STORAGE_PREFIXED}, + {0x16, STORAGE_PREFIXED}, + {0x17, STORAGE_U8}, + {0x18, STORAGE_U32}, + {0x19, STORAGE_U8}, + {0x1A, STORAGE_PREFIXED}, + {0x1C, STORAGE_PREFIXED}, + {0x1F, STORAGE_PREFIXED}, + {0x21, STORAGE_U16}, + {0x22, STORAGE_U16}, + {0x23, STORAGE_U16}, + {0x24, STORAGE_U8}, + {0x25, STORAGE_U8}, + {0x26, STORAGE_PREFIXPAIR}, + {0x27, STORAGE_U32}, + {0x28, STORAGE_U8}, + {0x29, STORAGE_U8}, + {0x2A, STORAGE_U8} +}; /* * TODO @@ -15,6 +49,8 @@ static uint64_t last_maintenance = 0; * * use topic aliases if possible * * mqtt v3.1.1 local filtering * * modifiable output mappings + * * TLS + * * JSON subchannels */ MM_PLUGIN_API int init(){ @@ -91,7 +127,7 @@ static int mqtt_generate_instanceid(instance* inst){ return mmbackend_strdup(&(data->client_id), clientid); } -static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ +static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ size_t value = 0, offset = 0; do { if(offset >= len){ @@ -138,6 +174,21 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content)); } +static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ + size_t length = 0; + *data = NULL; + + if(buffer_length < 2){ + return 0; + } + + length = (buffer[0] << 8) | buffer[1]; + if(buffer_length >= length + 2){ + *data = (char*) buffer + 2; + } + return length; +} + static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; data->last_control = 0; @@ -305,6 +356,11 @@ static int mqtt_push_subscriptions(instance* inst){ payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0; data->packet_identifier++; + //zero is not a valid packet identifier + if(!data->packet_identifier){ + data->packet_identifier++; + } + mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload); subs++; } @@ -324,6 +380,7 @@ static int mqtt_instance(instance* inst){ data->fd = -1; data->mqtt_version = MQTT_VERSION_DEFAULT; + data->packet_identifier = 1; inst->impl = data; if(mqtt_generate_instanceid(inst)){ @@ -456,6 +513,62 @@ static int mqtt_maintenance(){ return 0; } +static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char* topic = NULL, *payload = NULL; + channel* changed = NULL; + channel_value val; + uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; + uint32_t property_length = 0; + size_t u = data->nchannels, property_offset, payload_offset, payload_length; + size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); + + property_offset = payload_offset = topic_length + 2 + ((qos > 0) ? 2 : 0); + if(data->mqtt_version == 0x05){ + //read properties length + payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length); + payload_offset += property_length; + + //TODO parse properties + //find topic alias + //find type code + } + + //match via topic alias + if(topic_length == 0){ + //TODO match topic aliases + //TODO build topic alias database + } + //match via topic + else{ + for(u = 0; u < data->nchannels; u++){ + if(!strncmp(data->channel[u].topic, topic, topic_length)){ + break; + } + } + } + + if(content_utf8){ + payload_length = mqtt_pop_utf8(variable_header + payload_offset, length - payload_offset, &payload); + } + else{ + payload_length = length - payload_offset; + payload = (char*) (variable_header + payload_offset); + } + + if(u != data->nchannels && payload_length && payload){ + DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length); + //FIXME implement json subchannels + //FIXME implement input mappings + changed = mm_channel(inst, u, 0); + if(changed){ + val.normalised = strtod(payload, NULL); + mm_channel_event(changed, val); + } + } + return 0; +} + static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -482,6 +595,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h //FIXME error check SUBACK break; default: + if((type & 0xF0) == MSG_PUBLISH){ + return mqtt_handle_publish(inst, type, variable_header, length); + } LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); } return 0; -- cgit v1.2.3 From 670f3ff21f30b8aae765d61a38a7269d0eaab19d Mon Sep 17 00:00:00 2001 From: cbdev Date: Sun, 14 Feb 2021 00:21:47 +0100 Subject: Implement MQTT5 topic aliases --- backends/mqtt.c | 197 +++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 153 insertions(+), 44 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 8bff531..29f0436 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -46,7 +46,6 @@ static struct { /* * TODO * * proper RETAIN handling - * * use topic aliases if possible * * mqtt v3.1.1 local filtering * * modifiable output mappings * * TLS @@ -138,10 +137,52 @@ static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ offset++; } while(buffer[offset - 1] & 0x80); - *result = value; + if(result){ + *result = value; + } return offset; } +static size_t mqtt_pop_property(uint8_t* buffer, size_t bytes){ + size_t length = 0, u; + + if(bytes){ + for(u = 0; u < sizeof(property_lengths)/sizeof(property_lengths[0]); u++){ + if(property_lengths[u].property == buffer[0]){ + switch(property_lengths[u].storage){ + case STORAGE_U8: + return 2; + case STORAGE_U16: + return 3; + case STORAGE_U32: + return 5; + case STORAGE_VARINT: + return mqtt_pop_varint(buffer + 1, bytes - 1, NULL) + 1; + case STORAGE_PREFIXED: + if(bytes >= 3){ + return ((buffer[1] << 8) | buffer[2]) + 1; + } + //best-effort guess + return 3; + case STORAGE_PREFIXPAIR: + if(bytes >= 3){ + length = ((buffer[1] << 8) | buffer[2]); + if(bytes >= length + 5){ + return (1 + 2 + length + 2 + ((buffer[length + 3] << 8) | buffer[length + 4])); + } + return length + 3; + } + //best-effort guess + return 5; + } + } + } + } + + LOGPF("Storage class for property %02X was unknown", buffer[0]); + return 1; +} + static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){ //implementation conforming to spec 1.5.5 size_t offset = 0; @@ -191,8 +232,18 @@ static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + size_t u; + data->last_control = 0; + //reset aliases as they can not be reused across sessions + data->server_max_alias = 0; + data->current_alias = 1; + for(u = 0; u < data->nchannels; u++){ + data->channel[u].topic_alias_sent = 0; + data->channel[u].topic_alias_rcvd = 0; + } + //unmanage the fd mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); @@ -241,11 +292,12 @@ static int mqtt_reconnect(instance* inst){ mqtt_disconnect(inst); } - LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s)", + LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s, Protocol: %s)", inst->name, data->host, data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), data->tls ? "yes " : "no", - (data->user || data->password) ? "yes" : "no"); + (data->user || data->password) ? "yes" : "no", + (data->mqtt_version == 0x05) ? "v5" : "v3.1.1"); data->fd = mmbackend_socket(data->host, data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), @@ -260,9 +312,6 @@ static int mqtt_reconnect(instance* inst){ variable_header[6] = data->mqtt_version; variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); - //TODO set session expiry interval option - //TODO re-use previos session on reconnect - if(data->mqtt_version == 0x05){ //mqtt v5 has additional options //push number of option bytes (as a varint, no less) before actually pushing the option data. //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. @@ -381,6 +430,7 @@ static int mqtt_instance(instance* inst){ data->fd = -1; data->mqtt_version = MQTT_VERSION_DEFAULT; data->packet_identifier = 1; + data->current_alias = 1; inst->impl = data; if(mqtt_generate_instanceid(inst)){ @@ -443,22 +493,40 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ for(u = 0; u < num; u++){ vh_length = payload_length = 0; - if(data->mqtt_version == 0x05 && data->channel[c[u]->ident].topic_alias_sent){ - //push zero-length topic - variable_header[vh_length++] = 0; - variable_header[vh_length++] = 0; + if(data->mqtt_version == 0x05){ + if(data->channel[c[u]->ident].topic_alias_sent){ + //push zero-length topic + variable_header[vh_length++] = 0; + variable_header[vh_length++] = 0; + } + else{ + //push topic + vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic); + //generate topic alias if possible + if(data->current_alias <= data->server_max_alias){ + data->channel[c[u]->ident].topic_alias_sent = data->current_alias++; + DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic); + } + } //push property length - variable_header[vh_length++] = 5; + variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent) ? 5 : 2; //push payload type (0x01) variable_header[vh_length++] = 0x01; variable_header[vh_length++] = 1; - //push topic alias (0x23) - variable_header[vh_length++] = 0x23; - variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; - variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; + if(data->channel[c[u]->ident].topic_alias_sent){ + //push topic alias (0x23) + variable_header[vh_length++] = 0x23; + variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; + variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; + } + + payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); + payload[0] = (payload_length >> 8) & 0xFF; + payload[1] = payload_length & 0xFF; + payload_length += 2; } else{ //push topic @@ -471,12 +539,9 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ variable_header[vh_length++] = 0x01; variable_header[vh_length++] = 1; } + payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); } - payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); - //payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); - //payload[0] = (payload_length >> 8) & 0xFF; - //payload[1] = payload_length & 0xFF; - //payload_length += 2; + mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); } @@ -519,6 +584,7 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h channel* changed = NULL; channel_value val; uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; + uint16_t topic_alias = 0; uint32_t property_length = 0; size_t u = data->nchannels, property_offset, payload_offset, payload_length; size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); @@ -529,23 +595,42 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length); payload_offset += property_length; - //TODO parse properties - //find topic alias - //find type code + property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); + //parse properties + while(property_offset < payload_offset){ + DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIu32, variable_header[property_offset], property_offset, property_length); + //read payload format indicator + if(variable_header[property_offset] == 0x01){ + content_utf8 = variable_header[property_offset + 1]; + } + //read topic alias + else if(variable_header[property_offset] == 0x23){ + topic_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; + } + + property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset); + } } //match via topic alias - if(topic_length == 0){ - //TODO match topic aliases - //TODO build topic alias database + if(!topic_length && topic_alias){ + for(u = 0; u < data->nchannels; u++){ + if(data->channel[u].topic_alias_rcvd == topic_alias){ + break; + } + } } //match via topic - else{ + else if(topic_length){ for(u = 0; u < data->nchannels; u++){ if(!strncmp(data->channel[u].topic, topic, topic_length)){ break; } } + + if(topic_alias){ + data->channel[u].topic_alias_rcvd = topic_alias; + } } if(content_utf8){ @@ -569,26 +654,50 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h return 0; } -static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + size_t property_offset = 2; + + if(length >= 2){ + if(variable_header[1]){ + if(variable_header[1] == 1 && data->mqtt_version == 0x05){ + LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); + data->mqtt_version = 0x04; + return 0; + } + LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); + mqtt_disconnect(inst); + return 0; + } - switch(type){ - case MSG_CONNACK: - if(length >= 2){ - if(variable_header[1]){ - if(variable_header[1] == 1 && data->mqtt_version == 0x05){ - LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); - data->mqtt_version = 0x04; - return 0; - } - LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); - } - else{ - LOGPF("Connection on %s established", inst->name); - return mqtt_push_subscriptions(inst); + //parse response properties if present + if(data->mqtt_version == 0x05){ + property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); + while(property_offset < length){ + DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length); + + //read maximum topic alias + if(variable_header[property_offset] == 0x22){ + data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; + DBGPF("Connection supports maximum connection alias %" PRIu16, data->server_max_alias); } + + property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset); } - break; + } + + LOGPF("Connection on %s established", inst->name); + return mqtt_push_subscriptions(inst); + } + + LOGPF("Received malformed CONNACK on %s", inst->name); + return 1; +} + +static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + switch(type){ + case MSG_CONNACK: + return mqtt_handle_connack(inst, type, variable_header, length); case MSG_PINGRESP: case MSG_SUBACK: //ignore most responses -- cgit v1.2.3 From f483c0e50676d6a3b8eb8258daca1115550dd808 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sun, 14 Feb 2021 02:12:22 +0100 Subject: MQTT backend documentation --- backends/mqtt.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 29f0436..41fa772 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -580,7 +580,7 @@ static int mqtt_maintenance(){ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; - char* topic = NULL, *payload = NULL; + char* topic = NULL, *payload = NULL, *conversion_end = NULL; channel* changed = NULL; channel_value val; uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; @@ -647,7 +647,11 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h //FIXME implement input mappings changed = mm_channel(inst, u, 0); if(changed){ - val.normalised = strtod(payload, NULL); + val.normalised = clamp(strtod(payload, &conversion_end), 1.0, 0.0); + if(payload == conversion_end){ + LOGPF("Failed to parse incoming data for %s.%s", inst->name, data->channel[u].topic); + return 0; + } mm_channel_event(changed, val); } } -- cgit v1.2.3 From fca46bef7dd8448216d44f0777f0b5ef31ac5883 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 27 Feb 2021 16:54:06 +0100 Subject: Implement input/output mapping --- backends/mqtt.c | 296 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 237 insertions(+), 59 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 41fa772..36aed03 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -3,6 +3,7 @@ #include #include +#include #include "libmmbackend.h" #include "mqtt.h" @@ -46,8 +47,6 @@ static struct { /* * TODO * * proper RETAIN handling - * * mqtt v3.1.1 local filtering - * * modifiable output mappings * * TLS * * JSON subchannels */ @@ -218,7 +217,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ size_t length = 0; *data = NULL; - + if(buffer_length < 2){ return 0; } @@ -349,6 +348,92 @@ static int mqtt_reconnect(instance* inst){ return 0; } +static int mqtt_configure_channel(instance* inst, char* option, char* value){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char* next_token = NULL; + channel* configure = NULL; + uint8_t mark = 0; + mqtt_channel_value config = { + 0 + }; + + if(!strncmp(value, "range ", 6)){ + //we support min > max for range configurations + value += 6; + + config.min = strtod(value, &next_token); + if(value == next_token){ + LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); + return 1; + } + + config.max = strtod(next_token, &value); + if(value == next_token){ + LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); + return 1; + } + } + else if(!strncmp(value, "discrete ", 9)){ + value += 9; + + for(; *value && isspace(*value); value++){ + } + if(value[0] == '!'){ + mark = 1; + value++; + } + config.min = clamp(strtod(value, &next_token), 1.0, 0.0); + value = next_token; + + for(; *value && isspace(*value); value++){ + } + if(value[0] == '!'){ + mark = 2; + value++; + } + + config.max = clamp(strtod(value, &next_token), 1.0, 0.0); + value = next_token; + if(config.max < config.min){ + LOGPF("Discrete topic configuration for %s.%s has invalid limit ordering", inst->name, option); + return 1; + } + + for(; *value && isspace(*value); value++){ + } + + config.discrete = strdup(value); + config.normal = mark ? ((mark == 1) ? config.min : config.max) : (config.min + (config.max - config.min) / 2); + } + else{ + LOGPF("Unknown instance configuration option or invalid preconfiguration %s on instance %s", option, inst->name); + return 1; + } + + configure = mqtt_channel(inst, option, 0); + if(!configure + //if configuring scale, no other config is possible + || (!config.discrete && data->channel[configure->ident].values) + //if configuring discrete, the previous one can't be a a scale + || (config.discrete && data->channel[configure->ident].values && !data->channel[configure->ident].value[0].discrete)){ + LOGPF("Failed to configure topic %s.%s", inst->name, option); + free(config.discrete); + return 1; + } + + data->channel[configure->ident].value = realloc(data->channel[configure->ident].value, (data->channel[configure->ident].values + 1) * sizeof(mqtt_channel_value)); + if(!data->channel[configure->ident].value){ + LOG("Failed to allocate memory"); + return 1; + } + + DBGPF("Configuring value on %s.%s: min %f max %f normal %f discrete %s", inst->name, option, config.min, config.max, config.normal, config.discrete ? config.discrete : "-"); + data->channel[configure->ident].value[data->channel[configure->ident].values] = config; + data->channel[configure->ident].values++; + DBGPF("Value configuration for %s.%s now at %" PRIsize_t " entries", inst->name, option, data->channel[configure->ident].values); + return 0; +} + static int mqtt_configure_instance(instance* inst, char* option, char* value){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -383,8 +468,8 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ return 0; } - LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); - return 1; + //try to register as channel preconfig + return mqtt_configure_channel(inst, option, value); } static int mqtt_push_subscriptions(instance* inst){ @@ -470,12 +555,14 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ data->channel[u].topic_alias_sent = 0; data->channel[u].topic_alias_rcvd = 0; data->channel[u].flags = flags; + data->channel[u].values = 0; + data->channel[u].value = NULL; if(!data->channel[u].topic){ LOG("Failed to allocate memory"); return NULL; } - + DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags); data->nchannels++; } @@ -483,15 +570,125 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ return mm_channel(inst, u, 1); } +static int mqtt_maintenance(){ + size_t n, u; + instance** inst = NULL; + mqtt_instance_data* data = NULL; + + if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ + LOG("Failed to fetch instance list"); + return 1; + } + + DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); + for(u = 0; u < n; u++){ + data = (mqtt_instance_data*) inst[u]->impl; + if(data->fd <= 0){ + if(mqtt_reconnect(inst[u]) >= 2){ + LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); + free(inst); + return 1; + } + } + else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ + //send keepalive ping requests + mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); + } + } + + free(inst); + return 0; +} + +static int mqtt_deserialize(instance* inst, channel* output, mqtt_channel_data* input, char* buffer, size_t length){ + char* next_token = NULL, conversion_buffer[1024] = {0}; + channel_value val; + double range, raw; + size_t u; + //FIXME implement json subchannels + + //unconfigured channel + if(!input->values){ + //the original buffer is the result of an unterminated receive, move it over + memcpy(conversion_buffer, buffer, length); + val.normalised = clamp(strtod(conversion_buffer, &next_token), 1.0, 0.0); + if(conversion_buffer == next_token){ + LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); + return 1; + } + } + //ranged channel + else if(!input->value[0].discrete){ + memcpy(conversion_buffer, buffer, length); + raw = clamp(strtod(conversion_buffer, &next_token), max(input->value[0].max, input->value[0].min), min(input->value[0].max, input->value[0].min)); + if(conversion_buffer == next_token){ + LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); + return 1; + } + range = fabs(input->value[0].max - input->value[0].min); + val.normalised = (raw - input->value[0].min) / range; + if(input->value[0].max < input->value[0].min){ + val.normalised = fabs(val.normalised); + } + } + else{ + for(u = 0; u < input->values; u++){ + if(length == strlen(input->value[u].discrete) + && !strncmp(input->value[u].discrete, buffer, length)){ + val.normalised = input->value[u].normal; + break; + } + } + + if(u == input->values){ + LOGPF("Failed to parse incoming data for %s.%s, no matching discrete token", inst->name, input->topic); + return 1; + } + } + + val.normalised = clamp(val.normalised, 1.0, 0.0); + mm_channel_event(output, val); + return 0; +} + +static size_t mqtt_serialize(instance* inst, mqtt_channel_data* input, char* output, size_t length, double value){ + double range; + size_t u, invert = 0; + + //unconfigured channel + if(!input->values){ + return snprintf(output, length, "%f", value); + } + //ranged channel + else if(!input->value[0].discrete){ + range = fabs(input->value[0].max - input->value[0].min); + if(input->value[0].max < input->value[0].min){ + invert = 1; + } + return snprintf(output, length, "%f", (value * range) * (invert ? -1 : 1) + input->value[0].min); + } + else{ + for(u = 0; u < input->values; u++){ + if(input->value[u].min <= value + && input->value[u].max >= value){ + memcpy(output, input->value[u].discrete, min(strlen(input->value[u].discrete), length)); + return min(strlen(input->value[u].discrete), length); + } + } + } + + LOGPF("No discrete value on %s.%s defined for normalized value %f", inst->name, input->topic, value); + return 0; +} + static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; uint8_t variable_header[MQTT_BUFFER_LENGTH]; - uint8_t payload[MQTT_BUFFER_LENGTH]; - size_t vh_length = 0, payload_length = 0; - size_t u; + uint8_t payload[MQTT_BUFFER_LENGTH], alias_assigned = 0; + size_t vh_length = 0, payload_length = 0, u; for(u = 0; u < num; u++){ - vh_length = payload_length = 0; + vh_length = payload_length = alias_assigned = 0; if(data->mqtt_version == 0x05){ if(data->channel[c[u]->ident].topic_alias_sent){ @@ -506,6 +703,8 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ if(data->current_alias <= data->server_max_alias){ data->channel[c[u]->ident].topic_alias_sent = data->current_alias++; DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic); + + alias_assigned = 1; } } @@ -523,10 +722,12 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; } - payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); - payload[0] = (payload_length >> 8) & 0xFF; - payload[1] = payload_length & 0xFF; - payload_length += 2; + payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) (payload + 2), sizeof(payload) - 2, v[u].normalised); + if(payload_length){ + payload[0] = (payload_length >> 8) & 0xFF; + payload[1] = payload_length & 0xFF; + payload_length += 2; + } } else{ //push topic @@ -539,50 +740,27 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ variable_header[vh_length++] = 0x01; variable_header[vh_length++] = 1; } - payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); + payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) payload, sizeof(payload), v[u].normalised); } - mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); - } - - return 0; -} - -static int mqtt_maintenance(){ - size_t n, u; - instance** inst = NULL; - mqtt_instance_data* data = NULL; - - if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ - LOG("Failed to fetch instance list"); - return 1; - } - - DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); - for(u = 0; u < n; u++){ - data = (mqtt_instance_data*) inst[u]->impl; - if(data->fd <= 0){ - if(mqtt_reconnect(inst[u]) >= 2){ - LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); - free(inst); - return 1; - } + if(payload_length){ + DBGPF("Transmitting %" PRIsize_t " bytes for %s", payload_length, inst->name); + mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); } - else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ - //send keepalive ping requests - mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); + else if(alias_assigned){ + //undo alias assignment + data->channel[c[u]->ident].topic_alias_sent = 0; + data->current_alias--; } } - free(inst); return 0; } static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; - char* topic = NULL, *payload = NULL, *conversion_end = NULL; + char* topic = NULL, *payload = NULL; channel* changed = NULL; - channel_value val; uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; uint16_t topic_alias = 0; uint32_t property_length = 0; @@ -643,16 +821,9 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h if(u != data->nchannels && payload_length && payload){ DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length); - //FIXME implement json subchannels - //FIXME implement input mappings changed = mm_channel(inst, u, 0); if(changed){ - val.normalised = clamp(strtod(payload, &conversion_end), 1.0, 0.0); - if(payload == conversion_end){ - LOGPF("Failed to parse incoming data for %s.%s", inst->name, data->channel[u].topic); - return 0; - } - mm_channel_event(changed, val); + mqtt_deserialize(inst, changed, data->channel + u, payload, payload_length); } } return 0; @@ -679,7 +850,7 @@ static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_h property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); while(property_offset < length){ DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length); - + //read maximum topic alias if(variable_header[property_offset] == 0x22){ data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; @@ -736,9 +907,8 @@ static int mqtt_handle_fd(instance* inst){ DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); data->receive_offset += bytes_read; - //TODO loop this while at least one unhandled message is in the buffer - //check for complete message - if(data->receive_offset >= 2){ + while(data->receive_offset >= 2){ + //check for complete message header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length); if(header_length && data->receive_offset >= message_length + header_length + 1){ DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]); @@ -752,6 +922,9 @@ static int mqtt_handle_fd(instance* inst){ } data->receive_offset -= message_length + header_length + 1; } + else{ + break; + } } return 0; @@ -802,7 +975,7 @@ static int mqtt_start(size_t n, instance** inst){ } static int mqtt_shutdown(size_t n, instance** inst){ - size_t u, p; + size_t u, p, v; mqtt_instance_data* data = NULL; for(u = 0; u < n; u++){ @@ -810,6 +983,10 @@ static int mqtt_shutdown(size_t n, instance** inst){ mqtt_disconnect(inst[u]); for(p = 0; p < data->nchannels; p++){ + for(v = 0; v < data->channel[p].values; v++){ + free(data->channel[p].value[v].discrete); + } + free(data->channel[p].value); free(data->channel[p].topic); } free(data->channel); @@ -817,6 +994,7 @@ static int mqtt_shutdown(size_t n, instance** inst){ free(data->port); free(data->user); free(data->password); + free(data->client_id); free(inst[u]->impl); inst[u]->impl = NULL; -- cgit v1.2.3 From c70ab97fe76003019ebf93d5c37a01366769138f Mon Sep 17 00:00:00 2001 From: cbdev Date: Mon, 21 Jun 2021 21:06:31 +0200 Subject: Publish MQTT backend --- backends/mqtt.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 36aed03..f2a7c83 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -1,5 +1,5 @@ #define BACKEND_NAME "mqtt" -#define DEBUG +//#define DEBUG #include #include -- cgit v1.2.3 From a89dcd7942958e0ebe5881dde4820ceb77d29e9b Mon Sep 17 00:00:00 2001 From: cbdev Date: Wed, 23 Jun 2021 22:27:15 +0200 Subject: Fix build with recent environments, fix Coverity CID 371602 and 355842 --- backends/mqtt.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index f2a7c83..47042f1 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -775,14 +775,15 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); //parse properties - while(property_offset < payload_offset){ + while(property_offset < payload_offset && property_offset < length){ DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIu32, variable_header[property_offset], property_offset, property_length); + //read payload format indicator - if(variable_header[property_offset] == 0x01){ + if(variable_header[property_offset] == 0x01 && property_offset < length - 1){ content_utf8 = variable_header[property_offset + 1]; } //read topic alias - else if(variable_header[property_offset] == 0x23){ + else if(variable_header[property_offset] == 0x23 && property_offset < length - 2){ topic_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; } -- cgit v1.2.3 From 17551fe56c8dc20630811b1726559b00cb911abd Mon Sep 17 00:00:00 2001 From: cbdev Date: Mon, 28 Jun 2021 22:28:47 +0200 Subject: Fix Coverity CIDs 371602, 355843 --- backends/mqtt.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'backends/mqtt.c') diff --git a/backends/mqtt.c b/backends/mqtt.c index 47042f1..da4bf38 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -765,7 +765,7 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h uint16_t topic_alias = 0; uint32_t property_length = 0; size_t u = data->nchannels, property_offset, payload_offset, payload_length; - size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); + size_t topic_length = min(mqtt_pop_utf8(variable_header, length, &topic), length); property_offset = payload_offset = topic_length + 2 + ((qos > 0) ? 2 : 0); if(data->mqtt_version == 0x05){ -- cgit v1.2.3