diff options
| author | cbdev <cb@cbcdn.com> | 2020-12-20 13:30:11 +0100 | 
|---|---|---|
| committer | cbdev <cb@cbcdn.com> | 2020-12-20 13:30:11 +0100 | 
| commit | 0175c84ad866e8f33a90d571e6207c6cc120075c (patch) | |
| tree | 193218642155b608fe29219c5bfb5e96f85f569a | |
| parent | 2b0aea70c275e08c1e312db91653eef880b4f725 (diff) | |
| download | midimonster-0175c84ad866e8f33a90d571e6207c6cc120075c.tar.gz midimonster-0175c84ad866e8f33a90d571e6207c6cc120075c.tar.bz2 midimonster-0175c84ad866e8f33a90d571e6207c6cc120075c.zip  | |
Basic MQTT message parsing
| -rw-r--r-- | backends/mqtt.c | 54 | 
1 files changed, 50 insertions, 4 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c index 72046df..d42d1f5 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -86,16 +86,22 @@ static int mqtt_generate_instanceid(instance* inst){  	return mmbackend_strdup(&(data->client_id), clientid);  } -static size_t mqtt_varint_decode(uint8_t* buffer, uint32_t* result){ +static int mqtt_pop_varint(uint8_t* buffer, size_t len, uint32_t* result){  	size_t value = 0, offset = 0;  	do { +		if(offset >= len){ +			return 0; +		} +  		value |= (buffer[offset] & 0x7F) << (7 * offset);  		offset++;  	} while(buffer[offset - 1] & 0x80); -	return 0; + +	*result = value; +	return offset;  } -static size_t mqtt_varint_encode(size_t value, size_t maxlen, uint8_t* buffer){ +static size_t mqtt_push_varint(size_t value, size_t maxlen, uint8_t* buffer){  	//implementation conforming to spec 1.5.5  	size_t offset = 0;  	do { @@ -142,8 +148,9 @@ static int mqtt_transmit(instance* inst, uint8_t type, size_t vh_length, uint8_t  	uint8_t fixed_header[5];  	size_t offset = 0; +	//how in the world is it a _fixed_ header if it contains a variable length integer? eh...  	fixed_header[offset++] = type; -	offset += mqtt_varint_encode(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset); +	offset += mqtt_push_varint(vh_length + payload_length, sizeof(fixed_header) - offset, fixed_header + offset);  	if(mmbackend_send(data->fd, fixed_header, offset)  			|| mmbackend_send(data->fd, vh, vh_length) @@ -316,9 +323,28 @@ static int mqtt_maintenance(){  	return 0;  } +static int mqtt_handle_message(instance* inst, uint8_t type, uint8_t* variable_header, size_t length){ +	switch(type){ +		case MSG_CONNACK: +			if(length >= 2){ +				if(variable_header[1]){ +					LOGPF("Connection on %s was rejected, reason code %d", inst->name, variable_header[1]); +				} +				else{ +					LOGPF("Connection on %s established", inst->name); +				} +			} +			break; +		default: +			LOGPF("Unhandled MQTT message type 0x%02X on %s", type, inst->name); +	} +	return 0; +} +  static int mqtt_handle_fd(instance* inst){  	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;  	ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; +	uint32_t message_length = 0, header_length = 0;  	bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0);  	if(bytes_read < 0){ @@ -333,7 +359,27 @@ static int mqtt_handle_fd(instance* inst){  	}  	DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); +	data->receive_offset += bytes_read; + +	//TODO loop this while at least one unhandled message is in the buffer +	//check for complete message +	if(data->receive_offset >= 2){ +		header_length = mqtt_pop_varint(data->receive_buffer + 1, data->receive_offset - 1, &message_length); +		if(header_length && data->receive_offset >= message_length + header_length + 1){ +			DBGPF("Received complete message of %" PRIu32 " bytes, total received %" PRIsize_t ", payload %" PRIu32 ", message type %02X", message_length + header_length + 1, data->receive_offset, message_length, data->receive_buffer[0]); +			if(mqtt_handle_message(inst, data->receive_buffer[0], data->receive_buffer + header_length + 1, message_length)){ +				//TODO handle failures properly +			} + +			//remove handled message +			if(data->receive_offset > message_length + header_length + 1){ +				memmove(data->receive_buffer, data->receive_buffer + message_length + header_length + 1, data->receive_offset - (message_length + header_length + 1)); +			} +			data->receive_offset -= message_length + header_length + 1; +		} +	} +	data->receive_offset += bytes_read;  	return 0;  }  | 
