diff options
Diffstat (limited to 'backends')
| -rw-r--r-- | backends/mqtt.c | 75 | ||||
| -rw-r--r-- | backends/mqtt.h | 3 | 
2 files changed, 68 insertions, 10 deletions
| diff --git a/backends/mqtt.c b/backends/mqtt.c index d9cf2d8..8c4a9fd 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -2,10 +2,13 @@  #define DEBUG  #include <string.h> +#include <time.h>  #include "libmmbackend.h"  #include "mqtt.h" +static uint64_t last_maintenance = 0; +  //TODO  // * Periodic connection retries @@ -75,6 +78,14 @@ static int mqtt_parse_hostspec(instance* inst, char* hostspec){  	return 0;  } +static int mqtt_generate_instanceid(instance* inst){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	char clientid[23] = ""; + +	snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name); +	return mmbackend_strdup(&(data->client_id), clientid); +} +  static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){  	size_t value = 0, offset = 0;  	do { @@ -98,6 +109,24 @@ static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){  	return offset;  } +static size_t mqtt_push_binary(uint8_t* buffer, size_t buffer_length, uint8_t* content, size_t length){ +	if(buffer_length < length + 2 || length > 65535){ +		LOG("Failed to push length-prefixed data blob, buffer size exceeded"); +		return 0; +	} + +	buffer[0] = (length >> 8) & 0xFF; +	buffer[1] = length & 0xFF; + +	memcpy(buffer + 2, content, length); +	return length + 2; +} + +static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* content){ +	//FIXME might want to validate the string for valid UTF-8 +	return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content)); +} +  static void mqtt_disconnect(instance* inst){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -133,7 +162,7 @@ static int mqtt_configure(char* option, char* value){  }  static int mqtt_reconnect(instance* inst){ -	uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x05, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; +	uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF};  	uint8_t payload[MQTT_BUFFER_LENGTH];  	size_t vh_offset = 10, payload_offset = 0;  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -162,14 +191,15 @@ static int mqtt_reconnect(instance* inst){  		return 1;  	} -	//prepare CONNECT message +	//prepare CONNECT message flags  	variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); +  	//TODO set session expiry interval option  	//TODO re-use previos session on reconnect -	 +  	//push number of option bytes (as a varint, no less) before actually pushing the option data.  	//obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. -	variable_header[vh_offset++] = 7; +	variable_header[vh_offset++] = 8;  	//push maximum packet size option  	variable_header[vh_offset++] = 0x27;  	variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; @@ -181,13 +211,14 @@ static int mqtt_reconnect(instance* inst){  	variable_header[vh_offset++] = 0xFF;  	variable_header[vh_offset++] = 0xFF; -	//push client_id as utf8 -	//payload_offset += mqtt_push_utf8(); +	//prepare CONNECT payload +	//push client id +	payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->client_id);  	if(data->user){ -		//push user name as utf8 +		payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->user);  	}  	if(data->password){ -		//push password as binary +		payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->password);  	}  	mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload); @@ -218,6 +249,15 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){  		}  		return 0;  	} +	else if(!strcmp(option, "clientid")){ +		if(strlen(value)){ +			mmbackend_strdup(&(data->client_id), value); +			return 0; +		} +		else{ +			return mqtt_generate_instanceid(inst); +		} +	}  	LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name);  	return 1; @@ -233,6 +273,10 @@ static int mqtt_instance(instance* inst){  	data->fd = -1;  	inst->impl = data; + +	if(mqtt_generate_instanceid(inst)){ +		return 1; +	}  	return 0;  } @@ -247,8 +291,17 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  }  static int mqtt_handle(size_t num, managed_fd* fds){ -	LOG("Handling"); -	//TODO +	size_t n = 0; + +	//for(n = 0; n < num; n++){ +	//} + +	//keepalive/reconnect processing +	if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ +		//TODO run reconnects +		last_maintenance = mm_timestamp(); +	} +  	return 0;  } @@ -268,6 +321,8 @@ static int mqtt_start(size_t n, instance** inst){  		}  	} +	//initialize maintenance timer +	last_maintenance = mm_timestamp();  	return 0;  } diff --git a/backends/mqtt.h b/backends/mqtt.h index 1c8b47d..df63319 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -14,6 +14,7 @@ static int mqtt_shutdown(size_t n, instance** inst);  #define MQTT_TLS_PORT "8883"  #define MQTT_BUFFER_LENGTH 8192  #define MQTT_KEEPALIVE 10  +#define MQTT_VERSION 0x05  enum {  	MSG_RESERVED = 0x00, @@ -47,6 +48,8 @@ typedef struct /*_mqtt_instance_data*/ {  	char** channel;  	int fd; +	uint8_t receive_buffer[MQTT_BUFFER_LENGTH]; +	size_t receive_offset;  } mqtt_instance_data;  //per-channel | 
