diff options
| -rw-r--r-- | backends/mqtt.c | 56 | ||||
| -rw-r--r-- | backends/mqtt.h | 5 | 
2 files changed, 41 insertions, 20 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c index d42d1f5..e9493a9 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -135,6 +135,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten  static void mqtt_disconnect(instance* inst){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	data->last_control = 0;  	//unmanage the fd  	mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); @@ -153,13 +154,14 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t  	offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset);  	if(mmbackend_send(data->fd, fixed_header, offset) -			|| mmbackend_send(data->fd, vh, vh_length) -			|| mmbackend_send(data->fd, payload, payload_length)){ +			|| (vh && vh_length && mmbackend_send(data->fd, vh, vh_length)) +			|| (payload && payload_length && mmbackend_send(data->fd, payload, payload_length))){  		LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name);  		mqtt_disconnect(inst);  		return 1;  	} +	data->last_control = mm_timestamp();  	return 0;  } @@ -169,7 +171,7 @@ static int mqtt_configure(char* option, char* value){  }  static int mqtt_reconnect(instance* inst){ -	uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF}; +	uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION_DEFAULT, 0x00 /*flags*/, ((MQTT_KEEPALIVE * 2) >> 8) & 0xFF, (MQTT_KEEPALIVE * 2) & 0xFF};  	uint8_t payload[MQTT_BUFFER_LENGTH];  	size_t vh_offset = 10, payload_offset = 0;  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; @@ -198,25 +200,28 @@ static int mqtt_reconnect(instance* inst){  		return 1;  	} -	//prepare CONNECT message flags +	//prepare CONNECT message header +	variable_header[6] = data->mqtt_version;  	variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00);  	//TODO set session expiry interval option  	//TODO re-use previos session on reconnect -	//push number of option bytes (as a varint, no less) before actually pushing the option data. -	//obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. -	variable_header[vh_offset++] = 8; -	//push maximum packet size option -	variable_header[vh_offset++] = 0x27; -	variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; -	variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; -	variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; -	variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; -	//push topic alias maximum option -	variable_header[vh_offset++] = 0x22; -	variable_header[vh_offset++] = 0xFF; -	variable_header[vh_offset++] = 0xFF; +	if(data->mqtt_version == 0x05){ //mqtt v5 has additional options +		//push number of option bytes (as a varint, no less) before actually pushing the option data. +		//obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart.. +		variable_header[vh_offset++] = 8; +		//push maximum packet size option +		variable_header[vh_offset++] = 0x27; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF; +		variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF; +		//push topic alias maximum option +		variable_header[vh_offset++] = 0x22; +		variable_header[vh_offset++] = 0xFF; +		variable_header[vh_offset++] = 0xFF; +	}  	//prepare CONNECT payload  	//push client id @@ -279,6 +284,7 @@ static int mqtt_instance(instance* inst){  	}  	data->fd = -1; +	data->mqtt_version = MQTT_VERSION_DEFAULT;  	inst->impl = data;  	if(mqtt_generate_instanceid(inst)){ @@ -317,6 +323,10 @@ static int mqtt_maintenance(){  				return 1;  			}  		} +		else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){ +			//send keepalive ping requests +			mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL); +		}  	}  	free(inst); @@ -324,10 +334,17 @@ static int mqtt_maintenance(){  }  static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +  	switch(type){  		case MSG_CONNACK:  			if(length >= 2){  				if(variable_header[1]){ +					if(variable_header[1] == 1 && data->mqtt_version == 0x05){ +						LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name); +						data->mqtt_version = 0x04; +						return 0; +					}  					LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]);  				}  				else{ @@ -335,6 +352,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h  				}  			}  			break; +		case MSG_PINGRESP: +			//ignore ping responses +			break;  		default:  			LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name);  	} @@ -379,13 +399,11 @@ static int mqtt_handle_fd(instance* inst){  		}  	} -	data->receive_offset += bytes_read;  	return 0;  }  static int mqtt_handle(size_t num, managed_fd* fds){  	size_t n = 0; -	int rv = 0;  	for(n = 0; n < num; n++){  		if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ diff --git a/backends/mqtt.h b/backends/mqtt.h index df63319..a0f5356 100644 --- a/backends/mqtt.h +++ b/backends/mqtt.h @@ -14,7 +14,7 @@ static int mqtt_shutdown(size_t n, instance** inst);  #define MQTT_TLS_PORT "8883"  #define MQTT_BUFFER_LENGTH 8192  #define MQTT_KEEPALIVE 10  -#define MQTT_VERSION 0x05 +#define MQTT_VERSION_DEFAULT 0x05  enum {  	MSG_RESERVED = 0x00, @@ -39,6 +39,7 @@ typedef struct /*_mqtt_instance_data*/ {  	uint8_t tls;  	char* host;  	char* port; +	uint8_t mqtt_version;  	char* user;  	char* password; @@ -50,6 +51,8 @@ typedef struct /*_mqtt_instance_data*/ {  	int fd;  	uint8_t receive_buffer[MQTT_BUFFER_LENGTH];  	size_t receive_offset; + +	uint64_t last_control;  } mqtt_instance_data;  //per-channel  | 
