aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--backends/mqtt.c54
1 files changed, 50 insertions, 4 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index 72046df..d42d1f5 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -86,16 +86,22 @@ static int mqtt_generate_instanceid(instance* inst){
return mmbackend_strdup(&(data->client_id), clientid);
}
-static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){
+static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){
size_t value = 0, offset = 0;
do {
+ if(offset >= len){
+ return 0;
+ }
+
value |= (buffer[offset] & 0x7F) << (7 * offset);
offset++;
} while(buffer[offset - 1] & 0x80);
- return 0;
+
+ *result = value;
+ return offset;
}
-static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){
+static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){
//implementation conforming to spec 1.5.5
size_t offset = 0;
do {
@@ -142,8 +148,9 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t
uint8_t fixed_header[5];
size_t offset = 0;
+ //how in the world is it a _fixed_ header if it contains a variable length integer? eh...
fixed_header[offset++] = type;
- offset += mqtt_varint_encode(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset);
+ offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset);
if(mmbackend_send(data->fd, fixed_header, offset)
|| mmbackend_send(data->fd, vh, vh_length)
@@ -316,9 +323,28 @@ static int mqtt_maintenance(){
return 0;
}
+static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){
+ switch(type){
+ case MSG_CONNACK:
+ if(length >= 2){
+ if(variable_header[1]){
+ LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]);
+ }
+ else{
+ LOGPF("Connection on %s established", inst->name);
+ }
+ }
+ break;
+ default:
+ LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name);
+ }
+ return 0;
+}
+
static int mqtt_handle_fd(instance* inst){
mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset;
+ uint32_t message_length = 0, header_length = 0;
bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0);
if(bytes_read < 0){
@@ -333,7 +359,27 @@ static int mqtt_handle_fd(instance* inst){
}
DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read);
+ data->receive_offset += bytes_read;
+
+ //TODO loop this while at least one unhandled message is in the buffer
+ //check for complete message
+ if(data->receive_offset >= 2){
+ header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length);
+ if(header_length && data->receive_offset >= message_length + header_length + 1){
+ DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]);
+ if(mqtt_handle_message(inst, data->receive_buffer[0], data->receive_buffer + header_length + 1, message_length)){
+ //TODO handle failures properly
+ }
+
+ //remove handled message
+ if(data->receive_offset > message_length + header_length + 1){
+ memmove(data->receive_buffer, data->receive_buffer + message_length + header_length + 1, data->receive_offset - (message_length + header_length + 1));
+ }
+ data->receive_offset -= message_length + header_length + 1;
+ }
+ }
+ data->receive_offset += bytes_read;
return 0;
}