From 670f3ff21f30b8aae765d61a38a7269d0eaab19d Mon Sep 17 00:00:00 2001 From: cbdev Date: Sun, 14 Feb 2021 00:21:47 +0100 Subject: Implement MQTT5 topic aliases --- backends/mqtt.c | 197 +++++++++++++++++++++++++++++++++++++++++++------------- backends/mqtt.h | 2 + 2 files changed, 155 insertions(+), 44 deletions(-) diff --git a/backends/mqtt.c b/backends/mqtt.c index 8bff531..29f0436 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -46,7 +46,6 @@ static struct { /* * TODO * * proper RETAIN handling - * * use topic aliases if possible * * mqtt v3.1.1 local filtering * * modifiable output mappings * * TLS @@ -138,10 +137,52 @@ static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ offset++; } while(buffer[offset - 1] & 0x80); - *result = value; + if(result){ + *result = value; + } return offset; } +static size_t mqtt_pop_property(uint8_t* buffer, size_t bytes){ + size_t length = 0, u; + + if(bytes){ + for(u = 0; u < sizeof(property_lengths)/sizeof(property_lengths[0]); u++){ + if(property_lengths[u].property == buffer[0]){ + switch(property_lengths[u].storage){ + case STORAGE_U8: + return 2; + case STORAGE_U16: + return 3; + case STORAGE_U32: + return 5; + case STORAGE_VARINT: + return mqtt_pop_varint(buffer + 1, bytes - 1, NULL) + 1; + case STORAGE_PREFIXED: + if(bytes >= 3){ + return ((buffer[1] << 8) | buffer[2]) + 1; + } + //best-effort guess + return 3; + case STORAGE_PREFIXPAIR: + if(bytes >= 3){ + length = ((buffer[1] << 8) | buffer[2]); + if(bytes >= length + 5){ + return (1 + 2 + length + 2 + ((buffer[length + 3] << 8) | buffer[length + 4])); + } + return length + 3; + } + //best-effort guess + return 5; + } + } + } + } + + LOGPF("Storage class for property %02X was unknown", buffer[0]); + return 1; +} + static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){ //implementation conforming to spec 1.5.5 size_t offset = 0; @@ -191,8 +232,18 @@ static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + size_t u; + data->last_control = 0; + //reset aliases as they can not be reused across sessions + data->server_max_alias = 0; + data->current_alias = 1; + for(u = 0; u < data->nchannels; u++){ + data->channel[u].topic_alias_sent = 0; + data->channel[u].topic_alias_rcvd = 0; + } + //unmanage the fd mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); @@ -241,11 +292,12 @@ static int mqtt_reconnect(instance* inst){ mqtt_disconnect(inst); } - LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s)", + LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s, Protocol: %s)", inst->name, data->host, data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), data->tls ? "yes " : "no", - (data->user || data->password) ? "yes" : "no"); + (data->user || data->password) ? "yes" : "no", + (data->mqtt_version == 0x05) ? "v5" : "v3.1.1"); data->fd = mmbackend_socket(data->host, data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), @@ -260,9 +312,6 @@ static int mqtt_reconnect(instance* inst){ variable_header[6] = data->mqtt_version; variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); - //TODO set session expiry interval option - //TODO re-use previos session on reconnect - if(data->mqtt_version == 0x05){ //mqtt v5 has additional options //push number of option bytes (as a varint, no less) before actually pushing the option data. //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. @@ -381,6 +430,7 @@ static int mqtt_instance(instance* inst){ data->fd = -1; data->mqtt_version = MQTT_VERSION_DEFAULT; data->packet_identifier = 1; + data->current_alias = 1; inst->impl = data; if(mqtt_generate_instanceid(inst)){ @@ -443,22 +493,40 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ for(u = 0; u < num; u++){ vh_length = payload_length = 0; - if(data->mqtt_version == 0x05 && data->channel[c[u]->ident].topic_alias_sent){ - //push zero-length topic - variable_header[vh_length++] = 0; - variable_header[vh_length++] = 0; + if(data->mqtt_version == 0x05){ + if(data->channel[c[u]->ident].topic_alias_sent){ + //push zero-length topic + variable_header[vh_length++] = 0; + variable_header[vh_length++] = 0; + } + else{ + //push topic + vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic); + //generate topic alias if possible + if(data->current_alias <= data->server_max_alias){ + data->channel[c[u]->ident].topic_alias_sent = data->current_alias++; + DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic); + } + } //push property length - variable_header[vh_length++] = 5; + variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent) ? 5 : 2; //push payload type (0x01) variable_header[vh_length++] = 0x01; variable_header[vh_length++] = 1; - //push topic alias (0x23) - variable_header[vh_length++] = 0x23; - variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; - variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; + if(data->channel[c[u]->ident].topic_alias_sent){ + //push topic alias (0x23) + variable_header[vh_length++] = 0x23; + variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; + variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; + } + + payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); + payload[0] = (payload_length >> 8) & 0xFF; + payload[1] = payload_length & 0xFF; + payload_length += 2; } else{ //push topic @@ -471,12 +539,9 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ variable_header[vh_length++] = 0x01; variable_header[vh_length++] = 1; } + payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); } - payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); - //payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); - //payload[0] = (payload_length >> 8) & 0xFF; - //payload[1] = payload_length & 0xFF; - //payload_length += 2; + mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); } @@ -519,6 +584,7 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h channel* changed = NULL; channel_value val; uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; + uint16_t topic_alias = 0; uint32_t property_length = 0; size_t u = data->nchannels, property_offset, payload_offset, payload_length; size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); @@ -529,23 +595,42 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length); payload_offset += property_length; - //TODO parse properties - //find topic alias - //find type code + property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); + //parse properties + while(property_offset < payload_offset){ + DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIu32, variable_header[property_offset], property_offset, property_length); + //read payload format indicator + if(variable_header[property_offset] == 0x01){ + content_utf8 = variable_header[property_offset + 1]; + } + //read topic alias + else if(variable_header[property_offset] == 0x23){ + topic_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; + } + + property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset); + } } //match via topic alias - if(topic_length == 0){ - //TODO match topic aliases - //TODO build topic alias database + if(!topic_length && topic_alias){ + for(u = 0; u < data->nchannels; u++){ + if(data->channel[u].topic_alias_rcvd == topic_alias){ + break; + } + } } //match via topic - else{ + else if(topic_length){ for(u = 0; u < data->nchannels; u++){ if(!strncmp(data->channel[u].topic, topic, topic_length)){ break; } } + + if(topic_alias){ + data->channel[u].topic_alias_rcvd = topic_alias; + } } if(content_utf8){ @@ -569,26 +654,50 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h return 0; } -static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + size_t property_offset = 2; + + if(length >= 2){ + if(variable_header[1]){ + if(variable_header[1] == 1 && data->mqtt_version == 0x05){ + LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); + data->mqtt_version = 0x04; + return 0; + } + LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); + mqtt_disconnect(inst); + return 0; + } - switch(type){ - case MSG_CONNACK: - if(length >= 2){ - if(variable_header[1]){ - if(variable_header[1] == 1 && data->mqtt_version == 0x05){ - LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); - data->mqtt_version = 0x04; - return 0; - } - LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); - } - else{ - LOGPF("Connection on %s established", inst->name); - return mqtt_push_subscriptions(inst); + //parse response properties if present + if(data->mqtt_version == 0x05){ + property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); + while(property_offset < length){ + DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length); + + //read maximum topic alias + if(variable_header[property_offset] == 0x22){ + data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; + DBGPF("Connection supports maximum connection alias %" PRIu16, data->server_max_alias); } + + property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset); } - break; + } + + LOGPF("Connection on %s established", inst->name); + return mqtt_push_subscriptions(inst); + } + + LOGPF("Received malformed CONNACK on %s", inst->name); + return 1; +} + +static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + switch(type){ + case MSG_CONNACK: + return mqtt_handle_connack(inst, type, variable_header, length); case MSG_PINGRESP: case MSG_SUBACK: //ignore most responses diff --git a/backends/mqtt.h b/backends/mqtt.h index 0cb2617..d40e83d 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -73,4 +73,6 @@ typedef struct /*_mqtt_instance_data*/ { uint64_t last_control; uint16_t packet_identifier; + uint16_t server_max_alias; + uint16_t current_alias; } mqtt_instance_data; -- cgit v1.2.3