aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/mqtt.c')
-rw-r--r--backends/mqtt.c296
1 files changed, 237 insertions, 59 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index 41fa772..36aed03 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -3,6 +3,7 @@
#include <string.h>
#include <time.h>
+#include <math.h>
#include "libmmbackend.h"
#include "mqtt.h"
@@ -46,8 +47,6 @@ static struct {
/*
* TODO
* * proper RETAIN handling
- * * mqtt v3.1.1 local filtering
- * * modifiable output mappings
* * TLS
* * JSON subchannels
*/
@@ -218,7 +217,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten
static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){
size_t length = 0;
*data = NULL;
-
+
if(buffer_length < 2){
return 0;
}
@@ -349,6 +348,92 @@ static int mqtt_reconnect(instance* inst){
return 0;
}
+static int mqtt_configure_channel(instance* inst, char* option, char* value){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ char* next_token = NULL;
+ channel* configure = NULL;
+ uint8_t mark = 0;
+ mqtt_channel_value config = {
+ 0
+ };
+
+ if(!strncmp(value, "range ", 6)){
+ //we support min > max for range configurations
+ value += 6;
+
+ config.min = strtod(value, &next_token);
+ if(value == next_token){
+ LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option);
+ return 1;
+ }
+
+ config.max = strtod(next_token, &value);
+ if(value == next_token){
+ LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option);
+ return 1;
+ }
+ }
+ else if(!strncmp(value, "discrete ", 9)){
+ value += 9;
+
+ for(; *value && isspace(*value); value++){
+ }
+ if(value[0] == '!'){
+ mark = 1;
+ value++;
+ }
+ config.min = clamp(strtod(value, &next_token), 1.0, 0.0);
+ value = next_token;
+
+ for(; *value && isspace(*value); value++){
+ }
+ if(value[0] == '!'){
+ mark = 2;
+ value++;
+ }
+
+ config.max = clamp(strtod(value, &next_token), 1.0, 0.0);
+ value = next_token;
+ if(config.max < config.min){
+ LOGPF("Discrete topic configuration for %s.%s has invalid limit ordering", inst->name, option);
+ return 1;
+ }
+
+ for(; *value && isspace(*value); value++){
+ }
+
+ config.discrete = strdup(value);
+ config.normal = mark ? ((mark == 1) ? config.min : config.max) : (config.min + (config.max - config.min) / 2);
+ }
+ else{
+ LOGPF("Unknown instance configuration option or invalid preconfiguration %s on instance %s", option, inst->name);
+ return 1;
+ }
+
+ configure = mqtt_channel(inst, option, 0);
+ if(!configure
+ //if configuring scale, no other config is possible
+ || (!config.discrete && data->channel[configure->ident].values)
+ //if configuring discrete, the previous one can't be a a scale
+ || (config.discrete && data->channel[configure->ident].values && !data->channel[configure->ident].value[0].discrete)){
+ LOGPF("Failed to configure topic %s.%s", inst->name, option);
+ free(config.discrete);
+ return 1;
+ }
+
+ data->channel[configure->ident].value = realloc(data->channel[configure->ident].value, (data->channel[configure->ident].values + 1) * sizeof(mqtt_channel_value));
+ if(!data->channel[configure->ident].value){
+ LOG("Failed to allocate memory");
+ return 1;
+ }
+
+ DBGPF("Configuring value on %s.%s: min %f max %f normal %f discrete %s", inst->name, option, config.min, config.max, config.normal, config.discrete ? config.discrete : "-");
+ data->channel[configure->ident].value[data->channel[configure->ident].values] = config;
+ data->channel[configure->ident].values++;
+ DBGPF("Value configuration for %s.%s now at %" PRIsize_t " entries", inst->name, option, data->channel[configure->ident].values);
+ return 0;
+}
+
static int mqtt_configure_instance(instance* inst, char* option, char* value){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
@@ -383,8 +468,8 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){
return 0;
}
- LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name);
- return 1;
+ //try to register as channel preconfig
+ return mqtt_configure_channel(inst, option, value);
}
static int mqtt_push_subscriptions(instance* inst){
@@ -470,12 +555,14 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){
data->channel[u].topic_alias_sent = 0;
data->channel[u].topic_alias_rcvd = 0;
data->channel[u].flags = flags;
+ data->channel[u].values = 0;
+ data->channel[u].value = NULL;
if(!data->channel[u].topic){
LOG("Failed to allocate memory");
return NULL;
}
-
+
DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags);
data->nchannels++;
}
@@ -483,15 +570,125 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){
return mm_channel(inst, u, 1);
}
+static int mqtt_maintenance(){
+ size_t n, u;
+ instance** inst = NULL;
+ mqtt_instance_data* data = NULL;
+
+ if(mm_backend_instances(BACKEND_NAME, &n, &inst)){
+ LOG("Failed to fetch instance list");
+ return 1;
+ }
+
+ DBGPF("Running maintenance operations on %" PRIsize_t " instances", n);
+ for(u = 0; u < n; u++){
+ data = (mqtt_instance_data*) inst[u]->impl;
+ if(data->fd <= 0){
+ if(mqtt_reconnect(inst[u]) >= 2){
+ LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name);
+ free(inst);
+ return 1;
+ }
+ }
+ else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){
+ //send keepalive ping requests
+ mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL);
+ }
+ }
+
+ free(inst);
+ return 0;
+}
+
+static int mqtt_deserialize(instance* inst, channel* output, mqtt_channel_data* input, char* buffer, size_t length){
+ char* next_token = NULL, conversion_buffer[1024] = {0};
+ channel_value val;
+ double range, raw;
+ size_t u;
+ //FIXME implement json subchannels
+
+ //unconfigured channel
+ if(!input->values){
+ //the original buffer is the result of an unterminated receive, move it over
+ memcpy(conversion_buffer, buffer, length);
+ val.normalised = clamp(strtod(conversion_buffer, &next_token), 1.0, 0.0);
+ if(conversion_buffer == next_token){
+ LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic);
+ return 1;
+ }
+ }
+ //ranged channel
+ else if(!input->value[0].discrete){
+ memcpy(conversion_buffer, buffer, length);
+ raw = clamp(strtod(conversion_buffer, &next_token), max(input->value[0].max, input->value[0].min), min(input->value[0].max, input->value[0].min));
+ if(conversion_buffer == next_token){
+ LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic);
+ return 1;
+ }
+ range = fabs(input->value[0].max - input->value[0].min);
+ val.normalised = (raw - input->value[0].min) / range;
+ if(input->value[0].max < input->value[0].min){
+ val.normalised = fabs(val.normalised);
+ }
+ }
+ else{
+ for(u = 0; u < input->values; u++){
+ if(length == strlen(input->value[u].discrete)
+ && !strncmp(input->value[u].discrete, buffer, length)){
+ val.normalised = input->value[u].normal;
+ break;
+ }
+ }
+
+ if(u == input->values){
+ LOGPF("Failed to parse incoming data for %s.%s, no matching discrete token", inst->name, input->topic);
+ return 1;
+ }
+ }
+
+ val.normalised = clamp(val.normalised, 1.0, 0.0);
+ mm_channel_event(output, val);
+ return 0;
+}
+
+static size_t mqtt_serialize(instance* inst, mqtt_channel_data* input, char* output, size_t length, double value){
+ double range;
+ size_t u, invert = 0;
+
+ //unconfigured channel
+ if(!input->values){
+ return snprintf(output, length, "%f", value);
+ }
+ //ranged channel
+ else if(!input->value[0].discrete){
+ range = fabs(input->value[0].max - input->value[0].min);
+ if(input->value[0].max < input->value[0].min){
+ invert = 1;
+ }
+ return snprintf(output, length, "%f", (value * range) * (invert ? -1 : 1) + input->value[0].min);
+ }
+ else{
+ for(u = 0; u < input->values; u++){
+ if(input->value[u].min <= value
+ && input->value[u].max >= value){
+ memcpy(output, input->value[u].discrete, min(strlen(input->value[u].discrete), length));
+ return min(strlen(input->value[u].discrete), length);
+ }
+ }
+ }
+
+ LOGPF("No discrete value on %s.%s defined for normalized value %f", inst->name, input->topic, value);
+ return 0;
+}
+
static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
uint8_t variable_header[MQTT_BUFFER_LENGTH];
- uint8_t payload[MQTT_BUFFER_LENGTH];
- size_t vh_length = 0, payload_length = 0;
- size_t u;
+ uint8_t payload[MQTT_BUFFER_LENGTH], alias_assigned = 0;
+ size_t vh_length = 0, payload_length = 0, u;
for(u = 0; u < num; u++){
- vh_length = payload_length = 0;
+ vh_length = payload_length = alias_assigned = 0;
if(data->mqtt_version == 0x05){
if(data->channel[c[u]->ident].topic_alias_sent){
@@ -506,6 +703,8 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
if(data->current_alias <= data->server_max_alias){
data->channel[c[u]->ident].topic_alias_sent = data->current_alias++;
DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic);
+
+ alias_assigned = 1;
}
}
@@ -523,10 +722,12 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF;
}
- payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised);
- payload[0] = (payload_length >> 8) & 0xFF;
- payload[1] = payload_length & 0xFF;
- payload_length += 2;
+ payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) (payload + 2), sizeof(payload) - 2, v[u].normalised);
+ if(payload_length){
+ payload[0] = (payload_length >> 8) & 0xFF;
+ payload[1] = payload_length & 0xFF;
+ payload_length += 2;
+ }
}
else{
//push topic
@@ -539,50 +740,27 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
variable_header[vh_length++] = 0x01;
variable_header[vh_length++] = 1;
}
- payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised);
+ payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) payload, sizeof(payload), v[u].normalised);
}
- mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload);
- }
-
- return 0;
-}
-
-static int mqtt_maintenance(){
- size_t n, u;
- instance** inst = NULL;
- mqtt_instance_data* data = NULL;
-
- if(mm_backend_instances(BACKEND_NAME, &n, &inst)){
- LOG("Failed to fetch instance list");
- return 1;
- }
-
- DBGPF("Running maintenance operations on %" PRIsize_t " instances", n);
- for(u = 0; u < n; u++){
- data = (mqtt_instance_data*) inst[u]->impl;
- if(data->fd <= 0){
- if(mqtt_reconnect(inst[u]) >= 2){
- LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name);
- free(inst);
- return 1;
- }
+ if(payload_length){
+ DBGPF("Transmitting %" PRIsize_t " bytes for %s", payload_length, inst->name);
+ mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload);
}
- else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){
- //send keepalive ping requests
- mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL);
+ else if(alias_assigned){
+ //undo alias assignment
+ data->channel[c[u]->ident].topic_alias_sent = 0;
+ data->current_alias--;
}
}
- free(inst);
return 0;
}
static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
- char* topic = NULL, *payload = NULL, *conversion_end = NULL;
+ char* topic = NULL, *payload = NULL;
channel* changed = NULL;
- channel_value val;
uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0;
uint16_t topic_alias = 0;
uint32_t property_length = 0;
@@ -643,16 +821,9 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h
if(u != data->nchannels && payload_length && payload){
DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length);
- //FIXME implement json subchannels
- //FIXME implement input mappings
changed = mm_channel(inst, u, 0);
if(changed){
- val.normalised = clamp(strtod(payload, &conversion_end), 1.0, 0.0);
- if(payload == conversion_end){
- LOGPF("Failed to parse incoming data for %s.%s", inst->name, data->channel[u].topic);
- return 0;
- }
- mm_channel_event(changed, val);
+ mqtt_deserialize(inst, changed, data->channel + u, payload, payload_length);
}
}
return 0;
@@ -679,7 +850,7 @@ static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_h
property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL);
while(property_offset < length){
DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length);
-
+
//read maximum topic alias
if(variable_header[property_offset] == 0x22){
data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2];
@@ -736,9 +907,8 @@ static int mqtt_handle_fd(instance* inst){
DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read);
data->receive_offset += bytes_read;
- //TODO loop this while at least one unhandled message is in the buffer
- //check for complete message
- if(data->receive_offset >= 2){
+ while(data->receive_offset >= 2){
+ //check for complete message
header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length);
if(header_length && data->receive_offset >= message_length + header_length + 1){
DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]);
@@ -752,6 +922,9 @@ static int mqtt_handle_fd(instance* inst){
}
data->receive_offset -= message_length + header_length + 1;
}
+ else{
+ break;
+ }
}
return 0;
@@ -802,7 +975,7 @@ static int mqtt_start(size_t n, instance** inst){
}
static int mqtt_shutdown(size_t n, instance** inst){
- size_t u, p;
+ size_t u, p, v;
mqtt_instance_data* data = NULL;
for(u = 0; u < n; u++){
@@ -810,6 +983,10 @@ static int mqtt_shutdown(size_t n, instance** inst){
mqtt_disconnect(inst[u]);
for(p = 0; p < data->nchannels; p++){
+ for(v = 0; v < data->channel[p].values; v++){
+ free(data->channel[p].value[v].discrete);
+ }
+ free(data->channel[p].value);
free(data->channel[p].topic);
}
free(data->channel);
@@ -817,6 +994,7 @@ static int mqtt_shutdown(size_t n, instance** inst){
free(data->port);
free(data->user);
free(data->password);
+ free(data->client_id);
free(inst[u]->impl);
inst[u]->impl = NULL;