aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--backends/mqtt.c75
-rw-r--r--backends/mqtt.h3
2 files changed, 73 insertions, 5 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index d3e516f..00045c0 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -9,6 +9,14 @@
static uint64_t last_maintenance = 0;
+/*
+ * TODO
+ * * proper RETAIN handling
+ * * use topic aliases if possible
+ * * mqtt v3.1.1 local filtering
+ * * modifiable output mappings
+ */
+
MM_PLUGIN_API int init(){
backend mqtt = {
.name = BACKEND_NAME,
@@ -267,13 +275,20 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){
return mqtt_generate_instanceid(inst);
}
}
+ else if(!strcmp(option, "protocol")){
+ data->mqtt_version = MQTT_VERSION_DEFAULT;
+ if(!strcmp(value, "3.1.1")){
+ data->mqtt_version = 4;
+ }
+ return 0;
+ }
LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name);
return 1;
}
static int mqtt_push_subscriptions(instance* inst){
- mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data));
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
uint8_t variable_header[3] = {0};
uint8_t payload[MQTT_BUFFER_LENGTH];
size_t u, subs = 0, payload_offset = 0;
@@ -282,6 +297,7 @@ static int mqtt_push_subscriptions(instance* inst){
for(u = 0; u < data->nchannels; u++){
payload_offset = 0;
if(data->channel[u].flags & mmchannel_input){
+ DBGPF("Subscribing %s.%s, channel %" PRIsize_t ", flags %d", inst->name, data->channel[u].topic, u, data->channel[u].flags);
variable_header[0] = (data->packet_identifier >> 8) & 0xFF;
variable_header[1] = (data->packet_identifier) & 0xFF;
@@ -330,6 +346,7 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){
for(u = 0; u < data->nchannels; u++){
if(!strcmp(spec, data->channel[u].topic)){
data->channel[u].flags |= flags;
+ DBGPF("Reusing existing channel %" PRIsize_t " for spec %s.%s, flags are now %02X", u, inst->name, spec, data->channel[u].flags);
break;
}
}
@@ -343,13 +360,16 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){
}
data->channel[u].topic = strdup(spec);
- data->channel[u].topic_alias = 0;
+ data->channel[u].topic_alias_sent = 0;
+ data->channel[u].topic_alias_rcvd = 0;
data->channel[u].flags = flags;
if(!data->channel[u].topic){
LOG("Failed to allocate memory");
return NULL;
}
+
+ DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags);
data->nchannels++;
}
@@ -357,7 +377,52 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){
}
static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
- //TODO mqtt v3.1.1 local filtering
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ uint8_t variable_header[MQTT_BUFFER_LENGTH];
+ uint8_t payload[MQTT_BUFFER_LENGTH];
+ size_t vh_length = 0, payload_length = 0;
+ size_t u;
+
+ for(u = 0; u < num; u++){
+ vh_length = payload_length = 0;
+
+ if(data->mqtt_version == 0x05 && data->channel[c[u]->ident].topic_alias_sent){
+ //push zero-length topic
+ variable_header[vh_length++] = 0;
+ variable_header[vh_length++] = 0;
+
+ //push property length
+ variable_header[vh_length++] = 5;
+
+ //push payload type (0x01)
+ variable_header[vh_length++] = 0x01;
+ variable_header[vh_length++] = 1;
+
+ //push topic alias (0x23)
+ variable_header[vh_length++] = 0x23;
+ variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF;
+ variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF;
+ }
+ else{
+ //push topic
+ vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic);
+ if(data->mqtt_version == 0x05){
+ //push property length
+ variable_header[vh_length++] = 2;
+
+ //push payload type (0x01)
+ variable_header[vh_length++] = 0x01;
+ variable_header[vh_length++] = 1;
+ }
+ }
+ payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised);
+ //payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised);
+ //payload[0] = (payload_length >> 8) & 0xFF;
+ //payload[1] = payload_length & 0xFF;
+ //payload_length += 2;
+ mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload);
+ }
+
return 0;
}
@@ -412,7 +477,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h
}
break;
case MSG_PINGRESP:
- //ignore ping responses
+ case MSG_SUBACK:
+ //ignore most responses
+ //FIXME error check SUBACK
break;
default:
LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name);
diff --git a/backends/mqtt.h b/backends/mqtt.h
index c9bce81..6483364 100644
--- a/backends/mqtt.h
+++ b/backends/mqtt.h
@@ -40,7 +40,8 @@ enum {
//qos, subscribe
typedef struct /*_mqtt_channel*/ {
char* topic;
- uint16_t topic_alias;
+ uint16_t topic_alias_sent;
+ uint16_t topic_alias_rcvd;
uint8_t flags;
} mqtt_channel_data;