From e248f0c5778f73988477ba73174c91b7085c232c Mon Sep 17 00:00:00 2001 From: cbdev Date: Fri, 18 Dec 2020 22:31:14 +0100 Subject: Implement MQTT connect --- backends/mqtt.c | 75 +++++++++++++++++++++++++++++++++++++++++++++++++-------- backends/mqtt.h | 3 +++ 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/backends/mqtt.c b/backends/mqtt.c index d9cf2d8..8c4a9fd 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -2,10 +2,13 @@ #define DEBUG #include +#include #include "libmmbackend.h" #include "mqtt.h" +static uint64_t last_maintenance = 0; + //TODO // * Periodic connection retries @@ -75,6 +78,14 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){ return 0; } +static int mqtt_generate_instanceid(instance* inst){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char clientid[23] = ""; + + snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name); + return mmbackend_strdup(&(data->client_id), clientid); +} + static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ size_t value = 0, offset = 0; do { @@ -98,6 +109,24 @@ static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){ return offset; } +static size_t mqtt_push_binary(uint8_t* buffer, size_t buffer_length, uint8_t* content, size_t length){ + if(buffer_length < length + 2 || length > 65535){ + LOG("Failed to push length-prefixed data blob, buffer size exceeded"); + return 0; + } + + buffer[0] = (length >> 8) & 0xFF; + buffer[1] = length & 0xFF; + + memcpy(buffer + 2, content, length); + return length + 2; +} + +static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* content){ + //FIXME might want to validate the string for valid UTF-8 + return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content)); +} + static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -133,7 +162,7 @@ static int mqtt_configure(char* option, char* value){ } static int mqtt_reconnect(instance* inst){ - uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x05, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; + uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; uint8_t payload[MQTT_BUFFER_LENGTH]; size_t vh_offset = 10, payload_offset = 0; mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -162,14 +191,15 @@ static int mqtt_reconnect(instance* inst){ return 1; } - //prepare CONNECT message + //prepare CONNECT message flags variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); + //TODO set session expiry interval option //TODO re-use previos session on reconnect - + //push number of option bytes (as a varint, no less) before actually pushing the option data. //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. - variable_header[vh_offset++] = 7; + variable_header[vh_offset++] = 8; //push maximum packet size option variable_header[vh_offset++] = 0x27; variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; @@ -181,13 +211,14 @@ static int mqtt_reconnect(instance* inst){ variable_header[vh_offset++] = 0xFF; variable_header[vh_offset++] = 0xFF; - //push client_id as utf8 - //payload_offset += mqtt_push_utf8(); + //prepare CONNECT payload + //push client id + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->client_id); if(data->user){ - //push user name as utf8 + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->user); } if(data->password){ - //push password as binary + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->password); } mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload); @@ -218,6 +249,15 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ } return 0; } + else if(!strcmp(option, "clientid")){ + if(strlen(value)){ + mmbackend_strdup(&(data->client_id), value); + return 0; + } + else{ + return mqtt_generate_instanceid(inst); + } + } LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); return 1; @@ -233,6 +273,10 @@ static int mqtt_instance(instance* inst){ data->fd = -1; inst->impl = data; + + if(mqtt_generate_instanceid(inst)){ + return 1; + } return 0; } @@ -247,8 +291,17 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ } static int mqtt_handle(size_t num, managed_fd* fds){ - LOG("Handling"); - //TODO + size_t n = 0; + + //for(n = 0; n < num; n++){ + //} + + //keepalive/reconnect processing + if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ + //TODO run reconnects + last_maintenance = mm_timestamp(); + } + return 0; } @@ -268,6 +321,8 @@ static int mqtt_start(size_t n, instance** inst){ } } + //initialize maintenance timer + last_maintenance = mm_timestamp(); return 0; } diff --git a/backends/mqtt.h b/backends/mqtt.h index 1c8b47d..df63319 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -14,6 +14,7 @@ static int mqtt_shutdown(size_t n, instance** inst); #define MQTT_TLS_PORT "8883" #define MQTT_BUFFER_LENGTH 8192 #define MQTT_KEEPALIVE 10 +#define MQTT_VERSION 0x05 enum { MSG_RESERVED = 0x00, @@ -47,6 +48,8 @@ typedef struct /*_mqtt_instance_data*/ { char** channel; int fd; + uint8_t receive_buffer[MQTT_BUFFER_LENGTH]; + size_t receive_offset; } mqtt_instance_data; //per-channel -- cgit v1.2.3