diff options
Diffstat (limited to 'backends')
-rw-r--r-- | backends/mqtt.c | 54 |
1 files changed, 50 insertions, 4 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c index 72046df..d42d1f5 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -86,16 +86,22 @@ static int mqtt_generate_instanceid(instance* inst){ return mmbackend_strdup(&(data->client_id), clientid); } -static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ +static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){ size_t value = 0, offset = 0; do { + if(offset >= len){ + return 0; + } + value |= (buffer[offset] & 0x7F) << (7 * offset); offset++; } while(buffer[offset - 1] & 0x80); - return 0; + + *result = value; + return offset; } -static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){ +static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){ //implementation conforming to spec 1.5.5 size_t offset = 0; do { @@ -142,8 +148,9 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t uint8_t fixed_header[5]; size_t offset = 0; + //how in the world is it a _fixed_ header if it contains a variable length integer? eh... fixed_header[offset++] = type; - offset += mqtt_varint_encode(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); + offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); if(mmbackend_send(data->fd, fixed_header, offset) || mmbackend_send(data->fd, vh, vh_length) @@ -316,9 +323,28 @@ static int mqtt_maintenance(){ return 0; } +static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ + switch(type){ + case MSG_CONNACK: + if(length >= 2){ + if(variable_header[1]){ + LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); + } + else{ + LOGPF("Connection on %s established", inst->name); + } + } + break; + default: + LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); + } + return 0; +} + static int mqtt_handle_fd(instance* inst){ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; + uint32_t message_length = 0, header_length = 0; bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0); if(bytes_read < 0){ @@ -333,7 +359,27 @@ static int mqtt_handle_fd(instance* inst){ } DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); + data->receive_offset += bytes_read; + + //TODO loop this while at least one unhandled message is in the buffer + //check for complete message + if(data->receive_offset >= 2){ + header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length); + if(header_length && data->receive_offset >= message_length + header_length + 1){ + DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]); + if(mqtt_handle_message(inst, data->receive_buffer[0], data->receive_buffer + header_length + 1, message_length)){ + //TODO handle failures properly + } + + //remove handled message + if(data->receive_offset > message_length + header_length + 1){ + memmove(data->receive_buffer, data->receive_buffer + message_length + header_length + 1, data->receive_offset - (message_length + header_length + 1)); + } + data->receive_offset -= message_length + header_length + 1; + } + } + data->receive_offset += bytes_read; return 0; } |