diff options
author | cbdev <cb@cbcdn.com> | 2020-12-19 17:25:19 +0100 |
---|---|---|
committer | cbdev <cb@cbcdn.com> | 2020-12-19 17:25:19 +0100 |
commit | 2b0aea70c275e08c1e312db91653eef880b4f725 (patch) | |
tree | 432e8390a842e44de7e368907b55a412b7928f40 /backends | |
parent | e248f0c5778f73988477ba73174c91b7085c232c (diff) | |
download | midimonster-2b0aea70c275e08c1e312db91653eef880b4f725.tar.gz midimonster-2b0aea70c275e08c1e312db91653eef880b4f725.tar.bz2 midimonster-2b0aea70c275e08c1e312db91653eef880b4f725.zip |
Reconnect MQTT instances during maintenance
Diffstat (limited to 'backends')
-rw-r--r-- | backends/mqtt.c | 60 |
1 files changed, 57 insertions, 3 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c index 8c4a9fd..72046df 100644 --- a/backends/mqtt.c +++ b/backends/mqtt.c @@ -290,15 +290,69 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){ return 0; } +static int mqtt_maintenance(){ + size_t n, u; + instance** inst = NULL; + mqtt_instance_data* data = NULL; + + if(mm_backend_instances(BACKEND_NAME, &n, &inst)){ + LOG("Failed to fetch instance list"); + return 1; + } + + DBGPF("Running maintenance operations on %" PRIsize_t " instances", n); + for(u = 0; u < n; u++){ + data = (mqtt_instance_data*) inst[u]->impl; + if(data->fd <= 0){ + if(mqtt_reconnect(inst[u]) >= 2){ + LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name); + free(inst); + return 1; + } + } + } + + free(inst); + return 0; +} + +static int mqtt_handle_fd(instance* inst){ + mqtt_instance_data* data = (mqtt_instance_data*) inst->impl; + ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset; + + bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0); + if(bytes_read < 0){ + LOGPF("Failed to receive data on instance %s: %s", inst->name, mmbackend_socket_strerror(errno)); + return 1; + } + else if(bytes_read == 0){ + //disconnected, try to reconnect + LOGPF("Instance %s disconnected, reconnection queued", inst->name); + mqtt_disconnect(inst); + return 1; + } + + DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read); + + return 0; +} + static int mqtt_handle(size_t num, managed_fd* fds){ size_t n = 0; + int rv = 0; - //for(n = 0; n < num; n++){ - //} + for(n = 0; n < num; n++){ + if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){ + //propagate critical failures + return 1; + } + } //keepalive/reconnect processing if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){ - //TODO run reconnects + if(mqtt_maintenance()){ + return 1; + } last_maintenance = mm_timestamp(); } |