aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/mqtt.c')
-rw-r--r--backends/mqtt.c56
1 files changed, 37 insertions, 19 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index d42d1f5..e9493a9 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -135,6 +135,7 @@ static size_t mqtt_push_utf8(uint8_t* buffer, size_t buffer_length, char* conten
static void mqtt_disconnect(instance* inst){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ data->last_control = 0;
//unmanage the fd
mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL);
@@ -153,13 +154,14 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t
offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset);
if(mmbackend_send(data->fd, fixed_header, offset)
- || mmbackend_send(data->fd, vh, vh_length)
- || mmbackend_send(data->fd, payload, payload_length)){
+ || (vh && vh_length && mmbackend_send(data->fd, vh, vh_length))
+ || (payload && payload_length && mmbackend_send(data->fd, payload, payload_length))){
LOGPF("Failed to transmit control message for %s, assuming connection failure", inst->name);
mqtt_disconnect(inst);
return 1;
}
+ data->last_control = mm_timestamp();
return 0;
}
@@ -169,7 +171,7 @@ static int mqtt_configure(char* option, char* value){
}
static int mqtt_reconnect(instance* inst){
- uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION, 0x00 /*flags*/, (MQTT_KEEPALIVE >> 8) & 0xFF, MQTT_KEEPALIVE & 0xFF};
+ uint8_t variable_header[MQTT_BUFFER_LENGTH] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION_DEFAULT, 0x00 /*flags*/, ((MQTT_KEEPALIVE * 2) >> 8) & 0xFF, (MQTT_KEEPALIVE * 2) & 0xFF};
uint8_t payload[MQTT_BUFFER_LENGTH];
size_t vh_offset = 10, payload_offset = 0;
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
@@ -198,25 +200,28 @@ static int mqtt_reconnect(instance* inst){
return 1;
}
- //prepare CONNECT message flags
+ //prepare CONNECT message header
+ variable_header[6] = data->mqtt_version;
variable_header[7] = 0x02 /*clean start*/ | (data->user ? 0x80 : 0x00) | (data->user ? 0x40 : 0x00);
//TODO set session expiry interval option
//TODO re-use previos session on reconnect
- //push number of option bytes (as a varint, no less) before actually pushing the option data.
- //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart..
- variable_header[vh_offset++] = 8;
- //push maximum packet size option
- variable_header[vh_offset++] = 0x27;
- variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF;
- variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF;
- variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF;
- variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF;
- //push topic alias maximum option
- variable_header[vh_offset++] = 0x22;
- variable_header[vh_offset++] = 0xFF;
- variable_header[vh_offset++] = 0xFF;
+ if(data->mqtt_version == 0x05){ //mqtt v5 has additional options
+ //push number of option bytes (as a varint, no less) before actually pushing the option data.
+ //obviously someone thought saving 3 whole bytes in exchange for not being able to sequentially creating the package was smart..
+ variable_header[vh_offset++] = 8;
+ //push maximum packet size option
+ variable_header[vh_offset++] = 0x27;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 24) & 0xFF;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 16) & 0xFF;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH >> 8) & 0xFF;
+ variable_header[vh_offset++] = (MQTT_BUFFER_LENGTH) & 0xFF;
+ //push topic alias maximum option
+ variable_header[vh_offset++] = 0x22;
+ variable_header[vh_offset++] = 0xFF;
+ variable_header[vh_offset++] = 0xFF;
+ }
//prepare CONNECT payload
//push client id
@@ -279,6 +284,7 @@ static int mqtt_instance(instance* inst){
}
data->fd = -1;
+ data->mqtt_version = MQTT_VERSION_DEFAULT;
inst->impl = data;
if(mqtt_generate_instanceid(inst)){
@@ -317,6 +323,10 @@ static int mqtt_maintenance(){
return 1;
}
}
+ else if(data->last_control && mm_timestamp() - data->last_control >= MQTT_KEEPALIVE * 1000){
+ //send keepalive ping requests
+ mqtt_transmit(inst[u], MSG_PINGREQ, 0, NULL, 0, NULL);
+ }
}
free(inst);
@@ -324,10 +334,17 @@ static int mqtt_maintenance(){
}
static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+
switch(type){
case MSG_CONNACK:
if(length >= 2){
if(variable_header[1]){
+ if(variable_header[1] == 1 && data->mqtt_version == 0x05){
+ LOGPF("Connection on %s was rejected for protocol incompatibility, downgrading to protocol 3.1.1", inst->name);
+ data->mqtt_version = 0x04;
+ return 0;
+ }
LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]);
}
else{
@@ -335,6 +352,9 @@ static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_h
}
}
break;
+ case MSG_PINGRESP:
+ //ignore ping responses
+ break;
default:
LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name);
}
@@ -379,13 +399,11 @@ static int mqtt_handle_fd(instance* inst){
}
}
- data->receive_offset += bytes_read;
return 0;
}
static int mqtt_handle(size_t num, managed_fd* fds){
size_t n = 0;
- int rv = 0;
for(n = 0; n < num; n++){
if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){