diff options
Diffstat (limited to 'backends')
| -rw-r--r-- | backends/mqtt.c | 60 | 
1 files changed, 57 insertions, 3 deletions
| diff --git a/backends/mqtt.c b/backends/mqtt.c index 8c4a9fd..72046df 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -290,15 +290,69 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){  	return 0;  } +static int mqtt_maintenance(){ +	size_t n, u; +	instance** inst = NULL; +	mqtt_instance_data* data = NULL; + +	if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ +		LOG("Failed to fetch instance list"); +		return 1; +	} + +	DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); +	for(u = 0; u < n; u++){ +       		data = (mqtt_instance_data*) inst[u]->impl; +		if(data->fd <= 0){ +			if(mqtt_reconnect(inst[u]) >= 2){ +				LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); +				free(inst); +				return 1; +			} +		} +	} + +	free(inst); +	return 0; +} + +static int mqtt_handle_fd(instance* inst){ +	mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; +	ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; + +	bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0); +	if(bytes_read < 0){ +		LOGPF("Failed to receive data on instance %s: %s", inst->name, mmbackend_socket_strerror(errno)); +		return 1; +	} +	else if(bytes_read == 0){ +		//disconnected, try to reconnect +		LOGPF("Instance %s disconnected, reconnection queued", inst->name); +		mqtt_disconnect(inst); +		return 1; +	} + +	DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); + +	return 0; +} +  static int mqtt_handle(size_t num, managed_fd* fds){  	size_t n = 0; +	int rv = 0; -	//for(n = 0; n < num; n++){ -	//} +	for(n = 0; n < num; n++){ +		if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ +			//propagate critical failures +			return 1; +		} +	}  	//keepalive/reconnect processing  	if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ -		//TODO run reconnects +		if(mqtt_maintenance()){ +			return 1; +		}  		last_maintenance = mm_timestamp();  	} | 
