aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/mqtt.c')
-rw-r--r--backends/mqtt.c197
1 files changed, 153 insertions, 44 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index 8bff531..29f0436 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -46,7 +46,6 @@ static struct {
/*
* TODO
* * proper RETAIN handling
- * * use topic aliases if possible
* * mqtt v3.1.1 local filtering
* * modifiable output mappings
* * TLS
@@ -138,10 +137,52 @@ static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){
offset++;
} while(buffer[offset - 1] & 0x80);
- *result = value;
+ if(result){
+ *result = value;
+ }
return offset;
}
+static size_t mqtt_pop_property(uint8_t* buffer, size_t bytes){
+ size_t length = 0, u;
+
+ if(bytes){
+ for(u = 0; u < sizeof(property_lengths)/sizeof(property_lengths[0]); u++){
+ if(property_lengths[u].property == buffer[0]){
+ switch(property_lengths[u].storage){
+ case STORAGE_U8:
+ return 2;
+ case STORAGE_U16:
+ return 3;
+ case STORAGE_U32:
+ return 5;
+ case STORAGE_VARINT:
+ return mqtt_pop_varint(buffer + 1, bytes - 1, NULL) + 1;
+ case STORAGE_PREFIXED:
+ if(bytes >= 3){
+ return ((buffer[1] << 8) | buffer[2]) + 1;
+ }
+ //best-effort guess
+ return 3;
+ case STORAGE_PREFIXPAIR:
+ if(bytes >= 3){
+ length = ((buffer[1] << 8) | buffer[2]);
+ if(bytes >= length + 5){
+ return (1 + 2 + length + 2 + ((buffer[length + 3] << 8) | buffer[length + 4]));
+ }
+ return length + 3;
+ }
+ //best-effort guess
+ return 5;
+ }
+ }
+ }
+ }
+
+ LOGPF("Storage class for property %02X was unknown", buffer[0]);
+ return 1;
+}
+
static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){
//implementation conforming to spec 1.5.5
size_t offset = 0;
@@ -191,8 +232,18 @@ static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){
static void mqtt_disconnect(instance* inst){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ size_t u;
+
data->last_control = 0;
+ //reset aliases as they can not be reused across sessions
+ data->server_max_alias = 0;
+ data->current_alias = 1;
+ for(u = 0; u < data->nchannels; u++){
+ data->channel[u].topic_alias_sent = 0;
+ data->channel[u].topic_alias_rcvd = 0;
+ }
+
//unmanage the fd
mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL);
@@ -241,11 +292,12 @@ static int mqtt_reconnect(instance* inst){
mqtt_disconnect(inst);
}
- LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s)",
+ LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s, Protocol: %s)",
inst->name, data->host,
data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT),
data->tls ? "yes " : "no",
- (data->user || data->password) ? "yes" : "no");
+ (data->user || data->password) ? "yes" : "no",
+ (data->mqtt_version == 0x05) ? "v5" : "v3.1.1");
data->fd = mmbackend_socket(data->host,
data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT),
@@ -260,9 +312,6 @@ static int mqtt_reconnect(instance* inst){
variable_header[6] = data->mqtt_version;
variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00);
- //TODO set session expiry interval option
- //TODO re-use previos session on reconnect
-
if(data->mqtt_version == 0x05){ //mqtt v5 has additional options
//push number of option bytes (as a varint, no less) before actually pushing the option data.
//obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart..
@@ -381,6 +430,7 @@ static int mqtt_instance(instance* inst){
data->fd = -1;
data->mqtt_version = MQTT_VERSION_DEFAULT;
data->packet_identifier = 1;
+ data->current_alias = 1;
inst->impl = data;
if(mqtt_generate_instanceid(inst)){
@@ -443,22 +493,40 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
for(u = 0; u < num; u++){
vh_length = payload_length = 0;
- if(data->mqtt_version == 0x05 && data->channel[c[u]->ident].topic_alias_sent){
- //push zero-length topic
- variable_header[vh_length++] = 0;
- variable_header[vh_length++] = 0;
+ if(data->mqtt_version == 0x05){
+ if(data->channel[c[u]->ident].topic_alias_sent){
+ //push zero-length topic
+ variable_header[vh_length++] = 0;
+ variable_header[vh_length++] = 0;
+ }
+ else{
+ //push topic
+ vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic);
+ //generate topic alias if possible
+ if(data->current_alias <= data->server_max_alias){
+ data->channel[c[u]->ident].topic_alias_sent = data->current_alias++;
+ DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic);
+ }
+ }
//push property length
- variable_header[vh_length++] = 5;
+ variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent) ? 5 : 2;
//push payload type (0x01)
variable_header[vh_length++] = 0x01;
variable_header[vh_length++] = 1;
- //push topic alias (0x23)
- variable_header[vh_length++] = 0x23;
- variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF;
- variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF;
+ if(data->channel[c[u]->ident].topic_alias_sent){
+ //push topic alias (0x23)
+ variable_header[vh_length++] = 0x23;
+ variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF;
+ variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF;
+ }
+
+ payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised);
+ payload[0] = (payload_length >> 8) & 0xFF;
+ payload[1] = payload_length & 0xFF;
+ payload_length += 2;
}
else{
//push topic
@@ -471,12 +539,9 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
variable_header[vh_length++] = 0x01;
variable_header[vh_length++] = 1;
}
+ payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised);
}
- payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised);
- //payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised);
- //payload[0] = (payload_length >> 8) & 0xFF;
- //payload[1] = payload_length & 0xFF;
- //payload_length += 2;
+
mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload);
}
@@ -519,6 +584,7 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h
channel* changed = NULL;
channel_value val;
uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0;
+ uint16_t topic_alias = 0;
uint32_t property_length = 0;
size_t u = data->nchannels, property_offset, payload_offset, payload_length;
size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic);
@@ -529,23 +595,42 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h
payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length);
payload_offset += property_length;
- //TODO parse properties
- //find topic alias
- //find type code
+ property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL);
+ //parse properties
+ while(property_offset < payload_offset){
+ DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIu32, variable_header[property_offset], property_offset, property_length);
+ //read payload format indicator
+ if(variable_header[property_offset] == 0x01){
+ content_utf8 = variable_header[property_offset + 1];
+ }
+ //read topic alias
+ else if(variable_header[property_offset] == 0x23){
+ topic_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2];
+ }
+
+ property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset);
+ }
}
//match via topic alias
- if(topic_length == 0){
- //TODO match topic aliases
- //TODO build topic alias database
+ if(!topic_length && topic_alias){
+ for(u = 0; u < data->nchannels; u++){
+ if(data->channel[u].topic_alias_rcvd == topic_alias){
+ break;
+ }
+ }
}
//match via topic
- else{
+ else if(topic_length){
for(u = 0; u < data->nchannels; u++){
if(!strncmp(data->channel[u].topic, topic, topic_length)){
break;
}
}
+
+ if(topic_alias){
+ data->channel[u].topic_alias_rcvd = topic_alias;
+ }
}
if(content_utf8){
@@ -569,26 +654,50 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h
return 0;
}
-static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
+static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ size_t property_offset = 2;
+
+ if(length >= 2){
+ if(variable_header[1]){
+ if(variable_header[1] == 1 && data->mqtt_version == 0x05){
+ LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name);
+ data->mqtt_version = 0x04;
+ return 0;
+ }
+ LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]);
+ mqtt_disconnect(inst);
+ return 0;
+ }
- switch(type){
- case MSG_CONNACK:
- if(length >= 2){
- if(variable_header[1]){
- if(variable_header[1] == 1 && data->mqtt_version == 0x05){
- LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name);
- data->mqtt_version = 0x04;
- return 0;
- }
- LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]);
- }
- else{
- LOGPF("Connection on %s established", inst->name);
- return mqtt_push_subscriptions(inst);
+ //parse response properties if present
+ if(data->mqtt_version == 0x05){
+ property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL);
+ while(property_offset < length){
+ DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length);
+
+ //read maximum topic alias
+ if(variable_header[property_offset] == 0x22){
+ data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2];
+ DBGPF("Connection supports maximum connection alias %" PRIu16, data->server_max_alias);
}
+
+ property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset);
}
- break;
+ }
+
+ LOGPF("Connection on %s established", inst->name);
+ return mqtt_push_subscriptions(inst);
+ }
+
+ LOGPF("Received malformed CONNACK on %s", inst->name);
+ return 1;
+}
+
+static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
+ switch(type){
+ case MSG_CONNACK:
+ return mqtt_handle_connack(inst, type, variable_header, length);
case MSG_PINGRESP:
case MSG_SUBACK:
//ignore most responses