aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends/mqtt.c
diff options
context:
space:
mode:
authorcbdev <cb@cbcdn.com>2021-01-10 09:42:54 +0100
committercbdev <cb@cbcdn.com>2021-01-10 09:42:54 +0100
commit6334a78d0c475ebfa76a739577a561bded135086 (patch)
tree6d4ac9b4dcb36ec808098d11bc61d38f16924080 /backends/mqtt.c
parentcbef6a61a92453afba5005c287873001354f5090 (diff)
downloadmidimonster-6334a78d0c475ebfa76a739577a561bded135086.tar.gz
midimonster-6334a78d0c475ebfa76a739577a561bded135086.tar.bz2
midimonster-6334a78d0c475ebfa76a739577a561bded135086.zip
Subscribe to input channels
Diffstat (limited to 'backends/mqtt.c')
-rw-r--r--backends/mqtt.c79
1 files changed, 70 insertions, 9 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index e9493a9..d3e516f 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -9,9 +9,6 @@
static uint64_t last_maintenance = 0;
-//TODO
-// * Periodic connection retries
-
MM_PLUGIN_API int init(){
backend mqtt = {
.name = BACKEND_NAME,
@@ -80,7 +77,7 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){
static int mqtt_generate_instanceid(instance* inst){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
- char clientid[23] = "";
+ char clientid[24] = "";
snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name);
return mmbackend_strdup(&(data->client_id), clientid);
@@ -275,6 +272,32 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){
return 1;
}
+static int mqtt_push_subscriptions(instance* inst){
+ mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data));
+ uint8_t variable_header[3] = {0};
+ uint8_t payload[MQTT_BUFFER_LENGTH];
+ size_t u, subs = 0, payload_offset = 0;
+
+ //FIXME might want to aggregate multiple subscribes into one packet
+ for(u = 0; u < data->nchannels; u++){
+ payload_offset = 0;
+ if(data->channel[u].flags & mmchannel_input){
+ variable_header[0] = (data->packet_identifier >> 8) & 0xFF;
+ variable_header[1] = (data->packet_identifier) & 0xFF;
+
+ payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->channel[u].topic);
+ payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0;
+
+ data->packet_identifier++;
+ mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload);
+ subs++;
+ }
+ }
+
+ LOGPF("Subscribed %" PRIsize_t " channels on %s", subs, inst->name);
+ return 0;
+}
+
static int mqtt_instance(instance* inst){
mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data));
@@ -294,12 +317,47 @@ static int mqtt_instance(instance* inst){
}
static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){
- //TODO
- return NULL;
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ size_t u;
+
+ //check spec for compliance
+ if(strchr(spec, '+') || strchr(spec, '#')){
+ LOGPF("Invalid character in channel specification %s", spec);
+ return NULL;
+ }
+
+ //find matching channel
+ for(u = 0; u < data->nchannels; u++){
+ if(!strcmp(spec, data->channel[u].topic)){
+ data->channel[u].flags |= flags;
+ break;
+ }
+ }
+
+ //allocate new channel
+ if(u == data->nchannels){
+ data->channel = realloc(data->channel, (data->nchannels + 1) * sizeof(mqtt_channel_data));
+ if(!data->channel){
+ LOG("Failed to allocate memory");
+ return NULL;
+ }
+
+ data->channel[u].topic = strdup(spec);
+ data->channel[u].topic_alias = 0;
+ data->channel[u].flags = flags;
+
+ if(!data->channel[u].topic){
+ LOG("Failed to allocate memory");
+ return NULL;
+ }
+ data->nchannels++;
+ }
+
+ return mm_channel(inst, u, 1);
}
static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
- //TODO
+ //TODO mqtt v3.1.1 local filtering
return 0;
}
@@ -349,6 +407,7 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h
}
else{
LOGPF("Connection on %s established", inst->name);
+ return mqtt_push_subscriptions(inst);
}
}
break;
@@ -424,7 +483,7 @@ static int mqtt_handle(size_t num, managed_fd* fds){
}
static int mqtt_start(size_t n, instance** inst){
- size_t u = 0;
+ size_t u = 0, fds = 0;
for(u = 0; u < n; u++){
switch(mqtt_reconnect(inst[u])){
@@ -435,9 +494,11 @@ static int mqtt_start(size_t n, instance** inst){
LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name);
return 1;
default:
+ fds++;
break;
}
}
+ LOGPF("Registered %" PRIsize_t " descriptors to core", fds);
//initialize maintenance timer
last_maintenance = mm_timestamp();
@@ -453,7 +514,7 @@ static int mqtt_shutdown(size_t n, instance** inst){
mqtt_disconnect(inst[u]);
for(p = 0; p < data->nchannels; p++){
- free(data->channel[p]);
+ free(data->channel[p].topic);
}
free(data->channel);
free(data->host);