diff options
| -rw-r--r-- | backends/Makefile | 8 | ||||
| -rw-r--r-- | backends/mqtt.c | 1005 | ||||
| -rw-r--r-- | backends/mqtt.h | 87 | ||||
| -rw-r--r-- | backends/mqtt.md | 85 | 
4 files changed, 1183 insertions, 2 deletions
diff --git a/backends/Makefile b/backends/Makefile index d815f84..aa9c988 100644 --- a/backends/Makefile +++ b/backends/Makefile @@ -2,9 +2,9 @@  # Backends that can only be built on Linux  LINUX_BACKENDS = midi.so evdev.so  # Backends that can only be built on Windows (mostly due to the .DLL extension) -WINDOWS_BACKENDS = artnet.dll osc.dll loopback.dll sacn.dll maweb.dll winmidi.dll openpixelcontrol.dll rtpmidi.dll wininput.dll visca.dll +WINDOWS_BACKENDS = artnet.dll osc.dll loopback.dll sacn.dll maweb.dll winmidi.dll openpixelcontrol.dll rtpmidi.dll wininput.dll visca.dll mqtt.dll  # Backends that can be built on any platform that can load .SO libraries -BACKENDS = artnet.so osc.so loopback.so sacn.so lua.so maweb.so jack.so openpixelcontrol.so python.so rtpmidi.so visca.so +BACKENDS = artnet.so osc.so loopback.so sacn.so lua.so maweb.so jack.so openpixelcontrol.so python.so rtpmidi.so visca.so mqtt.so  # Backends that require huge dependencies to be installed  OPTIONAL_BACKENDS = ola.so  # Backends that need to be built manually (but still should be included in the clean target) @@ -51,6 +51,10 @@ visca.so: ADDITIONAL_OBJS += $(BACKEND_LIB)  visca.dll: ADDITIONAL_OBJS += $(BACKEND_LIB)  visca.dll: LDLIBS += -lws2_32 +mqtt.so: ADDITIONAL_OBJS += $(BACKEND_LIB) +mqtt.dll: ADDITIONAL_OBJS += $(BACKEND_LIB) +mqtt.dll: LDLIBS += -lws2_32 +  openpixelcontrol.so: ADDITIONAL_OBJS += $(BACKEND_LIB)  openpixelcontrol.dll: ADDITIONAL_OBJS += $(BACKEND_LIB)  openpixelcontrol.dll: LDLIBS += -lws2_32 diff --git a/backends/mqtt.c b/backends/mqtt.c new file mode 100644 index 0000000..36aed03 --- /dev/null +++ b/backends/mqtt.c @@ -0,0 +1,1005 @@ +#define BACKEND_NAME "mqtt" +#define DEBUG + +#include <string.h> +#include <time.h> +#include <math.h> + +#include "libmmbackend.h" +#include "mqtt.h" + +static uint64_t last_maintenance = 0; +/* according to spec 2.2.2.2 */ +static struct { +	uint8_t property; +	uint8_t storage; +} property_lengths[] = { +	{0x01, STORAGE_U8}, +	{0x02, STORAGE_U32}, +	{0x03, STORAGE_PREFIXED}, +	{0x08, STORAGE_PREFIXED}, +	{0x09, STORAGE_PREFIXED}, +	{0x0B, STORAGE_VARINT}, +	{0x11, STORAGE_U32}, + +	{0x12, STORAGE_PREFIXED}, +	{0x13, STORAGE_U16}, +	{0x15, STORAGE_PREFIXED}, +	{0x16, STORAGE_PREFIXED}, +	{0x17, STORAGE_U8}, +	{0x18, STORAGE_U32}, +	{0x19, STORAGE_U8}, +	{0x1A, STORAGE_PREFIXED}, +	{0x1C, STORAGE_PREFIXED}, +	{0x1F, STORAGE_PREFIXED}, +	{0x21, STORAGE_U16}, +	{0x22, STORAGE_U16}, +	{0x23, STORAGE_U16}, +	{0x24, STORAGE_U8}, +	{0x25, STORAGE_U8}, +	{0x26, STORAGE_PREFIXPAIR}, +	{0x27, STORAGE_U32}, +	{0x28, STORAGE_U8}, +	{0x29, STORAGE_U8}, +	{0x2A, STORAGE_U8} +}; + +/* + * TODO + *	* proper RETAIN handling + *	* TLS + *	* JSON subchannels + */ + +MM_PLUGIN_API int init(){ +	backend mqtt = { +		.name = BACKEND_NAME, +		.conf = mqtt_configure, +		.create = mqtt_instance, +		.conf_instance = mqtt_configure_instance, +		.channel = mqtt_channel, +		.handle = mqtt_set, +		.process = mqtt_handle, +		.start = mqtt_start, +		.shutdown = mqtt_shutdown +	}; + +	//register backend +	if(mm_backend_register(mqtt)){ +		LOG("Failed to register backend"); +		return 1; +	} +	return 0; +} + +static int mqtt_parse_hostspec(instance* inst, char* hostspec){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	char* host = strchr(hostspec, '@'), *password = NULL, *port = NULL; + +	//mqtt[s]://[username][:password]@host.domain[:port] +	if(!strncmp(hostspec, "mqtt://", 7)){ +		hostspec += 7; +	} +	else if(!strncmp(hostspec, "mqtts://", 8)){ +		data->tls = 1; +		hostspec += 8; +	} + +	if(host){ +		//parse credentials, separate out host spec +		*host = 0; +		host++; + +		password = strchr(hostspec, ':'); +		if(password){ +			//password supplied, store +			*password = 0; +			password++; +			mmbackend_strdup(&(data->password), password); +		} + +		//store username +		mmbackend_strdup(&(data->user), hostspec); +	} +	else{ +		host = hostspec; +	} + +	//parse port if supplied +	port = strchr(host, ':'); +	if(port){ +		*port = 0; +		port++; +		mmbackend_strdup(&(data->port), port); +	} + +	mmbackend_strdup(&(data->host), host); +	return 0; +} + +static int mqtt_generate_instanceid(instance* inst){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	char clientid[24] = ""; + +	snprintf(clientid, sizeof(clientid), "MIDIMonster-%d-%s", (uint32_t) time(NULL), inst->name); +	return mmbackend_strdup(&(data->client_id), clientid); +} + +static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ +	size_t value = 0, offset = 0; +	do { +		if(offset >= len){ +			return 0; +		} + +		value |= (buffer[offset] & 0x7F) << (7 * offset); +		offset++; +	} while(buffer[offset - 1] & 0x80); + +	if(result){ +		*result = value; +	} +	return offset; +} + +static size_t mqtt_pop_property(uint8_t* buffer, size_t bytes){ +	size_t length = 0, u; + +	if(bytes){ +		for(u = 0; u < sizeof(property_lengths)/sizeof(property_lengths[0]); u++){ +			if(property_lengths[u].property == buffer[0]){ +				switch(property_lengths[u].storage){ +					case STORAGE_U8: +						return 2; +					case STORAGE_U16: +						return 3; +					case STORAGE_U32: +						return 5; +					case STORAGE_VARINT: +						return mqtt_pop_varint(buffer + 1, bytes - 1, NULL) + 1; +					case STORAGE_PREFIXED: +						if(bytes >= 3){ +							return ((buffer[1] << 8) | buffer[2]) + 1; +						} +						//best-effort guess +						return 3; +					case STORAGE_PREFIXPAIR: +						if(bytes >= 3){ +							length = ((buffer[1] << 8) | buffer[2]); +							if(bytes >= length + 5){ +								return (1 + 2 + length + 2 + ((buffer[length + 3] << 8) | buffer[length + 4])); +							} +							return length + 3; +						} +						//best-effort guess +						return 5; +				} +			} +		} +	} + +	LOGPF("Storage class for property %02X was unknown", buffer[0]); +	return 1; +} + +static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){ +	//implementation conforming to spec 1.5.5 +	size_t offset = 0; +	do { +		buffer[offset] = value % 128; +		value = value / 128; +		if(value){ +			buffer[offset] |= 0x80; +		} +		offset++; +	} while(value); +	return offset; +} + +static size_t mqtt_push_binary(uint8_t* buffer, size_t buffer_length, uint8_t* content, size_t length){ +	if(buffer_length < length + 2 || length > 65535){ +		LOG("Failed to push length-prefixed data blob, buffer size exceeded"); +		return 0; +	} + +	buffer[0] = (length >> 8) & 0xFF; +	buffer[1] = length & 0xFF; + +	memcpy(buffer + 2, content, length); +	return length + 2; +} + +static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* content){ +	//FIXME might want to validate the string for valid UTF-8 +	return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content)); +} + +static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ +	size_t length = 0; +	*data = NULL; + +	if(buffer_length < 2){ +		return 0; +	} + +	length = (buffer[0] << 8) | buffer[1]; +	if(buffer_length >= length + 2){ +		*data = (char*) buffer + 2; +	} +	return length; +} + +static void mqtt_disconnect(instance* inst){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	size_t u; + +	data->last_control = 0; + +	//reset aliases as they can not be reused across sessions +	data->server_max_alias = 0; +	data->current_alias = 1; +	for(u = 0; u < data->nchannels; u++){ +		data->channel[u].topic_alias_sent = 0; +		data->channel[u].topic_alias_rcvd = 0; +	} + +	//unmanage the fd +	mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); + +	close(data->fd); +	data->fd = -1; +} + +static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t* vh, size_t payload_length, uint8_t* payload){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	uint8_t fixed_header[5]; +	size_t offset = 0; + +	//how in the world is it a _fixed_ header if it contains a variable length integer? eh... +	fixed_header[offset++] = type; +	offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); + +	if(mmbackend_send(data->fd, fixed_header, offset) +			|| (vh && vh_length && mmbackend_send(data->fd, vh, vh_length)) +			|| (payload && payload_length && mmbackend_send(data->fd, payload, payload_length))){ +		LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name); +		mqtt_disconnect(inst); +		return 1; +	} + +	data->last_control = mm_timestamp(); +	return 0; +} + +static int mqtt_configure(char* option, char* value){ +	LOG("This backend does not take global configuration"); +	return 1; +} + +static int mqtt_reconnect(instance* inst){ +	uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION_DEFAULT, 0x00 /*flags*/, ((MQTT_KEEPALIVE * 2) >> 8) & 0xFF, (MQTT_KEEPALIVE * 2) & 0xFF}; +	uint8_t payload[MQTT_BUFFER_LENGTH]; +	size_t vh_offset = 10, payload_offset = 0; +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + +	if(!data->host){ +		LOGPF("No host specified for instance %s", inst->name); +		return 2; +	} + +	if(data->fd >= 0){ +		mqtt_disconnect(inst); +	} + +	LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s, Protocol: %s)", +			inst->name, data->host, +			data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), +			data->tls ? "yes " : "no", +			(data->user || data->password) ? "yes" : "no", +			(data->mqtt_version == 0x05) ? "v5" : "v3.1.1"); + +	data->fd = mmbackend_socket(data->host, +			data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), +			SOCK_STREAM, 0, 0, 1); + +	if(data->fd < 0){ +		//retry later +		return 1; +	} + +	//prepare CONNECT message header +	variable_header[6] = data->mqtt_version; +	variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); + +	if(data->mqtt_version == 0x05){ //mqtt v5 has additional options +		//push number of option bytes (as a varint, no less) before actually pushing the option data. +		//obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. +		variable_header[vh_offset++] = 8; +		//push maximum packet size option +		variable_header[vh_offset++] = 0x27; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; +		//push topic alias maximum option +		variable_header[vh_offset++] = 0x22; +		variable_header[vh_offset++] = 0xFF; +		variable_header[vh_offset++] = 0xFF; +	} + +	//prepare CONNECT payload +	//push client id +	payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->client_id); +	if(data->user){ +		payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->user); +	} +	if(data->password){ +		payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->password); +	} + +	mqtt_transmit(inst, MSG_CONNECT, vh_offset, variable_header, payload_offset, payload); + +	//register the fd +	if(mm_manage_fd(data->fd, BACKEND_NAME, 1, (void*) inst)){ +		LOG("Failed to register FD"); +		return 2; +	} + +	return 0; +} + +static int mqtt_configure_channel(instance* inst, char* option, char* value){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	char* next_token = NULL; +	channel* configure = NULL; +	uint8_t mark = 0; +	mqtt_channel_value config = { +		0 +	}; + +	if(!strncmp(value, "range ", 6)){ +		//we support min > max for range configurations +		value += 6; + +		config.min = strtod(value, &next_token); +		if(value == next_token){ +			LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); +			return 1; +		} + +		config.max = strtod(next_token, &value); +		if(value == next_token){ +			LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); +			return 1; +		} +	} +	else if(!strncmp(value, "discrete ", 9)){ +		value += 9; + +		for(; *value && isspace(*value); value++){ +		} +		if(value[0] == '!'){ +			mark = 1; +			value++; +		} +		config.min = clamp(strtod(value, &next_token), 1.0, 0.0); +		value = next_token; + +		for(; *value && isspace(*value); value++){ +		} +		if(value[0] == '!'){ +			mark = 2; +			value++; +		} + +		config.max = clamp(strtod(value, &next_token), 1.0, 0.0); +		value = next_token; +		if(config.max < config.min){ +			LOGPF("Discrete topic configuration for %s.%s has invalid limit ordering", inst->name, option); +			return 1; +		} + +		for(; *value && isspace(*value); value++){ +		} + +		config.discrete = strdup(value); +		config.normal = mark ? ((mark == 1) ? config.min : config.max) : (config.min + (config.max - config.min) / 2); +	} +	else{ +		LOGPF("Unknown instance configuration option or invalid preconfiguration %s on instance %s", option, inst->name); +		return 1; +	} + +	configure = mqtt_channel(inst, option, 0); +	if(!configure +			//if configuring scale, no other config is possible +			|| (!config.discrete && data->channel[configure->ident].values) +			//if configuring discrete, the previous one can't be a a scale +			|| (config.discrete && data->channel[configure->ident].values && !data->channel[configure->ident].value[0].discrete)){ +		LOGPF("Failed to configure topic %s.%s", inst->name, option); +		free(config.discrete); +		return 1; +	} + +	data->channel[configure->ident].value = realloc(data->channel[configure->ident].value, (data->channel[configure->ident].values + 1) * sizeof(mqtt_channel_value)); +	if(!data->channel[configure->ident].value){ +		LOG("Failed to allocate memory"); +		return 1; +	} + +	DBGPF("Configuring value on %s.%s: min %f max %f normal %f discrete %s", inst->name, option, config.min, config.max, config.normal, config.discrete ? config.discrete : "-"); +	data->channel[configure->ident].value[data->channel[configure->ident].values] = config; +	data->channel[configure->ident].values++; +	DBGPF("Value configuration for %s.%s now at %" PRIsize_t " entries", inst->name, option, data->channel[configure->ident].values); +	return 0; +} + +static int mqtt_configure_instance(instance* inst, char* option, char* value){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + +	if(!strcmp(option, "user")){ +		mmbackend_strdup(&(data->user), value); +		return 0; +	} +	else if(!strcmp(option, "password")){ +		mmbackend_strdup(&(data->password), value); +		return 0; +	} +	else if(!strcmp(option, "host")){ +		if(mqtt_parse_hostspec(inst, value)){ +			return 1; +		} +		return 0; +	} +	else if(!strcmp(option, "clientid")){ +		if(strlen(value)){ +			mmbackend_strdup(&(data->client_id), value); +			return 0; +		} +		else{ +			return mqtt_generate_instanceid(inst); +		} +	} +	else if(!strcmp(option, "protocol")){ +		data->mqtt_version = MQTT_VERSION_DEFAULT; +		if(!strcmp(value, "3.1.1")){ +			data->mqtt_version = 4; +		} +		return 0; +	} + +	//try to register as channel preconfig +	return mqtt_configure_channel(inst, option, value); +} + +static int mqtt_push_subscriptions(instance* inst){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	uint8_t variable_header[3] = {0}; +	uint8_t payload[MQTT_BUFFER_LENGTH]; +	size_t u, subs = 0, payload_offset = 0; + +	//FIXME might want to aggregate multiple subscribes into one packet +	for(u = 0; u < data->nchannels; u++){ +		payload_offset = 0; +		if(data->channel[u].flags & mmchannel_input){ +			DBGPF("Subscribing %s.%s, channel %" PRIsize_t ", flags %d", inst->name, data->channel[u].topic, u, data->channel[u].flags); +			variable_header[0] = (data->packet_identifier >> 8) & 0xFF; +			variable_header[1] = (data->packet_identifier) & 0xFF; + +			payload_offset += mqtt_push_utf8(payload + payload_offset, sizeof(payload) - payload_offset, data->channel[u].topic); +			payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0; + +			data->packet_identifier++; +			//zero is not a valid packet identifier +			if(!data->packet_identifier){ +				data->packet_identifier++; +			} + +			mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload); +			subs++; +		} +	} + +	LOGPF("Subscribed %" PRIsize_t " channels on %s", subs, inst->name); +	return 0; +} + +static int mqtt_instance(instance* inst){ +	mqtt_instance_data* data = calloc(1, sizeof(mqtt_instance_data)); + +	if(!data){ +		LOG("Failed to allocate memory"); +		return 1; +	} + +	data->fd = -1; +	data->mqtt_version = MQTT_VERSION_DEFAULT; +	data->packet_identifier = 1; +	data->current_alias = 1; +	inst->impl = data; + +	if(mqtt_generate_instanceid(inst)){ +		return 1; +	} +	return 0; +} + +static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	size_t u; + +	//check spec for compliance +	if(strchr(spec, '+') || strchr(spec, '#')){ +		LOGPF("Invalid character in channel specification %s", spec); +		return NULL; +	} + +	//find matching channel +	for(u = 0; u < data->nchannels; u++){ +		if(!strcmp(spec, data->channel[u].topic)){ +			data->channel[u].flags |= flags; +			DBGPF("Reusing existing channel %" PRIsize_t " for spec %s.%s, flags are now %02X", u, inst->name, spec, data->channel[u].flags); +			break; +		} +	} + +	//allocate new channel +	if(u == data->nchannels){ +		data->channel = realloc(data->channel, (data->nchannels + 1) * sizeof(mqtt_channel_data)); +		if(!data->channel){ +			LOG("Failed to allocate memory"); +			return NULL; +		} + +		data->channel[u].topic = strdup(spec); +		data->channel[u].topic_alias_sent = 0; +		data->channel[u].topic_alias_rcvd = 0; +		data->channel[u].flags = flags; +		data->channel[u].values = 0; +		data->channel[u].value = NULL; + +		if(!data->channel[u].topic){ +			LOG("Failed to allocate memory"); +			return NULL; +		} + +		DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags); +		data->nchannels++; +	} + +	return mm_channel(inst, u, 1); +} + +static int mqtt_maintenance(){ +	size_t n, u; +	instance** inst = NULL; +	mqtt_instance_data* data = NULL; + +	if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ +		LOG("Failed to fetch instance list"); +		return 1; +	} + +	DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); +	for(u = 0; u < n; u++){ +       		data = (mqtt_instance_data*) inst[u]->impl; +		if(data->fd <= 0){ +			if(mqtt_reconnect(inst[u]) >= 2){ +				LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); +				free(inst); +				return 1; +			} +		} +		else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ +			//send keepalive ping requests +			mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); +		} +	} + +	free(inst); +	return 0; +} + +static int mqtt_deserialize(instance* inst, channel* output, mqtt_channel_data* input, char* buffer, size_t length){ +	char* next_token = NULL, conversion_buffer[1024] = {0}; +	channel_value val; +	double range, raw; +	size_t u; +	//FIXME implement json subchannels + +	//unconfigured channel +	if(!input->values){ +		//the original buffer is the result of an unterminated receive, move it over +		memcpy(conversion_buffer, buffer, length); +		val.normalised = clamp(strtod(conversion_buffer, &next_token), 1.0, 0.0); +		if(conversion_buffer == next_token){ +			LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); +			return 1; +		} +	} +	//ranged channel +	else if(!input->value[0].discrete){ +		memcpy(conversion_buffer, buffer, length); +		raw = clamp(strtod(conversion_buffer, &next_token), max(input->value[0].max, input->value[0].min), min(input->value[0].max, input->value[0].min)); +		if(conversion_buffer == next_token){ +			LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); +			return 1; +		} +		range = fabs(input->value[0].max - input->value[0].min); +		val.normalised = (raw - input->value[0].min) / range; +		if(input->value[0].max < input->value[0].min){ +			val.normalised = fabs(val.normalised); +		} +	} +	else{ +		for(u = 0; u < input->values; u++){ +			if(length == strlen(input->value[u].discrete) +					&& !strncmp(input->value[u].discrete, buffer, length)){ +				val.normalised = input->value[u].normal; +				break; +			} +		} + +		if(u == input->values){ +			LOGPF("Failed to parse incoming data for %s.%s, no matching discrete token", inst->name, input->topic); +			return 1; +		} +	} + +	val.normalised = clamp(val.normalised, 1.0, 0.0); +	mm_channel_event(output, val); +	return 0; +} + +static size_t mqtt_serialize(instance* inst, mqtt_channel_data* input, char* output, size_t length, double value){ +	double range; +	size_t u, invert = 0; + +	//unconfigured channel +	if(!input->values){ +		return snprintf(output, length, "%f", value); +	} +	//ranged channel +	else if(!input->value[0].discrete){ +		range = fabs(input->value[0].max - input->value[0].min); +		if(input->value[0].max < input->value[0].min){ +			invert = 1; +		} +		return snprintf(output, length, "%f", (value * range) * (invert ? -1 : 1) + input->value[0].min); +	} +	else{ +		for(u = 0; u < input->values; u++){ +			if(input->value[u].min <= value +					&& input->value[u].max >= value){ +				memcpy(output, input->value[u].discrete, min(strlen(input->value[u].discrete), length)); +				return min(strlen(input->value[u].discrete), length); +			} +		} +	} + +	LOGPF("No discrete value on %s.%s defined for normalized value %f", inst->name, input->topic, value); +	return 0; +} + +static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	uint8_t variable_header[MQTT_BUFFER_LENGTH]; +	uint8_t payload[MQTT_BUFFER_LENGTH], alias_assigned = 0; +	size_t vh_length = 0, payload_length = 0, u; + +	for(u = 0; u < num; u++){ +		vh_length = payload_length = alias_assigned = 0; + +		if(data->mqtt_version == 0x05){ +			if(data->channel[c[u]->ident].topic_alias_sent){ +				//push zero-length topic +				variable_header[vh_length++] = 0; +				variable_header[vh_length++] = 0; +			} +			else{ +				//push topic +				vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic); +				//generate topic alias if possible +				if(data->current_alias <= data->server_max_alias){ +					data->channel[c[u]->ident].topic_alias_sent = data->current_alias++; +					DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic); + +					alias_assigned = 1; +				} +			} + +			//push property length +			variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent) ? 5 : 2; + +			//push payload type (0x01) +			variable_header[vh_length++] = 0x01; +			variable_header[vh_length++] = 1; + +			if(data->channel[c[u]->ident].topic_alias_sent){ +				//push topic alias (0x23) +				variable_header[vh_length++] = 0x23; +				variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; +				variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; +			} + +			payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) (payload + 2), sizeof(payload) - 2, v[u].normalised); +			if(payload_length){ +				payload[0] = (payload_length >> 8) & 0xFF; +				payload[1] = payload_length & 0xFF; +				payload_length += 2; +			} +		} +		else{ +			//push topic +			vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic); +			if(data->mqtt_version == 0x05){ +				//push property length +				variable_header[vh_length++] = 2; + +				//push payload type (0x01) +				variable_header[vh_length++] = 0x01; +				variable_header[vh_length++] = 1; +			} +			payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) payload, sizeof(payload), v[u].normalised); +		} + +		if(payload_length){ +			DBGPF("Transmitting %" PRIsize_t " bytes for %s", payload_length, inst->name); +			mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); +		} +		else if(alias_assigned){ +			//undo alias assignment +			data->channel[c[u]->ident].topic_alias_sent = 0; +			data->current_alias--; +		} +	} + +	return 0; +} + +static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	char* topic = NULL, *payload = NULL; +	channel* changed = NULL; +	uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; +	uint16_t topic_alias = 0; +	uint32_t property_length = 0; +	size_t u = data->nchannels, property_offset, payload_offset, payload_length; +	size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); + +	property_offset = payload_offset = topic_length + 2 + ((qos > 0) ? 2 : 0); +	if(data->mqtt_version == 0x05){ +		//read properties length +		payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length); +		payload_offset += property_length; + +		property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); +		//parse properties +		while(property_offset < payload_offset){ +			DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIu32, variable_header[property_offset], property_offset, property_length); +			//read payload format indicator +			if(variable_header[property_offset] == 0x01){ +				content_utf8 = variable_header[property_offset + 1]; +			} +			//read topic alias +			else if(variable_header[property_offset] == 0x23){ +				topic_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; +			} + +			property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset); +		} +	} + +	//match via topic alias +	if(!topic_length && topic_alias){ +		for(u = 0; u < data->nchannels; u++){ +			if(data->channel[u].topic_alias_rcvd == topic_alias){ +				break; +			} +		} +	} +	//match via topic +	else if(topic_length){ +		for(u = 0; u < data->nchannels; u++){ +			if(!strncmp(data->channel[u].topic, topic, topic_length)){ +				break; +			} +		} + +		if(topic_alias){ +			data->channel[u].topic_alias_rcvd = topic_alias; +		} +	} + +	if(content_utf8){ +		payload_length = mqtt_pop_utf8(variable_header + payload_offset, length - payload_offset, &payload); +	} +	else{ +		payload_length = length - payload_offset; +		payload = (char*) (variable_header + payload_offset); +	} + +	if(u != data->nchannels && payload_length && payload){ +		DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length); +		changed = mm_channel(inst, u, 0); +		if(changed){ +			mqtt_deserialize(inst, changed, data->channel + u, payload, payload_length); +		} +	} +	return 0; +} + +static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	size_t property_offset = 2; + +	if(length >= 2){ +		if(variable_header[1]){ +			if(variable_header[1] == 1 && data->mqtt_version == 0x05){ +				LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); +				data->mqtt_version = 0x04; +				return 0; +			} +			LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); +			mqtt_disconnect(inst); +			return 0; +		} + +		//parse response properties if present +		if(data->mqtt_version == 0x05){ +			property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); +			while(property_offset < length){ +				DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length); + +				//read maximum topic alias +				if(variable_header[property_offset] == 0x22){ +					data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; +					DBGPF("Connection supports maximum connection alias %" PRIu16, data->server_max_alias); +				} + +				property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset); +			} +		} + +		LOGPF("Connection on %s established", inst->name); +		return mqtt_push_subscriptions(inst); +	} + +	LOGPF("Received malformed CONNACK on %s", inst->name); +	return 1; +} + +static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +	switch(type){ +		case MSG_CONNACK: +			return mqtt_handle_connack(inst, type, variable_header, length); +		case MSG_PINGRESP: +		case MSG_SUBACK: +			//ignore most responses +			//FIXME error check SUBACK +			break; +		default: +			if((type & 0xF0) == MSG_PUBLISH){ +				return mqtt_handle_publish(inst, type, variable_header, length); +			} +			LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); +	} +	return 0; +} + +static int mqtt_handle_fd(instance* inst){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; +	uint32_t message_length = 0, header_length = 0; + +	bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0); +	if(bytes_read < 0){ +		LOGPF("Failed to receive data on instance %s: %s", inst->name, mmbackend_socket_strerror(errno)); +		return 1; +	} +	else if(bytes_read == 0){ +		//disconnected, try to reconnect +		LOGPF("Instance %s disconnected, reconnection queued", inst->name); +		mqtt_disconnect(inst); +		return 1; +	} + +	DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); +	data->receive_offset += bytes_read; + +	while(data->receive_offset >= 2){ +		//check for complete message +		header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length); +		if(header_length && data->receive_offset >= message_length + header_length + 1){ +			DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]); +			if(mqtt_handle_message(inst, data->receive_buffer[0], data->receive_buffer + header_length + 1, message_length)){ +				//TODO handle failures properly +			} + +			//remove handled message +			if(data->receive_offset > message_length + header_length + 1){ +				memmove(data->receive_buffer, data->receive_buffer + message_length + header_length + 1, data->receive_offset - (message_length + header_length + 1)); +			} +			data->receive_offset -= message_length + header_length + 1; +		} +		else{ +			break; +		} +	} + +	return 0; +} + +static int mqtt_handle(size_t num, managed_fd* fds){ +	size_t n = 0; + +	for(n = 0; n < num; n++){ +		if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ +			//propagate critical failures +			return 1; +		} +	} + +	//keepalive/reconnect processing +	if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ +		if(mqtt_maintenance()){ +			return 1; +		} +		last_maintenance = mm_timestamp(); +	} + +	return 0; +} + +static int mqtt_start(size_t n, instance** inst){ +	size_t u = 0, fds = 0; + +	for(u = 0; u < n; u++){ +		switch(mqtt_reconnect(inst[u])){ +			case 1: +				LOGPF("Failed to connect to host for instance %s, will be retried", inst[u]->name); +				break; +			case 2: +				LOGPF("Failed to connect to host for instance %s, aborting", inst[u]->name); +				return 1; +			default: +				fds++; +				break; +		} +	} +	LOGPF("Registered %" PRIsize_t " descriptors to core", fds); + +	//initialize maintenance timer +	last_maintenance = mm_timestamp(); +	return 0; +} + +static int mqtt_shutdown(size_t n, instance** inst){ +	size_t u, p, v; +	mqtt_instance_data* data = NULL; + +	for(u = 0; u < n; u++){ +		data = (mqtt_instance_data*) inst[u]->impl; +		mqtt_disconnect(inst[u]); + +		for(p = 0; p < data->nchannels; p++){ +			for(v = 0; v < data->channel[p].values; v++){ +				free(data->channel[p].value[v].discrete); +			} +			free(data->channel[p].value); +			free(data->channel[p].topic); +		} +		free(data->channel); +		free(data->host); +		free(data->port); +		free(data->user); +		free(data->password); +		free(data->client_id); + +		free(inst[u]->impl); +		inst[u]->impl = NULL; +	} + +	LOG("Backend shut down"); +	return 0; +} diff --git a/backends/mqtt.h b/backends/mqtt.h new file mode 100644 index 0000000..c684f99 --- /dev/null +++ b/backends/mqtt.h @@ -0,0 +1,87 @@ +#include "midimonster.h" + +MM_PLUGIN_API int init(); +static int mqtt_configure(char* option, char* value); +static int mqtt_configure_instance(instance* inst, char* option, char* value); +static int mqtt_instance(instance* inst); +static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags); +static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v); +static int mqtt_handle(size_t num, managed_fd* fds); +static int mqtt_start(size_t n, instance** inst); +static int mqtt_shutdown(size_t n, instance** inst); + +#define MQTT_PORT "1883" +#define MQTT_TLS_PORT "8883" +#define MQTT_BUFFER_LENGTH 8192 +#define MQTT_KEEPALIVE 10  +#define MQTT_VERSION_DEFAULT 0x05 + +#define MQTT5_NO_LOCAL 0x04 + +enum /*_mqtt_property_storage_classes*/ { +	STORAGE_U8, +	STORAGE_U16, +	STORAGE_U32, +	STORAGE_VARINT, +	STORAGE_PREFIXED, +	STORAGE_PREFIXPAIR +}; + +enum { +	MSG_RESERVED = 0x00, +	MSG_CONNECT = 0x10, +	MSG_CONNACK = 0x20, +	MSG_PUBLISH = 0x30, +	MSG_PUBACK = 0x40, +	MSG_PUBREC = 0x50, +	MSG_PUBREL = 0x60, +	MSG_PUBCOMP = 0x70, +	MSG_SUBSCRIBE = 0x82, +	MSG_SUBACK = 0x90, +	MSG_UNSUBSCRIBE = 0xA0, +	MSG_UNSUBACK = 0xB0, +	MSG_PINGREQ = 0xC0, +	MSG_PINGRESP = 0xD0, +	MSG_DISCONNECT = 0xE0, +	MSG_AUTH = 0xF0 +}; + +typedef struct /*_mqtt_value_mapping*/ { +	double min; +	double max; +	double normal; +	char* discrete; +} mqtt_channel_value; + +typedef struct /*_mqtt_channel*/ { +	char* topic; +	uint16_t topic_alias_sent; +	uint16_t topic_alias_rcvd; +	uint8_t flags; + +	size_t values; +	mqtt_channel_value* value; +} mqtt_channel_data; + +typedef struct /*_mqtt_instance_data*/ { +	uint8_t tls; +	char* host; +	char* port; +	uint8_t mqtt_version; + +	char* user; +	char* password; +	char* client_id; + +	size_t nchannels; +	mqtt_channel_data* channel; + +	int fd; +	uint8_t receive_buffer[MQTT_BUFFER_LENGTH]; +	size_t receive_offset; + +	uint64_t last_control; +	uint16_t packet_identifier; +	uint16_t server_max_alias; +	uint16_t current_alias; +} mqtt_instance_data; diff --git a/backends/mqtt.md b/backends/mqtt.md new file mode 100644 index 0000000..6623438 --- /dev/null +++ b/backends/mqtt.md @@ -0,0 +1,85 @@ +### The `mqtt` backend + +This backend provides input from and output to an message queueing telemetry transport (MQTT) +broker. The MQTT protocol is used in lightweight sensor/actor applications, a wide selection +of smart home implementations and as a generic message bus in many other domains. + +The backend implements both the older protocol version MQTT v3.1.1 as well as the current specification +for MQTT v5.0. + +#### Global configuration + +This backend does not take any global configuration. + +#### Instance configuration + +| Option	| Example value		| Default value 	| Description				| +|---------------|-----------------------|-----------------------|---------------------------------------| +| `host`	| `mqtt://10.23.23.1`	| none			| Host or URI of the MQTT broker	| +| `user`	| `midimonster`		| none			| User name for broker authentication	| +| `password`	| `mm`			| none			| Password for broker authentication	| +| `clientid`	| `MM-main`		| random		| MQTT client identifier (generated randomly at start) | +| `protocol`	| `3.1.1`		| `5`			| MQTT protocol version (`5` or `3.1.1`) to use for the connection | + +The `host` option can be specified as an URI of the form `mqtt[s]://[username][:password]@host.domain[:port]`. +This allows specifying all necessary settings in one configuration option. + +#### Data exchange format + +The MQTT protocol places very few restrictions on the exchanged data. Thus, it is necessary to specify the input +and output data formats accepted respectively output by the MIDIMonster. + +The basic format, without further channel-specific configuration, is an ASCII/UTF-8 string representing a floating +point number between `0.0` and `1.0`. The MIDIMonster will read these and use the value as the normalized event value. + +Channels may be specified to use a different value range or even freeform discrete values by preconfiguring +the channels in the instance configuration section. This is done by specifying options of the form + +``` +<channel> = range <min> <max> +<channel> = discrete [!]<min> [!]<max> <value> +``` + +Example configurations: +``` +/a/topic = range -10 10 +/another/topic = discrete !0.0 0.5 off +/another/topic = discrete 0.5 !1.0 on +``` + +Note that there may be only one range configuration per topic, but there may be multiple discrete configurations. + +The first channel preconfiguration example will change the channel value scale to values between `-10` and `10`. +For input channels, this sets the normalization range. The MIDIMonster will normalize the input value according to the scale. +For output channels, this sets the output scaling factors. + +The second and third channel preconfigurations define two discrete values (`on` and `off`) with accompanying normalized +values. For input channels, the normalized channel value for a discrete input will be the value marked with an exclamation mark `!`. +For output channels, the output will be the first discrete value for which the range between `<min>` and `<max>` contains +the normalized channel value. + +These examples mean +* For `/a/topic`, when mapped as input, the input value `5.0` will generate a normalized event value of `0.75`. +* For `/a/topic`, when mapped as output, a normalized event value `0.25` will generate an output of `-5.0`. +* For `/another/topic`, when mapped as an input, the input value `off` will generate a normalized event value of `0.0`. +* For `/another/topic`, when mapped as an output, a normalized event value of `0.75` will generate an output of `on`. + +Values above the maximum or below the minimum will be clamped. The MIDIMonster will not output values out of the +configured bounds. + +#### Channel specification + +A channel specification may be any MQTT topic designator not containing the wildcard characters `+` and `#`. + +Example mapping:  +``` +mq1./midimonster/in > mq2./midimonster/out +``` + +#### Known bugs / problems + +If the connection to a server is lost, the connection will be retried in approximately 10 seconds. +If the server rejects the connection with reason code `0x01`, a protocol failure is assumed. If the initial +connection was made with `MQTT v5.0`, it is retried with the older protocol version `MQTT v3.1.1`. + +Support for TLS-secured connections is planned, but not yet implemented.  | 
