From cbef6a61a92453afba5005c287873001354f5090 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 9 Jan 2021 11:36:09 +0100 Subject: Implement keepalive and protocol fallback --- backends/mqtt.c | 56 +++++++++++++++++++++++++++++++++++++------------------- backends/mqtt.h | 5 ++++- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/backends/mqtt.c b/backends/mqtt.c index d42d1f5..e9493a9 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -135,6 +135,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + data->last_control = 0; //unmanage the fd mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); @@ -153,13 +154,14 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); if(mmbackend_send(data->fd, fixed_header, offset) - || mmbackend_send(data->fd, vh, vh_length) - || mmbackend_send(data->fd, payload, payload_length)){ + || (vh && vh_length && mmbackend_send(data->fd, vh, vh_length)) + || (payload && payload_length && mmbackend_send(data->fd, payload, payload_length))){ LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name); mqtt_disconnect(inst); return 1; } + data->last_control = mm_timestamp(); return 0; } @@ -169,7 +171,7 @@ static int mqtt_configure(char* option, char* value){ } static int mqtt_reconnect(instance* inst){ - uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; + uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION_DEFAULT, 0x00 /*flags*/, ((MQTT_KEEPALIVE * 2) >> 8) & 0xFF, (MQTT_KEEPALIVE * 2) & 0xFF}; uint8_t payload[MQTT_BUFFER_LENGTH]; size_t vh_offset = 10, payload_offset = 0; mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -198,25 +200,28 @@ static int mqtt_reconnect(instance* inst){ return 1; } - //prepare CONNECT message flags + //prepare CONNECT message header + variable_header[6] = data->mqtt_version; variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); //TODO set session expiry interval option //TODO re-use previos session on reconnect - //push number of option bytes (as a varint, no less) before actually pushing the option data. - //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. - variable_header[vh_offset++] = 8; - //push maximum packet size option - variable_header[vh_offset++] = 0x27; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; - variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; - //push topic alias maximum option - variable_header[vh_offset++] = 0x22; - variable_header[vh_offset++] = 0xFF; - variable_header[vh_offset++] = 0xFF; + if(data->mqtt_version == 0x05){ //mqtt v5 has additional options + //push number of option bytes (as a varint, no less) before actually pushing the option data. + //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. + variable_header[vh_offset++] = 8; + //push maximum packet size option + variable_header[vh_offset++] = 0x27; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; + //push topic alias maximum option + variable_header[vh_offset++] = 0x22; + variable_header[vh_offset++] = 0xFF; + variable_header[vh_offset++] = 0xFF; + } //prepare CONNECT payload //push client id @@ -279,6 +284,7 @@ static int mqtt_instance(instance* inst){ } data->fd = -1; + data->mqtt_version = MQTT_VERSION_DEFAULT; inst->impl = data; if(mqtt_generate_instanceid(inst)){ @@ -317,6 +323,10 @@ static int mqtt_maintenance(){ return 1; } } + else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ + //send keepalive ping requests + mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); + } } free(inst); @@ -324,10 +334,17 @@ static int mqtt_maintenance(){ } static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + switch(type){ case MSG_CONNACK: if(length >= 2){ if(variable_header[1]){ + if(variable_header[1] == 1 && data->mqtt_version == 0x05){ + LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); + data->mqtt_version = 0x04; + return 0; + } LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); } else{ @@ -335,6 +352,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h } } break; + case MSG_PINGRESP: + //ignore ping responses + break; default: LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); } @@ -379,13 +399,11 @@ static int mqtt_handle_fd(instance* inst){ } } - data->receive_offset += bytes_read; return 0; } static int mqtt_handle(size_t num, managed_fd* fds){ size_t n = 0; - int rv = 0; for(n = 0; n < num; n++){ if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ diff --git a/backends/mqtt.h b/backends/mqtt.h index df63319..a0f5356 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -14,7 +14,7 @@ static int mqtt_shutdown(size_t n, instance** inst); #define MQTT_TLS_PORT "8883" #define MQTT_BUFFER_LENGTH 8192 #define MQTT_KEEPALIVE 10 -#define MQTT_VERSION 0x05 +#define MQTT_VERSION_DEFAULT 0x05 enum { MSG_RESERVED = 0x00, @@ -39,6 +39,7 @@ typedef struct /*_mqtt_instance_data*/ { uint8_t tls; char* host; char* port; + uint8_t mqtt_version; char* user; char* password; @@ -50,6 +51,8 @@ typedef struct /*_mqtt_instance_data*/ { int fd; uint8_t receive_buffer[MQTT_BUFFER_LENGTH]; size_t receive_offset; + + uint64_t last_control; } mqtt_instance_data; //per-channel -- cgit v1.2.3