From 983313fe782047a5fe84314fafa2eb0b56e0e3a1 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 6 Feb 2021 22:51:54 +0100 Subject: Implement MQTT output --- backends/mqtt.c | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- backends/mqtt.h | 3 ++- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/backends/mqtt.c b/backends/mqtt.c index d3e516f..00045c0 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -9,6 +9,14 @@ static uint64_t last_maintenance = 0; +/* + * TODO + * * proper RETAIN handling + * * use topic aliases if possible + * * mqtt v3.1.1 local filtering + * * modifiable output mappings + */ + MM_PLUGIN_API int init(){ backend mqtt = { .name = BACKEND_NAME, @@ -267,13 +275,20 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ return mqtt_generate_instanceid(inst); } } + else if(!strcmp(option, "protocol")){ + data->mqtt_version = MQTT_VERSION_DEFAULT; + if(!strcmp(value, "3.1.1")){ + data->mqtt_version = 4; + } + return 0; + } LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); return 1; } static int mqtt_push_subscriptions(instance* inst){ - mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; uint8_t variable_header[3] = {0}; uint8_t payload[MQTT_BUFFER_LENGTH]; size_t u, subs = 0, payload_offset = 0; @@ -282,6 +297,7 @@ static int mqtt_push_subscriptions(instance* inst){ for(u = 0; u < data->nchannels; u++){ payload_offset = 0; if(data->channel[u].flags & mmchannel_input){ + DBGPF("Subscribing %s.%s, channel %" PRIsize_t ", flags %d", inst->name, data->channel[u].topic, u, data->channel[u].flags); variable_header[0] = (data->packet_identifier >> 8) & 0xFF; variable_header[1] = (data->packet_identifier) & 0xFF; @@ -330,6 +346,7 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ for(u = 0; u < data->nchannels; u++){ if(!strcmp(spec, data->channel[u].topic)){ data->channel[u].flags |= flags; + DBGPF("Reusing existing channel %" PRIsize_t " for spec %s.%s, flags are now %02X", u, inst->name, spec, data->channel[u].flags); break; } } @@ -343,13 +360,16 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ } data->channel[u].topic = strdup(spec); - data->channel[u].topic_alias = 0; + data->channel[u].topic_alias_sent = 0; + data->channel[u].topic_alias_rcvd = 0; data->channel[u].flags = flags; if(!data->channel[u].topic){ LOG("Failed to allocate memory"); return NULL; } + + DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags); data->nchannels++; } @@ -357,7 +377,52 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ } static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ - //TODO mqtt v3.1.1 local filtering + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + uint8_t variable_header[MQTT_BUFFER_LENGTH]; + uint8_t payload[MQTT_BUFFER_LENGTH]; + size_t vh_length = 0, payload_length = 0; + size_t u; + + for(u = 0; u < num; u++){ + vh_length = payload_length = 0; + + if(data->mqtt_version == 0x05 && data->channel[c[u]->ident].topic_alias_sent){ + //push zero-length topic + variable_header[vh_length++] = 0; + variable_header[vh_length++] = 0; + + //push property length + variable_header[vh_length++] = 5; + + //push payload type (0x01) + variable_header[vh_length++] = 0x01; + variable_header[vh_length++] = 1; + + //push topic alias (0x23) + variable_header[vh_length++] = 0x23; + variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; + variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; + } + else{ + //push topic + vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic); + if(data->mqtt_version == 0x05){ + //push property length + variable_header[vh_length++] = 2; + + //push payload type (0x01) + variable_header[vh_length++] = 0x01; + variable_header[vh_length++] = 1; + } + } + payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); + //payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); + //payload[0] = (payload_length >> 8) & 0xFF; + //payload[1] = payload_length & 0xFF; + //payload_length += 2; + mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); + } + return 0; } @@ -412,7 +477,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h } break; case MSG_PINGRESP: - //ignore ping responses + case MSG_SUBACK: + //ignore most responses + //FIXME error check SUBACK break; default: LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); diff --git a/backends/mqtt.h b/backends/mqtt.h index c9bce81..6483364 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -40,7 +40,8 @@ enum { //qos, subscribe typedef struct /*_mqtt_channel*/ { char* topic; - uint16_t topic_alias; + uint16_t topic_alias_sent; + uint16_t topic_alias_rcvd; uint8_t flags; } mqtt_channel_data; -- cgit v1.2.3