aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/mqtt.c')
-rw-r--r--backends/mqtt.c118
1 files changed, 117 insertions, 1 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index 00045c0..8bff531 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -8,6 +8,40 @@
#include "mqtt.h"
static uint64_t last_maintenance = 0;
+/* according to spec 2.2.2.2 */
+static struct {
+ uint8_t property;
+ uint8_t storage;
+} property_lengths[] = {
+ {0x01, STORAGE_U8},
+ {0x02, STORAGE_U32},
+ {0x03, STORAGE_PREFIXED},
+ {0x08, STORAGE_PREFIXED},
+ {0x09, STORAGE_PREFIXED},
+ {0x0B, STORAGE_VARINT},
+ {0x11, STORAGE_U32},
+
+ {0x12, STORAGE_PREFIXED},
+ {0x13, STORAGE_U16},
+ {0x15, STORAGE_PREFIXED},
+ {0x16, STORAGE_PREFIXED},
+ {0x17, STORAGE_U8},
+ {0x18, STORAGE_U32},
+ {0x19, STORAGE_U8},
+ {0x1A, STORAGE_PREFIXED},
+ {0x1C, STORAGE_PREFIXED},
+ {0x1F, STORAGE_PREFIXED},
+ {0x21, STORAGE_U16},
+ {0x22, STORAGE_U16},
+ {0x23, STORAGE_U16},
+ {0x24, STORAGE_U8},
+ {0x25, STORAGE_U8},
+ {0x26, STORAGE_PREFIXPAIR},
+ {0x27, STORAGE_U32},
+ {0x28, STORAGE_U8},
+ {0x29, STORAGE_U8},
+ {0x2A, STORAGE_U8}
+};
/*
* TODO
@@ -15,6 +49,8 @@ static uint64_t last_maintenance = 0;
* * use topic aliases if possible
* * mqtt v3.1.1 local filtering
* * modifiable output mappings
+ * * TLS
+ * * JSON subchannels
*/
MM_PLUGIN_API int init(){
@@ -91,7 +127,7 @@ static int mqtt_generate_instanceid(instance* inst){
return mmbackend_strdup(&(data->client_id), clientid);
}
-static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){
+static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){
size_t value = 0, offset = 0;
do {
if(offset >= len){
@@ -138,6 +174,21 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten
return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content));
}
+static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){
+ size_t length = 0;
+ *data = NULL;
+
+ if(buffer_length < 2){
+ return 0;
+ }
+
+ length = (buffer[0] << 8) | buffer[1];
+ if(buffer_length >= length + 2){
+ *data = (char*) buffer + 2;
+ }
+ return length;
+}
+
static void mqtt_disconnect(instance* inst){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
data->last_control = 0;
@@ -305,6 +356,11 @@ static int mqtt_push_subscriptions(instance* inst){
payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0;
data->packet_identifier++;
+ //zero is not a valid packet identifier
+ if(!data->packet_identifier){
+ data->packet_identifier++;
+ }
+
mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload);
subs++;
}
@@ -324,6 +380,7 @@ static int mqtt_instance(instance* inst){
data->fd = -1;
data->mqtt_version = MQTT_VERSION_DEFAULT;
+ data->packet_identifier = 1;
inst->impl = data;
if(mqtt_generate_instanceid(inst)){
@@ -456,6 +513,62 @@ static int mqtt_maintenance(){
return 0;
}
+static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ char* topic = NULL, *payload = NULL;
+ channel* changed = NULL;
+ channel_value val;
+ uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0;
+ uint32_t property_length = 0;
+ size_t u = data->nchannels, property_offset, payload_offset, payload_length;
+ size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic);
+
+ property_offset = payload_offset = topic_length + 2 + ((qos > 0) ? 2 : 0);
+ if(data->mqtt_version == 0x05){
+ //read properties length
+ payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length);
+ payload_offset += property_length;
+
+ //TODO parse properties
+ //find topic alias
+ //find type code
+ }
+
+ //match via topic alias
+ if(topic_length == 0){
+ //TODO match topic aliases
+ //TODO build topic alias database
+ }
+ //match via topic
+ else{
+ for(u = 0; u < data->nchannels; u++){
+ if(!strncmp(data->channel[u].topic, topic, topic_length)){
+ break;
+ }
+ }
+ }
+
+ if(content_utf8){
+ payload_length = mqtt_pop_utf8(variable_header + payload_offset, length - payload_offset, &payload);
+ }
+ else{
+ payload_length = length - payload_offset;
+ payload = (char*) (variable_header + payload_offset);
+ }
+
+ if(u != data->nchannels && payload_length && payload){
+ DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length);
+ //FIXME implement json subchannels
+ //FIXME implement input mappings
+ changed = mm_channel(inst, u, 0);
+ if(changed){
+ val.normalised = strtod(payload, NULL);
+ mm_channel_event(changed, val);
+ }
+ }
+ return 0;
+}
+
static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
@@ -482,6 +595,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h
//FIXME error check SUBACK
break;
default:
+ if((type & 0xF0) == MSG_PUBLISH){
+ return mqtt_handle_publish(inst, type, variable_header, length);
+ }
LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name);
}
return 0;