diff options
Diffstat (limited to 'backends/mqtt.c')
| -rw-r--r-- | backends/mqtt.c | 197 | 
1 files changed, 153 insertions, 44 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c index 8bff531..29f0436 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -46,7 +46,6 @@ static struct {  /*   * TODO   *	* proper RETAIN handling - *	* use topic aliases if possible   *	* mqtt v3.1.1 local filtering   *	* modifiable output mappings   *	* TLS @@ -138,10 +137,52 @@ static size_t mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){  		offset++;  	} while(buffer[offset - 1] & 0x80); -	*result = value; +	if(result){ +		*result = value; +	}  	return offset;  } +static size_t mqtt_pop_property(uint8_t* buffer, size_t bytes){ +	size_t length = 0, u; + +	if(bytes){ +		for(u = 0; u < sizeof(property_lengths)/sizeof(property_lengths[0]); u++){ +			if(property_lengths[u].property == buffer[0]){ +				switch(property_lengths[u].storage){ +					case STORAGE_U8: +						return 2; +					case STORAGE_U16: +						return 3; +					case STORAGE_U32: +						return 5; +					case STORAGE_VARINT: +						return mqtt_pop_varint(buffer + 1, bytes - 1, NULL) + 1; +					case STORAGE_PREFIXED: +						if(bytes >= 3){ +							return ((buffer[1] << 8) | buffer[2]) + 1; +						} +						//best-effort guess +						return 3; +					case STORAGE_PREFIXPAIR: +						if(bytes >= 3){ +							length = ((buffer[1] << 8) | buffer[2]); +							if(bytes >= length + 5){ +								return (1 + 2 + length + 2 + ((buffer[length + 3] << 8) | buffer[length + 4])); +							} +							return length + 3; +						} +						//best-effort guess +						return 5; +				} +			} +		} +	} + +	LOGPF("Storage class for property %02X was unknown", buffer[0]); +	return 1; +} +  static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){  	//implementation conforming to spec 1.5.5  	size_t offset = 0; @@ -191,8 +232,18 @@ static size_t mqtt_pop_utf8(uint8_t* buffer, size_t buffer_length, char** data){  static void mqtt_disconnect(instance* inst){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	size_t u; +  	data->last_control = 0; +	//reset aliases as they can not be reused across sessions +	data->server_max_alias = 0; +	data->current_alias = 1; +	for(u = 0; u < data->nchannels; u++){ +		data->channel[u].topic_alias_sent = 0; +		data->channel[u].topic_alias_rcvd = 0; +	} +  	//unmanage the fd  	mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); @@ -241,11 +292,12 @@ static int mqtt_reconnect(instance* inst){  		mqtt_disconnect(inst);  	} -	LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s)", +	LOGPF("Connecting instance %s to host %s port %s (TLS: %s, Authentication: %s, Protocol: %s)",  			inst->name, data->host,  			data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT),  			data->tls ? "yes " : "no", -			(data->user || data->password) ? "yes" : "no"); +			(data->user || data->password) ? "yes" : "no", +			(data->mqtt_version == 0x05) ? "v5" : "v3.1.1");  	data->fd = mmbackend_socket(data->host,  			data->port ? data->port : (data->tls ? MQTT_TLS_PORT : MQTT_PORT), @@ -260,9 +312,6 @@ static int mqtt_reconnect(instance* inst){  	variable_header[6] = data->mqtt_version;  	variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00); -	//TODO set session expiry interval option -	//TODO re-use previos session on reconnect -  	if(data->mqtt_version == 0x05){ //mqtt v5 has additional options  		//push number of option bytes (as a varint, no less) before actually pushing the option data.  		//obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. @@ -381,6 +430,7 @@ static int mqtt_instance(instance* inst){  	data->fd = -1;  	data->mqtt_version = MQTT_VERSION_DEFAULT;  	data->packet_identifier = 1; +	data->current_alias = 1;  	inst->impl = data;  	if(mqtt_generate_instanceid(inst)){ @@ -443,22 +493,40 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  	for(u = 0; u < num; u++){  		vh_length = payload_length = 0; -		if(data->mqtt_version == 0x05 && data->channel[c[u]->ident].topic_alias_sent){ -			//push zero-length topic -			variable_header[vh_length++] = 0; -			variable_header[vh_length++] = 0; +		if(data->mqtt_version == 0x05){ +			if(data->channel[c[u]->ident].topic_alias_sent){ +				//push zero-length topic +				variable_header[vh_length++] = 0; +				variable_header[vh_length++] = 0; +			} +			else{ +				//push topic +				vh_length += mqtt_push_utf8(variable_header + vh_length, sizeof(variable_header) - vh_length, data->channel[c[u]->ident].topic); +				//generate topic alias if possible +				if(data->current_alias <= data->server_max_alias){ +					data->channel[c[u]->ident].topic_alias_sent = data->current_alias++; +					DBGPF("Assigned outbound topic alias %" PRIu16 " to topic %s.%s", data->channel[c[u]->ident].topic_alias_sent, inst->name, data->channel[c[u]->ident].topic); +				} +			}  			//push property length -			variable_header[vh_length++] = 5; +			variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent) ? 5 : 2;  			//push payload type (0x01)  			variable_header[vh_length++] = 0x01;  			variable_header[vh_length++] = 1; -			//push topic alias (0x23) -			variable_header[vh_length++] = 0x23; -			variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; -			variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; +			if(data->channel[c[u]->ident].topic_alias_sent){ +				//push topic alias (0x23) +				variable_header[vh_length++] = 0x23; +				variable_header[vh_length++] = (data->channel[c[u]->ident].topic_alias_sent >> 8) & 0xFF; +				variable_header[vh_length++] = data->channel[c[u]->ident].topic_alias_sent & 0xFF; +			} + +			payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); +			payload[0] = (payload_length >> 8) & 0xFF; +			payload[1] = payload_length & 0xFF; +			payload_length += 2;  		}  		else{  			//push topic @@ -471,12 +539,9 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  				variable_header[vh_length++] = 0x01;  				variable_header[vh_length++] = 1;  			} +			payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised);  		} -		payload_length = snprintf((char*) payload, sizeof(payload), "%f", v[u].normalised); -		//payload_length = snprintf((char*) (payload + 2), sizeof(payload) - 2, "%f", v[u].normalised); -		//payload[0] = (payload_length >> 8) & 0xFF; -		//payload[1] = payload_length & 0xFF; -		//payload_length += 2; +  		mqtt_transmit(inst, MSG_PUBLISH, vh_length, variable_header, payload_length, payload);  	} @@ -519,6 +584,7 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h  	channel* changed = NULL;  	channel_value val;  	uint8_t qos = (type & 0x06) >> 1, content_utf8 = 0; +	uint16_t topic_alias = 0;  	uint32_t property_length = 0;  	size_t u = data->nchannels, property_offset, payload_offset, payload_length;  	size_t topic_length = mqtt_pop_utf8(variable_header, length, &topic); @@ -529,23 +595,42 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h  		payload_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, &property_length);  		payload_offset += property_length; -		//TODO parse properties -		//find topic alias -		//find type code +		property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); +		//parse properties +		while(property_offset < payload_offset){ +			DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIu32, variable_header[property_offset], property_offset, property_length); +			//read payload format indicator +			if(variable_header[property_offset] == 0x01){ +				content_utf8 = variable_header[property_offset + 1]; +			} +			//read topic alias +			else if(variable_header[property_offset] == 0x23){ +				topic_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; +			} + +			property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset); +		}  	}  	//match via topic alias -	if(topic_length == 0){ -		//TODO match topic aliases -		//TODO build topic alias database +	if(!topic_length && topic_alias){ +		for(u = 0; u < data->nchannels; u++){ +			if(data->channel[u].topic_alias_rcvd == topic_alias){ +				break; +			} +		}  	}  	//match via topic -	else{ +	else if(topic_length){  		for(u = 0; u < data->nchannels; u++){  			if(!strncmp(data->channel[u].topic, topic, topic_length)){  				break;  			}  		} + +		if(topic_alias){ +			data->channel[u].topic_alias_rcvd = topic_alias; +		}  	}  	if(content_utf8){ @@ -569,26 +654,50 @@ static int mqtt_handle_publish(instance* inst, uint8_t type, uint8_t* variable_h  	return 0;  } -static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +static int mqtt_handle_connack(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	size_t property_offset = 2; + +	if(length >= 2){ +		if(variable_header[1]){ +			if(variable_header[1] == 1 && data->mqtt_version == 0x05){ +				LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); +				data->mqtt_version = 0x04; +				return 0; +			} +			LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); +			mqtt_disconnect(inst); +			return 0; +		} -	switch(type){ -		case MSG_CONNACK: -			if(length >= 2){ -				if(variable_header[1]){ -					if(variable_header[1] == 1 && data->mqtt_version == 0x05){ -						LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); -						data->mqtt_version = 0x04; -						return 0; -					} -					LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); -				} -				else{ -					LOGPF("Connection on %s established", inst->name); -					return mqtt_push_subscriptions(inst); +		//parse response properties if present +		if(data->mqtt_version == 0x05){ +			property_offset += mqtt_pop_varint(variable_header + property_offset, length - property_offset, NULL); +			while(property_offset < length){ +				DBGPF("Property %02X at offset %" PRIsize_t " of %" PRIsize_t, variable_header[property_offset], property_offset, length); +		 +				//read maximum topic alias +				if(variable_header[property_offset] == 0x22){ +					data->server_max_alias = (variable_header[property_offset + 1] << 8) | variable_header[property_offset + 2]; +					DBGPF("Connection supports maximum connection alias %" PRIu16, data->server_max_alias);  				} + +				property_offset += mqtt_pop_property(variable_header + property_offset, length - property_offset);  			} -			break; +		} + +		LOGPF("Connection on %s established", inst->name); +		return mqtt_push_subscriptions(inst); +	} + +	LOGPF("Received malformed CONNACK on %s", inst->name); +	return 1; +} + +static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +	switch(type){ +		case MSG_CONNACK: +			return mqtt_handle_connack(inst, type, variable_header, length);  		case MSG_PINGRESP:  		case MSG_SUBACK:  			//ignore most responses  | 
