diff options
Diffstat (limited to 'backends/mqtt.c')
-rw-r--r-- | backends/mqtt.c | 296 |
1 files changed, 237 insertions, 59 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c index 41fa772..36aed03 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -3,6 +3,7 @@ #include <string.h> #include <time.h> +#include <math.h> #include "libmmbackend.h" #include "mqtt.h" @@ -46,8 +47,6 @@ static struct { /* * TODO * * proper RETAIN handling - * * mqtt v3.1.1 local filtering - * * modifiable output mappings * * TLS * * JSON subchannels */ @@ -218,7 +217,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ size_t length = 0; *data = NULL; - + if(buffer_length < 2){ return 0; } @@ -349,6 +348,92 @@ static int mqtt_reconnect(instance* inst){ return 0; } +static int mqtt_configure_channel(instance* inst, char* option, char* value){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char* next_token = NULL; + channel* configure = NULL; + uint8_t mark = 0; + mqtt_channel_value config = { + 0 + }; + + if(!strncmp(value, "range ", 6)){ + //we support min > max for range configurations + value += 6; + + config.min = strtod(value, &next_token); + if(value == next_token){ + LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); + return 1; + } + + config.max = strtod(next_token, &value); + if(value == next_token){ + LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); + return 1; + } + } + else if(!strncmp(value, "discrete ", 9)){ + value += 9; + + for(; *value && isspace(*value); value++){ + } + if(value[0] == '!'){ + mark = 1; + value++; + } + config.min = clamp(strtod(value, &next_token), 1.0, 0.0); + value = next_token; + + for(; *value && isspace(*value); value++){ + } + if(value[0] == '!'){ + mark = 2; + value++; + } + + config.max = clamp(strtod(value, &next_token), 1.0, 0.0); + value = next_token; + if(config.max < config.min){ + LOGPF("Discrete topic configuration for %s.%s has invalid limit ordering", inst->name, option); + return 1; + } + + for(; *value && isspace(*value); value++){ + } + + config.discrete = strdup(value); + config.normal = mark ? ((mark == 1) ? config.min : config.max) : (config.min + (config.max - config.min) / 2); + } + else{ + LOGPF("Unknown instance configuration option or invalid preconfiguration %s on instance %s", option, inst->name); + return 1; + } + + configure = mqtt_channel(inst, option, 0); + if(!configure + //if configuring scale, no other config is possible + || (!config.discrete && data->channel[configure->ident].values) + //if configuring discrete, the previous one can't be a a scale + || (config.discrete && data->channel[configure->ident].values && !data->channel[configure->ident].value[0].discrete)){ + LOGPF("Failed to configure topic %s.%s", inst->name, option); + free(config.discrete); + return 1; + } + + data->channel[configure->ident].value = realloc(data->channel[configure->ident].value, (data->channel[configure->ident].values + 1) * sizeof(mqtt_channel_value)); + if(!data->channel[configure->ident].value){ + LOG("Failed to allocate memory"); + return 1; + } + + DBGPF("Configuring value on %s.%s: min %f max %f normal %f discrete %s", inst->name, option, config.min, config.max, config.normal, config.discrete ? config.discrete : "-"); + data->channel[configure->ident].value[data->channel[configure->ident].values] = config; + data->channel[configure->ident].values++; + DBGPF("Value configuration for %s.%s now at %" PRIsize_t " entries", inst->name, option, data->channel[configure->ident].values); + return 0; +} + static int mqtt_configure_instance(instance* inst, char* option, char* value){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -383,8 +468,8 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ return 0; } - LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); - return 1; + //try to register as channel preconfig + return mqtt_configure_channel(inst, option, value); } static int mqtt_push_subscriptions(instance* inst){ @@ -470,12 +555,14 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ data->channel[u].topic_alias_sent = 0; data->channel[u].topic_alias_rcvd = 0; data->channel[u].flags = flags; + data->channel[u].values = 0; + data->channel[u].value = NULL; if(!data->channel[u].topic){ LOG("Failed to allocate memory"); return NULL; } - + DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags); data->nchannels++; } @@ -483,15 +570,125 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ return mm_channel(inst, u, 1); } +static int mqtt_maintenance(){ + size_t n, u; + instance** inst = NULL; + mqtt_instance_data* data = NULL; + + if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ + LOG("Failed to fetch instance list"); + return 1; + } + + DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); + for(u = 0; u < n; u++){ + data = (mqtt_instance_data*) inst[u]->impl; + if(data->fd <= 0){ + if(mqtt_reconnect(inst[u]) >= 2){ + LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); + free(inst); + return 1; + } + } + else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ + //send keepalive ping requests + mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); + } + } + + free(inst); + return 0; +} + +static int mqtt_deserialize(instance* inst, channel* output, mqtt_channel_data* input, char* buffer, size_t length){ + char* next_token = NULL, conversion_buffer[1024] = {0}; + channel_value val; + double range, raw; + size_t u; + //FIXME implement json subchannels + + //unconfigured channel + if(!input->values){ + //the original buffer is the result of an unterminated receive, move it over + memcpy(conversion_buffer, buffer, length); + val.normalised = clamp(strtod(conversion_buffer, &next_token), 1.0, 0.0); + if(conversion_buffer == next_token){ + LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); + return 1; + } + } + //ranged channel + else if(!input->value[0].discrete){ + memcpy(conversion_buffer, buffer, length); + raw = clamp(strtod(conversion_buffer, &next_token), max(input->value[0].max, input->value[0].min), min(input->value[0].max, input->value[0].min)); + if(conversion_buffer == next_token){ + LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); + return 1; + } + range = fabs(input->value[0].max - input->value[0].min); + val.normalised = (raw - input->value[0].min) / range; + if(input->value[0].max < input->value[0].min){ + val.normalised = fabs(val.normalised); + } + } + else{ + for(u = 0; u < input->values; u++){ + if(length == strlen(input->value[u].discrete) + && !strncmp(input->value[u].discrete, buffer, length)){ + val.normalised = input->value[u].normal; + break; + } + } + + if(u == input->values){ + LOGPF("Failed to parse incoming data for %s.%s, no matching discrete token", inst->name, input->topic); + return 1; + } + } + + val.normalised = clamp(val.normalised, 1.0, 0.0); + mm_channel_event(output, val); + return 0; +} + +static size_t mqtt_serialize(instance* inst, mqtt_channel_data* input, char* output, size_t length, double value){ + double range; + size_t u, invert = 0; + + //unconfigured channel + if(!input->values){ + return snprintf(output, length, "%f", value); + } + //ranged channel + else if(!input->value[0].discrete){ + range = fabs(input->value[0].max - input->value[0].min); + if(input->value[0].max < input->value[0].min){ + invert = 1; + } + return snprintf(output, length, "%f", (value * range) * (invert ? -1 : 1) + input->value[0].min); + } + else{ + for(u = 0; u < input->values; u++){ + if(input->value[u].min <= value + && input->value[u].max >= value){ + memcpy(output, input->value[u].discrete, min(strlen(input->value[u].discrete), length)); + return min(strlen(input->value[u].discrete), length); + } + } + } + + LOGPF("No discrete value on %s.%s defined for normalized value %f", inst->name, input->topic, value); + return 0; +} + static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; uint8_t variable_header[MQTT_BUFFER_LENGTH]; - uint8_t payload[MQTT_BUFFER_LENGTH]; - size_t vh_length = 0, payload_length = 0; - size_t u; + uint8_t payload[MQTT_BUFFER_LENGTH], alias_assigned = 0; + size_t vh_length = 0, payload_length = 0, u; for(u = 0; u < num; u++){ - vh_length = payload_length = 0; + vh_length = payload_length = alias_assigned = 0; if(data->mqtt_version == 0x05){ if(data->channel[c[u]->ident].topic_alias_sent){ @@ -506,6 +703,8 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ if(data->current_alias <= data->server_max_alias){ data->channel[c[u]->ident].topic_alias_sent = data->current_alias++; DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic); + + alias_assigned = 1; } } @@ -523,10 +722,12 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; } - payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); - payload[0] = (payload_length >> 8) & 0xFF; - payload[1] = payload_length & 0xFF; - payload_length += 2; + payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) (payload + 2), sizeof(payload) - 2, v[u].normalised); + if(payload_length){ + payload[0] = (payload_length >> 8) & 0xFF; + payload[1] = payload_length & 0xFF; + payload_length += 2; + } } else{ //push topic @@ -539,50 +740,27 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ variable_header[vh_length++] = 0x01; variable_header[vh_length++] = 1; } - payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); + payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) payload, sizeof(payload), v[u].normalised); } - mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); - } - - return 0; -} - -static int mqtt_maintenance(){ - size_t n, u; - instance** inst = NULL; - mqtt_instance_data* data = NULL; - - if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ - LOG("Failed to fetch instance list"); - return 1; - } - - DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); - for(u = 0; u < n; u++){ - data = (mqtt_instance_data*) inst[u]->impl; - if(data->fd <= 0){ - if(mqtt_reconnect(inst[u]) >= 2){ - LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); - free(inst); - return 1; - } + if(payload_length){ + DBGPF("Transmitting %" PRIsize_t " bytes for %s", payload_length, inst->name); + mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); } - else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ - //send keepalive ping requests - mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); + else if(alias_assigned){ + //undo alias assignment + data->channel[c[u]->ident].topic_alias_sent = 0; + data->current_alias--; } } - free(inst); return 0; } static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; - char* topic = NULL, *payload = NULL, *conversion_end = NULL; + char* topic = NULL, *payload = NULL; channel* changed = NULL; - channel_value val; uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; uint16_t topic_alias = 0; uint32_t property_length = 0; @@ -643,16 +821,9 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h if(u != data->nchannels && payload_length && payload){ DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length); - //FIXME implement json subchannels - //FIXME implement input mappings changed = mm_channel(inst, u, 0); if(changed){ - val.normalised = clamp(strtod(payload, &conversion_end), 1.0, 0.0); - if(payload == conversion_end){ - LOGPF("Failed to parse incoming data for %s.%s", inst->name, data->channel[u].topic); - return 0; - } - mm_channel_event(changed, val); + mqtt_deserialize(inst, changed, data->channel + u, payload, payload_length); } } return 0; @@ -679,7 +850,7 @@ static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_h property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); while(property_offset < length){ DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length); - + //read maximum topic alias if(variable_header[property_offset] == 0x22){ data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; @@ -736,9 +907,8 @@ static int mqtt_handle_fd(instance* inst){ DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); data->receive_offset += bytes_read; - //TODO loop this while at least one unhandled message is in the buffer - //check for complete message - if(data->receive_offset >= 2){ + while(data->receive_offset >= 2){ + //check for complete message header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length); if(header_length && data->receive_offset >= message_length + header_length + 1){ DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]); @@ -752,6 +922,9 @@ static int mqtt_handle_fd(instance* inst){ } data->receive_offset -= message_length + header_length + 1; } + else{ + break; + } } return 0; @@ -802,7 +975,7 @@ static int mqtt_start(size_t n, instance** inst){ } static int mqtt_shutdown(size_t n, instance** inst){ - size_t u, p; + size_t u, p, v; mqtt_instance_data* data = NULL; for(u = 0; u < n; u++){ @@ -810,6 +983,10 @@ static int mqtt_shutdown(size_t n, instance** inst){ mqtt_disconnect(inst[u]); for(p = 0; p < data->nchannels; p++){ + for(v = 0; v < data->channel[p].values; v++){ + free(data->channel[p].value[v].discrete); + } + free(data->channel[p].value); free(data->channel[p].topic); } free(data->channel); @@ -817,6 +994,7 @@ static int mqtt_shutdown(size_t n, instance** inst){ free(data->port); free(data->user); free(data->password); + free(data->client_id); free(inst[u]->impl); inst[u]->impl = NULL; |