aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorcbdev <cb@cbcdn.com>2021-01-10 09:42:54 +0100
committercbdev <cb@cbcdn.com>2021-01-10 09:42:54 +0100
commit6334a78d0c475ebfa76a739577a561bded135086 (patch)
tree6d4ac9b4dcb36ec808098d11bc61d38f16924080
parentcbef6a61a92453afba5005c287873001354f5090 (diff)
downloadmidimonster-6334a78d0c475ebfa76a739577a561bded135086.tar.gz
midimonster-6334a78d0c475ebfa76a739577a561bded135086.tar.bz2
midimonster-6334a78d0c475ebfa76a739577a561bded135086.zip
Subscribe to input channels
-rw-r--r--backends/mqtt.c79
-rw-r--r--backends/mqtt.h17
2 files changed, 82 insertions, 14 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index e9493a9..d3e516f 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -9,9 +9,6 @@
static uint64_t last_maintenance = 0;
-//TODO
-// * Periodic connection retries
-
MM_PLUGIN_API int init(){
backend mqtt = {
.name = BACKEND_NAME,
@@ -80,7 +77,7 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){
static int mqtt_generate_instanceid(instance* inst){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
- char clientid[23] = "";
+ char clientid[24] = "";
snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name);
return mmbackend_strdup(&(data->client_id), clientid);
@@ -275,6 +272,32 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){
return 1;
}
+static int mqtt_push_subscriptions(instance* inst){
+ mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data));
+ uint8_t variable_header[3] = {0};
+ uint8_t payload[MQTT_BUFFER_LENGTH];
+ size_t u, subs = 0, payload_offset = 0;
+
+ //FIXME might want to aggregate multiple subscribes into one packet
+ for(u = 0; u < data->nchannels; u++){
+ payload_offset = 0;
+ if(data->channel[u].flags & mmchannel_input){
+ variable_header[0] = (data->packet_identifier >> 8) & 0xFF;
+ variable_header[1] = (data->packet_identifier) & 0xFF;
+
+ payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->channel[u].topic);
+ payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0;
+
+ data->packet_identifier++;
+ mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload);
+ subs++;
+ }
+ }
+
+ LOGPF("Subscribed %" PRIsize_t " channels on %s", subs, inst->name);
+ return 0;
+}
+
static int mqtt_instance(instance* inst){
mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data));
@@ -294,12 +317,47 @@ static int mqtt_instance(instance* inst){
}
static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){
- //TODO
- return NULL;
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ size_t u;
+
+ //check spec for compliance
+ if(strchr(spec, '+') || strchr(spec, '#')){
+ LOGPF("Invalid character in channel specification %s", spec);
+ return NULL;
+ }
+
+ //find matching channel
+ for(u = 0; u < data->nchannels; u++){
+ if(!strcmp(spec, data->channel[u].topic)){
+ data->channel[u].flags |= flags;
+ break;
+ }
+ }
+
+ //allocate new channel
+ if(u == data->nchannels){
+ data->channel = realloc(data->channel, (data->nchannels + 1) * sizeof(mqtt_channel_data));
+ if(!data->channel){
+ LOG("Failed to allocate memory");
+ return NULL;
+ }
+
+ data->channel[u].topic = strdup(spec);
+ data->channel[u].topic_alias = 0;
+ data->channel[u].flags = flags;
+
+ if(!data->channel[u].topic){
+ LOG("Failed to allocate memory");
+ return NULL;
+ }
+ data->nchannels++;
+ }
+
+ return mm_channel(inst, u, 1);
}
static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
- //TODO
+ //TODO mqtt v3.1.1 local filtering
return 0;
}
@@ -349,6 +407,7 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h
}
else{
LOGPF("Connection on %s established", inst->name);
+ return mqtt_push_subscriptions(inst);
}
}
break;
@@ -424,7 +483,7 @@ static int mqtt_handle(size_t num, managed_fd* fds){
}
static int mqtt_start(size_t n, instance** inst){
- size_t u = 0;
+ size_t u = 0, fds = 0;
for(u = 0; u < n; u++){
switch(mqtt_reconnect(inst[u])){
@@ -435,9 +494,11 @@ static int mqtt_start(size_t n, instance** inst){
LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name);
return 1;
default:
+ fds++;
break;
}
}
+ LOGPF("Registered %" PRIsize_t " descriptors to core", fds);
//initialize maintenance timer
last_maintenance = mm_timestamp();
@@ -453,7 +514,7 @@ static int mqtt_shutdown(size_t n, instance** inst){
mqtt_disconnect(inst[u]);
for(p = 0; p < data->nchannels; p++){
- free(data->channel[p]);
+ free(data->channel[p].topic);
}
free(data->channel);
free(data->host);
diff --git a/backends/mqtt.h b/backends/mqtt.h
index a0f5356..c9bce81 100644
--- a/backends/mqtt.h
+++ b/backends/mqtt.h
@@ -16,6 +16,8 @@ static int mqtt_shutdown(size_t n, instance** inst);
#define MQTT_KEEPALIVE 10
#define MQTT_VERSION_DEFAULT 0x05
+#define MQTT5_NO_LOCAL 0x04
+
enum {
MSG_RESERVED = 0x00,
MSG_CONNECT = 0x10,
@@ -25,7 +27,7 @@ enum {
MSG_PUBREC = 0x50,
MSG_PUBREL = 0x60,
MSG_PUBCOMP = 0x70,
- MSG_SUBSCRIBE = 0x80,
+ MSG_SUBSCRIBE = 0x82,
MSG_SUBACK = 0x90,
MSG_UNSUBSCRIBE = 0xA0,
MSG_UNSUBACK = 0xB0,
@@ -35,6 +37,13 @@ enum {
MSG_AUTH = 0xF0
};
+//qos, subscribe
+typedef struct /*_mqtt_channel*/ {
+ char* topic;
+ uint16_t topic_alias;
+ uint8_t flags;
+} mqtt_channel_data;
+
typedef struct /*_mqtt_instance_data*/ {
uint8_t tls;
char* host;
@@ -46,14 +55,12 @@ typedef struct /*_mqtt_instance_data*/ {
char* client_id;
size_t nchannels;
- char** channel;
+ mqtt_channel_data* channel;
int fd;
uint8_t receive_buffer[MQTT_BUFFER_LENGTH];
size_t receive_offset;
uint64_t last_control;
+ uint16_t packet_identifier;
} mqtt_instance_data;
-
-//per-channel
-//qos, subscribe