From 6334a78d0c475ebfa76a739577a561bded135086 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sun, 10 Jan 2021 09:42:54 +0100 Subject: Subscribe to input channels --- backends/mqtt.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++------- backends/mqtt.h | 17 +++++++++---- 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/backends/mqtt.c b/backends/mqtt.c index e9493a9..d3e516f 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -9,9 +9,6 @@ static uint64_t last_maintenance = 0; -//TODO -// * Periodic connection retries - MM_PLUGIN_API int init(){ backend mqtt = { .name = BACKEND_NAME, @@ -80,7 +77,7 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){ static int mqtt_generate_instanceid(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; - char clientid[23] = ""; + char clientid[24] = ""; snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name); return mmbackend_strdup(&(data->client_id), clientid); @@ -275,6 +272,32 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ return 1; } +static int mqtt_push_subscriptions(instance* inst){ + mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); + uint8_t variable_header[3] = {0}; + uint8_t payload[MQTT_BUFFER_LENGTH]; + size_t u, subs = 0, payload_offset = 0; + + //FIXME might want to aggregate multiple subscribes into one packet + for(u = 0; u < data->nchannels; u++){ + payload_offset = 0; + if(data->channel[u].flags & mmchannel_input){ + variable_header[0] = (data->packet_identifier >> 8) & 0xFF; + variable_header[1] = (data->packet_identifier) & 0xFF; + + payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->channel[u].topic); + payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0; + + data->packet_identifier++; + mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload); + subs++; + } + } + + LOGPF("Subscribed %" PRIsize_t " channels on %s", subs, inst->name); + return 0; +} + static int mqtt_instance(instance* inst){ mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); @@ -294,12 +317,47 @@ static int mqtt_instance(instance* inst){ } static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ - //TODO - return NULL; + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + size_t u; + + //check spec for compliance + if(strchr(spec, '+') || strchr(spec, '#')){ + LOGPF("Invalid character in channel specification %s", spec); + return NULL; + } + + //find matching channel + for(u = 0; u < data->nchannels; u++){ + if(!strcmp(spec, data->channel[u].topic)){ + data->channel[u].flags |= flags; + break; + } + } + + //allocate new channel + if(u == data->nchannels){ + data->channel = realloc(data->channel, (data->nchannels + 1) * sizeof(mqtt_channel_data)); + if(!data->channel){ + LOG("Failed to allocate memory"); + return NULL; + } + + data->channel[u].topic = strdup(spec); + data->channel[u].topic_alias = 0; + data->channel[u].flags = flags; + + if(!data->channel[u].topic){ + LOG("Failed to allocate memory"); + return NULL; + } + data->nchannels++; + } + + return mm_channel(inst, u, 1); } static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ - //TODO + //TODO mqtt v3.1.1 local filtering return 0; } @@ -349,6 +407,7 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h } else{ LOGPF("Connection on %s established", inst->name); + return mqtt_push_subscriptions(inst); } } break; @@ -424,7 +483,7 @@ static int mqtt_handle(size_t num, managed_fd* fds){ } static int mqtt_start(size_t n, instance** inst){ - size_t u = 0; + size_t u = 0, fds = 0; for(u = 0; u < n; u++){ switch(mqtt_reconnect(inst[u])){ @@ -435,9 +494,11 @@ static int mqtt_start(size_t n, instance** inst){ LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name); return 1; default: + fds++; break; } } + LOGPF("Registered %" PRIsize_t " descriptors to core", fds); //initialize maintenance timer last_maintenance = mm_timestamp(); @@ -453,7 +514,7 @@ static int mqtt_shutdown(size_t n, instance** inst){ mqtt_disconnect(inst[u]); for(p = 0; p < data->nchannels; p++){ - free(data->channel[p]); + free(data->channel[p].topic); } free(data->channel); free(data->host); diff --git a/backends/mqtt.h b/backends/mqtt.h index a0f5356..c9bce81 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -16,6 +16,8 @@ static int mqtt_shutdown(size_t n, instance** inst); #define MQTT_KEEPALIVE 10 #define MQTT_VERSION_DEFAULT 0x05 +#define MQTT5_NO_LOCAL 0x04 + enum { MSG_RESERVED = 0x00, MSG_CONNECT = 0x10, @@ -25,7 +27,7 @@ enum { MSG_PUBREC = 0x50, MSG_PUBREL = 0x60, MSG_PUBCOMP = 0x70, - MSG_SUBSCRIBE = 0x80, + MSG_SUBSCRIBE = 0x82, MSG_SUBACK = 0x90, MSG_UNSUBSCRIBE = 0xA0, MSG_UNSUBACK = 0xB0, @@ -35,6 +37,13 @@ enum { MSG_AUTH = 0xF0 }; +//qos, subscribe +typedef struct /*_mqtt_channel*/ { + char* topic; + uint16_t topic_alias; + uint8_t flags; +} mqtt_channel_data; + typedef struct /*_mqtt_instance_data*/ { uint8_t tls; char* host; @@ -46,14 +55,12 @@ typedef struct /*_mqtt_instance_data*/ { char* client_id; size_t nchannels; - char** channel; + mqtt_channel_data* channel; int fd; uint8_t receive_buffer[MQTT_BUFFER_LENGTH]; size_t receive_offset; uint64_t last_control; + uint16_t packet_identifier; } mqtt_instance_data; - -//per-channel -//qos, subscribe -- cgit v1.2.3