From 2b0aea70c275e08c1e312db91653eef880b4f725 Mon Sep 17 00:00:00 2001 From: cbdev Date: Sat, 19 Dec 2020 17:25:19 +0100 Subject: Reconnect MQTT instances during maintenance --- backends/mqtt.c | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/backends/mqtt.c b/backends/mqtt.c index 8c4a9fd..72046df 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -290,15 +290,69 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ return 0; } +static int mqtt_maintenance(){ + size_t n, u; + instance** inst = NULL; + mqtt_instance_data* data = NULL; + + if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ + LOG("Failed to fetch instance list"); + return 1; + } + + DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); + for(u = 0; u < n; u++){ + data = (mqtt_instance_data*) inst[u]->impl; + if(data->fd <= 0){ + if(mqtt_reconnect(inst[u]) >= 2){ + LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); + free(inst); + return 1; + } + } + } + + free(inst); + return 0; +} + +static int mqtt_handle_fd(instance* inst){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; + + bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0); + if(bytes_read < 0){ + LOGPF("Failed to receive data on instance %s: %s", inst->name, mmbackend_socket_strerror(errno)); + return 1; + } + else if(bytes_read == 0){ + //disconnected, try to reconnect + LOGPF("Instance %s disconnected, reconnection queued", inst->name); + mqtt_disconnect(inst); + return 1; + } + + DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); + + return 0; +} + static int mqtt_handle(size_t num, managed_fd* fds){ size_t n = 0; + int rv = 0; - //for(n = 0; n < num; n++){ - //} + for(n = 0; n < num; n++){ + if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ + //propagate critical failures + return 1; + } + } //keepalive/reconnect processing if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ - //TODO run reconnects + if(mqtt_maintenance()){ + return 1; + } last_maintenance = mm_timestamp(); } -- cgit v1.2.3