From fdd8f8075fbcb22d349135bd87bf95b8ce88c8e1 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 13 Feb 2021 16:58:12 +0100 Subject: Implement basic MQTT reception --- backends/mqtt.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- backends/mqtt.h | 9 +++++ 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/backends/mqtt.c b/backends/mqtt.c index 00045c0..8bff531 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -8,6 +8,40 @@ #include "mqtt.h" static uint64_t last_maintenance = 0; +/* according to spec 2.2.2.2 */ +static struct { + uint8_t property; + uint8_t storage; +} property_lengths[] = { + {0x01, STORAGE_U8}, + {0x02, STORAGE_U32}, + {0x03, STORAGE_PREFIXED}, + {0x08, STORAGE_PREFIXED}, + {0x09, STORAGE_PREFIXED}, + {0x0B, STORAGE_VARINT}, + {0x11, STORAGE_U32}, + + {0x12, STORAGE_PREFIXED}, + {0x13, STORAGE_U16}, + {0x15, STORAGE_PREFIXED}, + {0x16, STORAGE_PREFIXED}, + {0x17, STORAGE_U8}, + {0x18, STORAGE_U32}, + {0x19, STORAGE_U8}, + {0x1A, STORAGE_PREFIXED}, + {0x1C, STORAGE_PREFIXED}, + {0x1F, STORAGE_PREFIXED}, + {0x21, STORAGE_U16}, + {0x22, STORAGE_U16}, + {0x23, STORAGE_U16}, + {0x24, STORAGE_U8}, + {0x25, STORAGE_U8}, + {0x26, STORAGE_PREFIXPAIR}, + {0x27, STORAGE_U32}, + {0x28, STORAGE_U8}, + {0x29, STORAGE_U8}, + {0x2A, STORAGE_U8} +}; /* * TODO @@ -15,6 +49,8 @@ static uint64_t last_maintenance = 0; * * use topic aliases if possible * * mqtt v3.1.1 local filtering * * modifiable output mappings + * * TLS + * * JSON subchannels */ MM_PLUGIN_API int init(){ @@ -91,7 +127,7 @@ static int mqtt_generate_instanceid(instance* inst){ return mmbackend_strdup(&(data->client_id), clientid); } -static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ +static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ size_t value = 0, offset = 0; do { if(offset >= len){ @@ -138,6 +174,21 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content)); } +static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ + size_t length = 0; + *data = NULL; + + if(buffer_length < 2){ + return 0; + } + + length = (buffer[0] << 8) | buffer[1]; + if(buffer_length >= length + 2){ + *data = (char*) buffer + 2; + } + return length; +} + static void mqtt_disconnect(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; data->last_control = 0; @@ -305,6 +356,11 @@ static int mqtt_push_subscriptions(instance* inst){ payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0; data->packet_identifier++; + //zero is not a valid packet identifier + if(!data->packet_identifier){ + data->packet_identifier++; + } + mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload); subs++; } @@ -324,6 +380,7 @@ static int mqtt_instance(instance* inst){ data->fd = -1; data->mqtt_version = MQTT_VERSION_DEFAULT; + data->packet_identifier = 1; inst->impl = data; if(mqtt_generate_instanceid(inst)){ @@ -456,6 +513,62 @@ static int mqtt_maintenance(){ return 0; } +static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char* topic = NULL, *payload = NULL; + channel* changed = NULL; + channel_value val; + uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; + uint32_t property_length = 0; + size_t u = data->nchannels, property_offset, payload_offset, payload_length; + size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); + + property_offset = payload_offset = topic_length + 2 + ((qos > 0) ? 2 : 0); + if(data->mqtt_version == 0x05){ + //read properties length + payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length); + payload_offset += property_length; + + //TODO parse properties + //find topic alias + //find type code + } + + //match via topic alias + if(topic_length == 0){ + //TODO match topic aliases + //TODO build topic alias database + } + //match via topic + else{ + for(u = 0; u < data->nchannels; u++){ + if(!strncmp(data->channel[u].topic, topic, topic_length)){ + break; + } + } + } + + if(content_utf8){ + payload_length = mqtt_pop_utf8(variable_header + payload_offset, length - payload_offset, &payload); + } + else{ + payload_length = length - payload_offset; + payload = (char*) (variable_header + payload_offset); + } + + if(u != data->nchannels && payload_length && payload){ + DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length); + //FIXME implement json subchannels + //FIXME implement input mappings + changed = mm_channel(inst, u, 0); + if(changed){ + val.normalised = strtod(payload, NULL); + mm_channel_event(changed, val); + } + } + return 0; +} + static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -482,6 +595,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h //FIXME error check SUBACK break; default: + if((type & 0xF0) == MSG_PUBLISH){ + return mqtt_handle_publish(inst, type, variable_header, length); + } LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); } return 0; diff --git a/backends/mqtt.h b/backends/mqtt.h index 6483364..0cb2617 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -18,6 +18,15 @@ static int mqtt_shutdown(size_t n, instance** inst); #define MQTT5_NO_LOCAL 0x04 +enum /*_mqtt_property_storage_classes*/ { + STORAGE_U8, + STORAGE_U16, + STORAGE_U32, + STORAGE_VARINT, + STORAGE_PREFIXED, + STORAGE_PREFIXPAIR +}; + enum { MSG_RESERVED = 0x00, MSG_CONNECT = 0x10, -- cgit v1.2.3