diff options
| -rw-r--r-- | backends/mqtt.c | 296 | ||||
| -rw-r--r-- | backends/mqtt.h | 11 | ||||
| -rw-r--r-- | backends/mqtt.md | 38 | 
3 files changed, 282 insertions, 63 deletions
| diff --git a/backends/mqtt.c b/backends/mqtt.c index 41fa772..36aed03 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -3,6 +3,7 @@  #include <string.h>  #include <time.h> +#include <math.h>  #include "libmmbackend.h"  #include "mqtt.h" @@ -46,8 +47,6 @@ static struct {  /*   * TODO   *	* proper RETAIN handling - *	* mqtt v3.1.1 local filtering - *	* modifiable output mappings   *	* TLS   *	* JSON subchannels   */ @@ -218,7 +217,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten  static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){  	size_t length = 0;  	*data = NULL; -	 +  	if(buffer_length < 2){  		return 0;  	} @@ -349,6 +348,92 @@ static int mqtt_reconnect(instance* inst){  	return 0;  } +static int mqtt_configure_channel(instance* inst, char* option, char* value){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	char* next_token = NULL; +	channel* configure = NULL; +	uint8_t mark = 0; +	mqtt_channel_value config = { +		0 +	}; + +	if(!strncmp(value, "range ", 6)){ +		//we support min > max for range configurations +		value += 6; + +		config.min = strtod(value, &next_token); +		if(value == next_token){ +			LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); +			return 1; +		} + +		config.max = strtod(next_token, &value); +		if(value == next_token){ +			LOGPF("Failed to parse range preconfiguration for topic %s.%s", inst->name, option); +			return 1; +		} +	} +	else if(!strncmp(value, "discrete ", 9)){ +		value += 9; + +		for(; *value && isspace(*value); value++){ +		} +		if(value[0] == '!'){ +			mark = 1; +			value++; +		} +		config.min = clamp(strtod(value, &next_token), 1.0, 0.0); +		value = next_token; + +		for(; *value && isspace(*value); value++){ +		} +		if(value[0] == '!'){ +			mark = 2; +			value++; +		} + +		config.max = clamp(strtod(value, &next_token), 1.0, 0.0); +		value = next_token; +		if(config.max < config.min){ +			LOGPF("Discrete topic configuration for %s.%s has invalid limit ordering", inst->name, option); +			return 1; +		} + +		for(; *value && isspace(*value); value++){ +		} + +		config.discrete = strdup(value); +		config.normal = mark ? ((mark == 1) ? config.min : config.max) : (config.min + (config.max - config.min) / 2); +	} +	else{ +		LOGPF("Unknown instance configuration option or invalid preconfiguration %s on instance %s", option, inst->name); +		return 1; +	} + +	configure = mqtt_channel(inst, option, 0); +	if(!configure +			//if configuring scale, no other config is possible +			|| (!config.discrete && data->channel[configure->ident].values) +			//if configuring discrete, the previous one can't be a a scale +			|| (config.discrete && data->channel[configure->ident].values && !data->channel[configure->ident].value[0].discrete)){ +		LOGPF("Failed to configure topic %s.%s", inst->name, option); +		free(config.discrete); +		return 1; +	} + +	data->channel[configure->ident].value = realloc(data->channel[configure->ident].value, (data->channel[configure->ident].values + 1) * sizeof(mqtt_channel_value)); +	if(!data->channel[configure->ident].value){ +		LOG("Failed to allocate memory"); +		return 1; +	} + +	DBGPF("Configuring value on %s.%s: min %f max %f normal %f discrete %s", inst->name, option, config.min, config.max, config.normal, config.discrete ? config.discrete : "-"); +	data->channel[configure->ident].value[data->channel[configure->ident].values] = config; +	data->channel[configure->ident].values++; +	DBGPF("Value configuration for %s.%s now at %" PRIsize_t " entries", inst->name, option, data->channel[configure->ident].values); +	return 0; +} +  static int mqtt_configure_instance(instance* inst, char* option, char* value){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -383,8 +468,8 @@ static int mqtt_configure_instance(instance* inst, char* option, char* value){  		return 0;  	} -	LOGPF("Unknown instance configuration option %s on instance %s", option, inst->name); -	return 1; +	//try to register as channel preconfig +	return mqtt_configure_channel(inst, option, value);  }  static int mqtt_push_subscriptions(instance* inst){ @@ -470,12 +555,14 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){  		data->channel[u].topic_alias_sent = 0;  		data->channel[u].topic_alias_rcvd = 0;  		data->channel[u].flags = flags; +		data->channel[u].values = 0; +		data->channel[u].value = NULL;  		if(!data->channel[u].topic){  			LOG("Failed to allocate memory");  			return NULL;  		} -		 +  		DBGPF("Allocated channel %" PRIsize_t " for spec %s.%s, flags are %02X", u, inst->name, spec, data->channel[u].flags);  		data->nchannels++;  	} @@ -483,15 +570,125 @@ static channel* mqtt_channel(instance* inst, char* spec, uint8_t flags){  	return mm_channel(inst, u, 1);  } +static int mqtt_maintenance(){ +	size_t n, u; +	instance** inst = NULL; +	mqtt_instance_data* data = NULL; + +	if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ +		LOG("Failed to fetch instance list"); +		return 1; +	} + +	DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); +	for(u = 0; u < n; u++){ +       		data = (mqtt_instance_data*) inst[u]->impl; +		if(data->fd <= 0){ +			if(mqtt_reconnect(inst[u]) >= 2){ +				LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); +				free(inst); +				return 1; +			} +		} +		else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ +			//send keepalive ping requests +			mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); +		} +	} + +	free(inst); +	return 0; +} + +static int mqtt_deserialize(instance* inst, channel* output, mqtt_channel_data* input, char* buffer, size_t length){ +	char* next_token = NULL, conversion_buffer[1024] = {0}; +	channel_value val; +	double range, raw; +	size_t u; +	//FIXME implement json subchannels + +	//unconfigured channel +	if(!input->values){ +		//the original buffer is the result of an unterminated receive, move it over +		memcpy(conversion_buffer, buffer, length); +		val.normalised = clamp(strtod(conversion_buffer, &next_token), 1.0, 0.0); +		if(conversion_buffer == next_token){ +			LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); +			return 1; +		} +	} +	//ranged channel +	else if(!input->value[0].discrete){ +		memcpy(conversion_buffer, buffer, length); +		raw = clamp(strtod(conversion_buffer, &next_token), max(input->value[0].max, input->value[0].min), min(input->value[0].max, input->value[0].min)); +		if(conversion_buffer == next_token){ +			LOGPF("Failed to parse incoming data for %s.%s", inst->name, input->topic); +			return 1; +		} +		range = fabs(input->value[0].max - input->value[0].min); +		val.normalised = (raw - input->value[0].min) / range; +		if(input->value[0].max < input->value[0].min){ +			val.normalised = fabs(val.normalised); +		} +	} +	else{ +		for(u = 0; u < input->values; u++){ +			if(length == strlen(input->value[u].discrete) +					&& !strncmp(input->value[u].discrete, buffer, length)){ +				val.normalised = input->value[u].normal; +				break; +			} +		} + +		if(u == input->values){ +			LOGPF("Failed to parse incoming data for %s.%s, no matching discrete token", inst->name, input->topic); +			return 1; +		} +	} + +	val.normalised = clamp(val.normalised, 1.0, 0.0); +	mm_channel_event(output, val); +	return 0; +} + +static size_t mqtt_serialize(instance* inst, mqtt_channel_data* input, char* output, size_t length, double value){ +	double range; +	size_t u, invert = 0; + +	//unconfigured channel +	if(!input->values){ +		return snprintf(output, length, "%f", value); +	} +	//ranged channel +	else if(!input->value[0].discrete){ +		range = fabs(input->value[0].max - input->value[0].min); +		if(input->value[0].max < input->value[0].min){ +			invert = 1; +		} +		return snprintf(output, length, "%f", (value * range) * (invert ? -1 : 1) + input->value[0].min); +	} +	else{ +		for(u = 0; u < input->values; u++){ +			if(input->value[u].min <= value +					&& input->value[u].max >= value){ +				memcpy(output, input->value[u].discrete, min(strlen(input->value[u].discrete), length)); +				return min(strlen(input->value[u].discrete), length); +			} +		} +	} + +	LOGPF("No discrete value on %s.%s defined for normalized value %f", inst->name, input->topic, value); +	return 0; +} +  static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;  	uint8_t variable_header[MQTT_BUFFER_LENGTH]; -	uint8_t payload[MQTT_BUFFER_LENGTH]; -	size_t vh_length = 0, payload_length = 0; -	size_t u; +	uint8_t payload[MQTT_BUFFER_LENGTH], alias_assigned = 0; +	size_t vh_length = 0, payload_length = 0, u;  	for(u = 0; u < num; u++){ -		vh_length = payload_length = 0; +		vh_length = payload_length = alias_assigned = 0;  		if(data->mqtt_version == 0x05){  			if(data->channel[c[u]->ident].topic_alias_sent){ @@ -506,6 +703,8 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  				if(data->current_alias <= data->server_max_alias){  					data->channel[c[u]->ident].topic_alias_sent = data->current_alias++;  					DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic); + +					alias_assigned = 1;  				}  			} @@ -523,10 +722,12 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  				variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF;  			} -			payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); -			payload[0] = (payload_length >> 8) & 0xFF; -			payload[1] = payload_length & 0xFF; -			payload_length += 2; +			payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) (payload + 2), sizeof(payload) - 2, v[u].normalised); +			if(payload_length){ +				payload[0] = (payload_length >> 8) & 0xFF; +				payload[1] = payload_length & 0xFF; +				payload_length += 2; +			}  		}  		else{  			//push topic @@ -539,50 +740,27 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  				variable_header[vh_length++] = 0x01;  				variable_header[vh_length++] = 1;  			} -			payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); +			payload_length = mqtt_serialize(inst, data->channel + c[u]->ident, (char*) payload, sizeof(payload), v[u].normalised);  		} -		mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload); -	} - -	return 0; -} - -static int mqtt_maintenance(){ -	size_t n, u; -	instance** inst = NULL; -	mqtt_instance_data* data = NULL; - -	if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ -		LOG("Failed to fetch instance list"); -		return 1; -	} - -	DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); -	for(u = 0; u < n; u++){ -       		data = (mqtt_instance_data*) inst[u]->impl; -		if(data->fd <= 0){ -			if(mqtt_reconnect(inst[u]) >= 2){ -				LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); -				free(inst); -				return 1; -			} +		if(payload_length){ +			DBGPF("Transmitting %" PRIsize_t " bytes for %s", payload_length, inst->name); +			mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload);  		} -		else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ -			//send keepalive ping requests -			mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); +		else if(alias_assigned){ +			//undo alias assignment +			data->channel[c[u]->ident].topic_alias_sent = 0; +			data->current_alias--;  		}  	} -	free(inst);  	return 0;  }  static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; -	char* topic = NULL, *payload = NULL, *conversion_end = NULL; +	char* topic = NULL, *payload = NULL;  	channel* changed = NULL; -	channel_value val;  	uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0;  	uint16_t topic_alias = 0;  	uint32_t property_length = 0; @@ -643,16 +821,9 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h  	if(u != data->nchannels && payload_length && payload){  		DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length); -		//FIXME implement json subchannels -		//FIXME implement input mappings  		changed = mm_channel(inst, u, 0);  		if(changed){ -			val.normalised = clamp(strtod(payload, &conversion_end), 1.0, 0.0); -			if(payload == conversion_end){ -				LOGPF("Failed to parse incoming data for %s.%s", inst->name, data->channel[u].topic); -				return 0; -			} -			mm_channel_event(changed, val); +			mqtt_deserialize(inst, changed, data->channel + u, payload, payload_length);  		}  	}  	return 0; @@ -679,7 +850,7 @@ static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_h  			property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL);  			while(property_offset < length){  				DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length); -		 +  				//read maximum topic alias  				if(variable_header[property_offset] == 0x22){  					data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; @@ -736,9 +907,8 @@ static int mqtt_handle_fd(instance* inst){  	DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read);  	data->receive_offset += bytes_read; -	//TODO loop this while at least one unhandled message is in the buffer -	//check for complete message -	if(data->receive_offset >= 2){ +	while(data->receive_offset >= 2){ +		//check for complete message  		header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length);  		if(header_length && data->receive_offset >= message_length + header_length + 1){  			DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]); @@ -752,6 +922,9 @@ static int mqtt_handle_fd(instance* inst){  			}  			data->receive_offset -= message_length + header_length + 1;  		} +		else{ +			break; +		}  	}  	return 0; @@ -802,7 +975,7 @@ static int mqtt_start(size_t n, instance** inst){  }  static int mqtt_shutdown(size_t n, instance** inst){ -	size_t u, p; +	size_t u, p, v;  	mqtt_instance_data* data = NULL;  	for(u = 0; u < n; u++){ @@ -810,6 +983,10 @@ static int mqtt_shutdown(size_t n, instance** inst){  		mqtt_disconnect(inst[u]);  		for(p = 0; p < data->nchannels; p++){ +			for(v = 0; v < data->channel[p].values; v++){ +				free(data->channel[p].value[v].discrete); +			} +			free(data->channel[p].value);  			free(data->channel[p].topic);  		}  		free(data->channel); @@ -817,6 +994,7 @@ static int mqtt_shutdown(size_t n, instance** inst){  		free(data->port);  		free(data->user);  		free(data->password); +		free(data->client_id);  		free(inst[u]->impl);  		inst[u]->impl = NULL; diff --git a/backends/mqtt.h b/backends/mqtt.h index d40e83d..c684f99 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -46,12 +46,21 @@ enum {  	MSG_AUTH = 0xF0  }; -//qos, subscribe +typedef struct /*_mqtt_value_mapping*/ { +	double min; +	double max; +	double normal; +	char* discrete; +} mqtt_channel_value; +  typedef struct /*_mqtt_channel*/ {  	char* topic;  	uint16_t topic_alias_sent;  	uint16_t topic_alias_rcvd;  	uint8_t flags; + +	size_t values; +	mqtt_channel_value* value;  } mqtt_channel_data;  typedef struct /*_mqtt_instance_data*/ { diff --git a/backends/mqtt.md b/backends/mqtt.md index 895831c..6623438 100644 --- a/backends/mqtt.md +++ b/backends/mqtt.md @@ -30,10 +30,42 @@ The MQTT protocol places very few restrictions on the exchanged data. Thus, it i  and output data formats accepted respectively output by the MIDIMonster.  The basic format, without further channel-specific configuration, is an ASCII/UTF-8 string representing a floating -point number between `0.0` and `1.0`. The MIDIMonster will read these and use the value as the normalised event value. +point number between `0.0` and `1.0`. The MIDIMonster will read these and use the value as the normalized event value. -Values above the maximum or below the minimum will be clamped. The MIDIMonster will not output values out of those -bounds. +Channels may be specified to use a different value range or even freeform discrete values by preconfiguring +the channels in the instance configuration section. This is done by specifying options of the form + +``` +<channel> = range <min> <max> +<channel> = discrete [!]<min> [!]<max> <value> +``` + +Example configurations: +``` +/a/topic = range -10 10 +/another/topic = discrete !0.0 0.5 off +/another/topic = discrete 0.5 !1.0 on +``` + +Note that there may be only one range configuration per topic, but there may be multiple discrete configurations. + +The first channel preconfiguration example will change the channel value scale to values between `-10` and `10`. +For input channels, this sets the normalization range. The MIDIMonster will normalize the input value according to the scale. +For output channels, this sets the output scaling factors. + +The second and third channel preconfigurations define two discrete values (`on` and `off`) with accompanying normalized +values. For input channels, the normalized channel value for a discrete input will be the value marked with an exclamation mark `!`. +For output channels, the output will be the first discrete value for which the range between `<min>` and `<max>` contains +the normalized channel value. + +These examples mean +* For `/a/topic`, when mapped as input, the input value `5.0` will generate a normalized event value of `0.75`. +* For `/a/topic`, when mapped as output, a normalized event value `0.25` will generate an output of `-5.0`. +* For `/another/topic`, when mapped as an input, the input value `off` will generate a normalized event value of `0.0`. +* For `/another/topic`, when mapped as an output, a normalized event value of `0.75` will generate an output of `on`. + +Values above the maximum or below the minimum will be clamped. The MIDIMonster will not output values out of the +configured bounds.  #### Channel specification | 
