diff options
Diffstat (limited to 'backends')
-rw-r--r-- | backends/mqtt.c | 213 | ||||
-rw-r--r-- | backends/mqtt.h | 5 |
2 files changed, 206 insertions, 12 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c index 4f56aa6..d9cf2d8 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -1,8 +1,14 @@ #define BACKEND_NAME "mqtt" +#define DEBUG #include <string.h> + +#include "libmmbackend.h" #include "mqtt.h" +//TODO +// * Periodic connection retries + MM_PLUGIN_API int init(){ backend mqtt = { .name = BACKEND_NAME, @@ -24,8 +30,100 @@ MM_PLUGIN_API int init(){ return 0; } +static int mqtt_parse_hostspec(instance* inst, char* hostspec){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + char* host = strchr(hostspec, '@'), *password = NULL, *port = NULL; + + //mqtt[s]://[username][:password]@host.domain[:port] + if(!strncmp(hostspec, "mqtt://", 7)){ + hostspec += 7; + } + else if(!strncmp(hostspec, "mqtts://", 8)){ + data->tls = 1; + hostspec += 8; + } + + if(host){ + //parse credentials, separate out host spec + *host = 0; + host++; + + password = strchr(hostspec, ':'); + if(password){ + //password supplied, store + *password = 0; + password++; + mmbackend_strdup(&(data->password), password); + } + + //store username + mmbackend_strdup(&(data->user), hostspec); + } + else{ + host = hostspec; + } + + //parse port if supplied + port = strchr(host, ':'); + if(port){ + *port = 0; + port++; + mmbackend_strdup(&(data->port), port); + } + + mmbackend_strdup(&(data->host), host); + return 0; +} + static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ - //TODO + size_t value = 0, offset = 0; + do { + value |= (buffer[offset] & 0x7F) << (7 * offset); + offset++; + } while(buffer[offset - 1] & 0x80); + return 0; +} + +static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){ + //implementation conforming to spec 1.5.5 + size_t offset = 0; + do { + buffer[offset] = value % 128; + value = value / 128; + if(value){ + buffer[offset] |= 0x80; + } + offset++; + } while(value); + return offset; +} + +static void mqtt_disconnect(instance* inst){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + + //unmanage the fd + mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); + + close(data->fd); + data->fd = -1; +} + +static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t* vh, size_t payload_length, uint8_t* payload){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + uint8_t fixed_header[5]; + size_t offset = 0; + + fixed_header[offset++] = type; + offset += mqtt_varint_encode(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); + + if(mmbackend_send(data->fd, fixed_header, offset) + || mmbackend_send(data->fd, vh, vh_length) + || mmbackend_send(data->fd, payload, payload_length)){ + LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name); + mqtt_disconnect(inst); + return 1; + } + return 0; } @@ -34,25 +132,91 @@ static int mqtt_configure(char* option, char* value){ return 1; } +static int mqtt_reconnect(instance* inst){ + uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x05, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; + uint8_t payload[MQTT_BUFFER_LENGTH]; + size_t vh_offset = 10, payload_offset = 0; + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + + if(!data->host){ + LOGPF("No host specified for instance %s", inst->name); + return 2; + } + + if(data->fd >= 0){ + mqtt_disconnect(inst); + } + + LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s)", + inst->name, data->host, + data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), + data->tls ? "yes " : "no", + (data->user || data->password) ? "yes" : "no"); + + data->fd = mmbackend_socket(data->host, + data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), + SOCK_STREAM, 0, 0, 1); + + if(data->fd < 0){ + //retry later + return 1; + } + + //prepare CONNECT message + variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); + //TODO set session expiry interval option + //TODO re-use previos session on reconnect + + //push number of option bytes (as a varint, no less) before actually pushing the option data. + //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. + variable_header[vh_offset++] = 7; + //push maximum packet size option + variable_header[vh_offset++] = 0x27; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; + variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; + //push topic alias maximum option + variable_header[vh_offset++] = 0x22; + variable_header[vh_offset++] = 0xFF; + variable_header[vh_offset++] = 0xFF; + + //push client_id as utf8 + //payload_offset += mqtt_push_utf8(); + if(data->user){ + //push user name as utf8 + } + if(data->password){ + //push password as binary + } + + mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload); + + //register the fd + if(mm_manage_fd(data->fd, BACKEND_NAME, 1, (void*) inst)){ + LOG("Failed to register FD"); + return 2; + } + + return 0; +} + static int mqtt_configure_instance(instance* inst, char* option, char* value){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; - char* token = value; if(!strcmp(option, "user")){ - free(data->user); - data->user = strdup(value); + mmbackend_strdup(&(data->user), value); return 0; } else if(!strcmp(option, "password")){ - free(data->password); - data->user = strdup(value); + mmbackend_strdup(&(data->password), value); return 0; } else if(!strcmp(option, "host")){ - //mqtt url may be of the form - //mqtt[s]://[username][:password]@host.domain[:port] - token = strchr(value, ':'); - //TODO + if(mqtt_parse_hostspec(inst, value)){ + return 1; + } + return 0; } LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); @@ -60,7 +224,15 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){ } static int mqtt_instance(instance* inst){ - //TODO + mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); + + if(!data){ + LOG("Failed to allocate memory"); + return 1; + } + + data->fd = -1; + inst->impl = data; return 0; } @@ -75,12 +247,27 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ } static int mqtt_handle(size_t num, managed_fd* fds){ + LOG("Handling"); //TODO return 0; } static int mqtt_start(size_t n, instance** inst){ - //TODO + size_t u = 0; + + for(u = 0; u < n; u++){ + switch(mqtt_reconnect(inst[u])){ + case 1: + LOGPF("Failed to connect to host for instance %s, will be retried", inst[u]->name); + break; + case 2: + LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name); + return 1; + default: + break; + } + } + return 0; } @@ -90,6 +277,8 @@ static int mqtt_shutdown(size_t n, instance** inst){ for(u = 0; u < n; u++){ data = (mqtt_instance_data*) inst[u]->impl; + mqtt_disconnect(inst[u]); + for(p = 0; p < data->nchannels; p++){ free(data->channel[p]); } diff --git a/backends/mqtt.h b/backends/mqtt.h index 165f2ba..1c8b47d 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -12,6 +12,8 @@ static int mqtt_shutdown(size_t n, instance** inst); #define MQTT_PORT "1883" #define MQTT_TLS_PORT "8883" +#define MQTT_BUFFER_LENGTH 8192 +#define MQTT_KEEPALIVE 10 enum { MSG_RESERVED = 0x00, @@ -39,9 +41,12 @@ typedef struct /*_mqtt_instance_data*/ { char* user; char* password; + char* client_id; size_t nchannels; char** channel; + + int fd; } mqtt_instance_data; //per-channel |