diff options
Diffstat (limited to 'backends/mqtt.c')
| -rw-r--r-- | backends/mqtt.c | 118 | 
1 files changed, 117 insertions, 1 deletions
| diff --git a/backends/mqtt.c b/backends/mqtt.c index 00045c0..8bff531 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -8,6 +8,40 @@  #include "mqtt.h"  static uint64_t last_maintenance = 0; +/* according to spec 2.2.2.2 */ +static struct { +	uint8_t property; +	uint8_t storage; +} property_lengths[] = { +	{0x01, STORAGE_U8}, +	{0x02, STORAGE_U32}, +	{0x03, STORAGE_PREFIXED}, +	{0x08, STORAGE_PREFIXED}, +	{0x09, STORAGE_PREFIXED}, +	{0x0B, STORAGE_VARINT}, +	{0x11, STORAGE_U32}, + +	{0x12, STORAGE_PREFIXED}, +	{0x13, STORAGE_U16}, +	{0x15, STORAGE_PREFIXED}, +	{0x16, STORAGE_PREFIXED}, +	{0x17, STORAGE_U8}, +	{0x18, STORAGE_U32}, +	{0x19, STORAGE_U8}, +	{0x1A, STORAGE_PREFIXED}, +	{0x1C, STORAGE_PREFIXED}, +	{0x1F, STORAGE_PREFIXED}, +	{0x21, STORAGE_U16}, +	{0x22, STORAGE_U16}, +	{0x23, STORAGE_U16}, +	{0x24, STORAGE_U8}, +	{0x25, STORAGE_U8}, +	{0x26, STORAGE_PREFIXPAIR}, +	{0x27, STORAGE_U32}, +	{0x28, STORAGE_U8}, +	{0x29, STORAGE_U8}, +	{0x2A, STORAGE_U8} +};  /*   * TODO @@ -15,6 +49,8 @@ static uint64_t last_maintenance = 0;   *	* use topic aliases if possible   *	* mqtt v3.1.1 local filtering   *	* modifiable output mappings + *	* TLS + *	* JSON subchannels   */  MM_PLUGIN_API int init(){ @@ -91,7 +127,7 @@ static int mqtt_generate_instanceid(instance* inst){  	return mmbackend_strdup(&(data->client_id), clientid);  } -static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ +static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){  	size_t value = 0, offset = 0;  	do {  		if(offset >= len){ @@ -138,6 +174,21 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten  	return mqtt_push_binary(buffer, buffer_length, (uint8_t*) content, strlen(content));  } +static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){ +	size_t length = 0; +	*data = NULL; +	 +	if(buffer_length < 2){ +		return 0; +	} + +	length = (buffer[0] << 8) | buffer[1]; +	if(buffer_length >= length + 2){ +		*data = (char*) buffer + 2; +	} +	return length; +} +  static void mqtt_disconnect(instance* inst){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;  	data->last_control = 0; @@ -305,6 +356,11 @@ static int mqtt_push_subscriptions(instance* inst){  			payload[payload_offset++] = (data->mqtt_version == 0x05) ? MQTT5_NO_LOCAL : 0;  			data->packet_identifier++; +			//zero is not a valid packet identifier +			if(!data->packet_identifier){ +				data->packet_identifier++; +			} +  			mqtt_transmit(inst, MSG_SUBSCRIBE, data->mqtt_version == 0x05 ? 3 : 2, variable_header, payload_offset, payload);  			subs++;  		} @@ -324,6 +380,7 @@ static int mqtt_instance(instance* inst){  	data->fd = -1;  	data->mqtt_version = MQTT_VERSION_DEFAULT; +	data->packet_identifier = 1;  	inst->impl = data;  	if(mqtt_generate_instanceid(inst)){ @@ -456,6 +513,62 @@ static int mqtt_maintenance(){  	return 0;  } +static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	char* topic = NULL, *payload = NULL; +	channel* changed = NULL; +	channel_value val; +	uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; +	uint32_t property_length = 0; +	size_t u = data->nchannels, property_offset, payload_offset, payload_length; +	size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); + +	property_offset = payload_offset = topic_length + 2 + ((qos > 0) ? 2 : 0); +	if(data->mqtt_version == 0x05){ +		//read properties length +		payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length); +		payload_offset += property_length; + +		//TODO parse properties +		//find topic alias +		//find type code +	} + +	//match via topic alias +	if(topic_length == 0){ +		//TODO match topic aliases +		//TODO build topic alias database +	} +	//match via topic +	else{ +		for(u = 0; u < data->nchannels; u++){ +			if(!strncmp(data->channel[u].topic, topic, topic_length)){ +				break; +			} +		} +	} + +	if(content_utf8){ +		payload_length = mqtt_pop_utf8(variable_header + payload_offset, length - payload_offset, &payload); +	} +	else{ +		payload_length = length - payload_offset; +		payload = (char*) (variable_header + payload_offset); +	} + +	if(u != data->nchannels && payload_length && payload){ +		DBGPF("Received PUBLISH for %s.%s, QoS %d, payload length %" PRIsize_t, inst->name, data->channel[u].topic, qos, payload_length); +		//FIXME implement json subchannels +		//FIXME implement input mappings +		changed = mm_channel(inst, u, 0); +		if(changed){ +			val.normalised = strtod(payload, NULL); +			mm_channel_event(changed, val); +		} +	} +	return 0; +} +  static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -482,6 +595,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h  			//FIXME error check SUBACK  			break;  		default: +			if((type & 0xF0) == MSG_PUBLISH){ +				return mqtt_handle_publish(inst, type, variable_header, length); +			}  			LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name);  	}  	return 0; | 
