aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--backends/mqtt.c75
-rw-r--r--backends/mqtt.h3
2 files changed, 68 insertions, 10 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index d9cf2d8..8c4a9fd 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -2,10 +2,13 @@
#define DEBUG
#include <string.h>
+#include <time.h>
#include "libmmbackend.h"
#include "mqtt.h"
+static uint64_t last_maintenance = 0;
+
//TODO
// * Periodic connection retries
@@ -75,6 +78,14 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){
return 0;
}
+static int mqtt_generate_instanceid(instance* inst){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ char clientid[23] = "";
+
+ snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name);
+ return mmbackend_strdup(&(data->client_id), clientid);
+}
+
static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){
size_t value = 0, offset = 0;
do {
@@ -98,6 +109,24 @@ static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){
return offset;
}
+static size_t mqtt_push_binary(uint8_t* buffer, size_t buffer_length, uint8_t* content, size_t length){
+ if(buffer_length < length + 2 || length > 65535){
+ LOG("Failed to push length-prefixed data blob, buffer size exceeded");
+ return 0;
+ }
+
+ buffer[0] = (length >> 8) & 0xFF;
+ buffer[1] = length & 0xFF;
+
+ memcpy(buffer + 2, content, length);
+ return length + 2;
+}
+
+static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* content){
+ //FIXME might want to validate the string for valid UTF-8
+ return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content));
+}
+
static void mqtt_disconnect(instance* inst){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
@@ -133,7 +162,7 @@ static int mqtt_configure(char* option, char* value){
}
static int mqtt_reconnect(instance* inst){
- uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x05, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF};
+ uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF};
uint8_t payload[MQTT_BUFFER_LENGTH];
size_t vh_offset = 10, payload_offset = 0;
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
@@ -162,14 +191,15 @@ static int mqtt_reconnect(instance* inst){
return 1;
}
- //prepare CONNECT message
+ //prepare CONNECT message flags
variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00);
+
//TODO set session expiry interval option
//TODO re-use previos session on reconnect
-
+
//push number of option bytes (as a varint, no less) before actually pushing the option data.
//obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart..
- variable_header[vh_offset++] = 7;
+ variable_header[vh_offset++] = 8;
//push maximum packet size option
variable_header[vh_offset++] = 0x27;
variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF;
@@ -181,13 +211,14 @@ static int mqtt_reconnect(instance* inst){
variable_header[vh_offset++] = 0xFF;
variable_header[vh_offset++] = 0xFF;
- //push client_id as utf8
- //payload_offset += mqtt_push_utf8();
+ //prepare CONNECT payload
+ //push client id
+ payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->client_id);
if(data->user){
- //push user name as utf8
+ payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->user);
}
if(data->password){
- //push password as binary
+ payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->password);
}
mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload);
@@ -218,6 +249,15 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){
}
return 0;
}
+ else if(!strcmp(option, "clientid")){
+ if(strlen(value)){
+ mmbackend_strdup(&(data->client_id), value);
+ return 0;
+ }
+ else{
+ return mqtt_generate_instanceid(inst);
+ }
+ }
LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name);
return 1;
@@ -233,6 +273,10 @@ static int mqtt_instance(instance* inst){
data->fd = -1;
inst->impl = data;
+
+ if(mqtt_generate_instanceid(inst)){
+ return 1;
+ }
return 0;
}
@@ -247,8 +291,17 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
}
static int mqtt_handle(size_t num, managed_fd* fds){
- LOG("Handling");
- //TODO
+ size_t n = 0;
+
+ //for(n = 0; n < num; n++){
+ //}
+
+ //keepalive/reconnect processing
+ if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){
+ //TODO run reconnects
+ last_maintenance = mm_timestamp();
+ }
+
return 0;
}
@@ -268,6 +321,8 @@ static int mqtt_start(size_t n, instance** inst){
}
}
+ //initialize maintenance timer
+ last_maintenance = mm_timestamp();
return 0;
}
diff --git a/backends/mqtt.h b/backends/mqtt.h
index 1c8b47d..df63319 100644
--- a/backends/mqtt.h
+++ b/backends/mqtt.h
@@ -14,6 +14,7 @@ static int mqtt_shutdown(size_t n, instance** inst);
#define MQTT_TLS_PORT "8883"
#define MQTT_BUFFER_LENGTH 8192
#define MQTT_KEEPALIVE 10
+#define MQTT_VERSION 0x05
enum {
MSG_RESERVED = 0x00,
@@ -47,6 +48,8 @@ typedef struct /*_mqtt_instance_data*/ {
char** channel;
int fd;
+ uint8_t receive_buffer[MQTT_BUFFER_LENGTH];
+ size_t receive_offset;
} mqtt_instance_data;
//per-channel