aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends
diff options
context:
space:
mode:
Diffstat (limited to 'backends')
-rw-r--r--backends/mqtt.c213
-rw-r--r--backends/mqtt.h5
2 files changed, 206 insertions, 12 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index 4f56aa6..d9cf2d8 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -1,8 +1,14 @@
#define BACKEND_NAME "mqtt"
+#define DEBUG
#include <string.h>
+
+#include "libmmbackend.h"
#include "mqtt.h"
+//TODO
+// * Periodic connection retries
+
MM_PLUGIN_API int init(){
backend mqtt = {
.name = BACKEND_NAME,
@@ -24,8 +30,100 @@ MM_PLUGIN_API int init(){
return 0;
}
+static int mqtt_parse_hostspec(instance* inst, char* hostspec){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ char* host = strchr(hostspec, '@'), *password = NULL, *port = NULL;
+
+ //mqtt[s]://[username][:password]@host.domain[:port]
+ if(!strncmp(hostspec, "mqtt://", 7)){
+ hostspec += 7;
+ }
+ else if(!strncmp(hostspec, "mqtts://", 8)){
+ data->tls = 1;
+ hostspec += 8;
+ }
+
+ if(host){
+ //parse credentials, separate out host spec
+ *host = 0;
+ host++;
+
+ password = strchr(hostspec, ':');
+ if(password){
+ //password supplied, store
+ *password = 0;
+ password++;
+ mmbackend_strdup(&(data->password), password);
+ }
+
+ //store username
+ mmbackend_strdup(&(data->user), hostspec);
+ }
+ else{
+ host = hostspec;
+ }
+
+ //parse port if supplied
+ port = strchr(host, ':');
+ if(port){
+ *port = 0;
+ port++;
+ mmbackend_strdup(&(data->port), port);
+ }
+
+ mmbackend_strdup(&(data->host), host);
+ return 0;
+}
+
static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){
- //TODO
+ size_t value = 0, offset = 0;
+ do {
+ value |= (buffer[offset] & 0x7F) << (7 * offset);
+ offset++;
+ } while(buffer[offset - 1] & 0x80);
+ return 0;
+}
+
+static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){
+ //implementation conforming to spec 1.5.5
+ size_t offset = 0;
+ do {
+ buffer[offset] = value % 128;
+ value = value / 128;
+ if(value){
+ buffer[offset] |= 0x80;
+ }
+ offset++;
+ } while(value);
+ return offset;
+}
+
+static void mqtt_disconnect(instance* inst){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+
+ //unmanage the fd
+ mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL);
+
+ close(data->fd);
+ data->fd = -1;
+}
+
+static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t* vh, size_t payload_length, uint8_t* payload){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ uint8_t fixed_header[5];
+ size_t offset = 0;
+
+ fixed_header[offset++] = type;
+ offset += mqtt_varint_encode(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset);
+
+ if(mmbackend_send(data->fd, fixed_header, offset)
+ || mmbackend_send(data->fd, vh, vh_length)
+ || mmbackend_send(data->fd, payload, payload_length)){
+ LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name);
+ mqtt_disconnect(inst);
+ return 1;
+ }
+
return 0;
}
@@ -34,25 +132,91 @@ static int mqtt_configure(char* option, char* value){
return 1;
}
+static int mqtt_reconnect(instance* inst){
+ uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x05, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF};
+ uint8_t payload[MQTT_BUFFER_LENGTH];
+ size_t vh_offset = 10, payload_offset = 0;
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+
+ if(!data->host){
+ LOGPF("No host specified for instance %s", inst->name);
+ return 2;
+ }
+
+ if(data->fd >= 0){
+ mqtt_disconnect(inst);
+ }
+
+ LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s)",
+ inst->name, data->host,
+ data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT),
+ data->tls ? "yes " : "no",
+ (data->user || data->password) ? "yes" : "no");
+
+ data->fd = mmbackend_socket(data->host,
+ data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT),
+ SOCK_STREAM, 0, 0, 1);
+
+ if(data->fd < 0){
+ //retry later
+ return 1;
+ }
+
+ //prepare CONNECT message
+ variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00);
+ //TODO set session expiry interval option
+ //TODO re-use previos session on reconnect
+
+ //push number of option bytes (as a varint, no less) before actually pushing the option data.
+ //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart..
+ variable_header[vh_offset++] = 7;
+ //push maximum packet size option
+ variable_header[vh_offset++] = 0x27;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF;
+ //push topic alias maximum option
+ variable_header[vh_offset++] = 0x22;
+ variable_header[vh_offset++] = 0xFF;
+ variable_header[vh_offset++] = 0xFF;
+
+ //push client_id as utf8
+ //payload_offset += mqtt_push_utf8();
+ if(data->user){
+ //push user name as utf8
+ }
+ if(data->password){
+ //push password as binary
+ }
+
+ mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload);
+
+ //register the fd
+ if(mm_manage_fd(data->fd, BACKEND_NAME, 1, (void*) inst)){
+ LOG("Failed to register FD");
+ return 2;
+ }
+
+ return 0;
+}
+
static int mqtt_configure_instance(instance* inst, char* option, char* value){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
- char* token = value;
if(!strcmp(option, "user")){
- free(data->user);
- data->user = strdup(value);
+ mmbackend_strdup(&(data->user), value);
return 0;
}
else if(!strcmp(option, "password")){
- free(data->password);
- data->user = strdup(value);
+ mmbackend_strdup(&(data->password), value);
return 0;
}
else if(!strcmp(option, "host")){
- //mqtt url may be of the form
- //mqtt[s]://[username][:password]@host.domain[:port]
- token = strchr(value, ':');
- //TODO
+ if(mqtt_parse_hostspec(inst, value)){
+ return 1;
+ }
+ return 0;
}
LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name);
@@ -60,7 +224,15 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){
}
static int mqtt_instance(instance* inst){
- //TODO
+ mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data));
+
+ if(!data){
+ LOG("Failed to allocate memory");
+ return 1;
+ }
+
+ data->fd = -1;
+ inst->impl = data;
return 0;
}
@@ -75,12 +247,27 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
}
static int mqtt_handle(size_t num, managed_fd* fds){
+ LOG("Handling");
//TODO
return 0;
}
static int mqtt_start(size_t n, instance** inst){
- //TODO
+ size_t u = 0;
+
+ for(u = 0; u < n; u++){
+ switch(mqtt_reconnect(inst[u])){
+ case 1:
+ LOGPF("Failed to connect to host for instance %s, will be retried", inst[u]->name);
+ break;
+ case 2:
+ LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name);
+ return 1;
+ default:
+ break;
+ }
+ }
+
return 0;
}
@@ -90,6 +277,8 @@ static int mqtt_shutdown(size_t n, instance** inst){
for(u = 0; u < n; u++){
data = (mqtt_instance_data*) inst[u]->impl;
+ mqtt_disconnect(inst[u]);
+
for(p = 0; p < data->nchannels; p++){
free(data->channel[p]);
}
diff --git a/backends/mqtt.h b/backends/mqtt.h
index 165f2ba..1c8b47d 100644
--- a/backends/mqtt.h
+++ b/backends/mqtt.h
@@ -12,6 +12,8 @@ static int mqtt_shutdown(size_t n, instance** inst);
#define MQTT_PORT "1883"
#define MQTT_TLS_PORT "8883"
+#define MQTT_BUFFER_LENGTH 8192
+#define MQTT_KEEPALIVE 10
enum {
MSG_RESERVED = 0x00,
@@ -39,9 +41,12 @@ typedef struct /*_mqtt_instance_data*/ {
char* user;
char* password;
+ char* client_id;
size_t nchannels;
char** channel;
+
+ int fd;
} mqtt_instance_data;
//per-channel