aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorcbdev <cb@cbcdn.com>2020-12-19 17:25:19 +0100
committercbdev <cb@cbcdn.com>2020-12-19 17:25:19 +0100
commit2b0aea70c275e08c1e312db91653eef880b4f725 (patch)
tree432e8390a842e44de7e368907b55a412b7928f40
parente248f0c5778f73988477ba73174c91b7085c232c (diff)
downloadmidimonster-2b0aea70c275e08c1e312db91653eef880b4f725.tar.gz
midimonster-2b0aea70c275e08c1e312db91653eef880b4f725.tar.bz2
midimonster-2b0aea70c275e08c1e312db91653eef880b4f725.zip
Reconnect MQTT instances during maintenance
-rw-r--r--backends/mqtt.c60
1 files changed, 57 insertions, 3 deletions
diff --git a/backends/mqtt.c b/backends/mqtt.c
index 8c4a9fd..72046df 100644
--- a/backends/mqtt.c
+++ b/backends/mqtt.c
@@ -290,15 +290,69 @@ static int mqtt_set(instance* inst, size_t num, channel** c, channel_value* v){
return 0;
}
+static int mqtt_maintenance(){
+ size_t n, u;
+ instance** inst = NULL;
+ mqtt_instance_data* data = NULL;
+
+ if(mm_backend_instances(BACKEND_NAME, &n, &inst)){
+ LOG("Failed to fetch instance list");
+ return 1;
+ }
+
+ DBGPF("Running maintenance operations on %" PRIsize_t " instances", n);
+ for(u = 0; u < n; u++){
+ data = (mqtt_instance_data*) inst[u]->impl;
+ if(data->fd <= 0){
+ if(mqtt_reconnect(inst[u]) >= 2){
+ LOGPF("Failed to reconnect instance %s, terminating", inst[u]->name);
+ free(inst);
+ return 1;
+ }
+ }
+ }
+
+ free(inst);
+ return 0;
+}
+
+static int mqtt_handle_fd(instance* inst){
+ mqtt_instance_data* data = (mqtt_instance_data*) inst->impl;
+ ssize_t bytes_read = 0, bytes_left = sizeof(data->receive_buffer) - data->receive_offset;
+
+ bytes_read = recv(data->fd, data->receive_buffer + data->receive_offset, bytes_left, 0);
+ if(bytes_read < 0){
+ LOGPF("Failed to receive data on instance %s: %s", inst->name, mmbackend_socket_strerror(errno));
+ return 1;
+ }
+ else if(bytes_read == 0){
+ //disconnected, try to reconnect
+ LOGPF("Instance %s disconnected, reconnection queued", inst->name);
+ mqtt_disconnect(inst);
+ return 1;
+ }
+
+ DBGPF("Instance %s, offset %" PRIsize_t ", read %" PRIsize_t " bytes", inst->name, data->receive_offset, bytes_read);
+
+ return 0;
+}
+
static int mqtt_handle(size_t num, managed_fd* fds){
size_t n = 0;
+ int rv = 0;
- //for(n = 0; n < num; n++){
- //}
+ for(n = 0; n < num; n++){
+ if(mqtt_handle_fd((instance*) fds[n].impl) >= 2){
+ //propagate critical failures
+ return 1;
+ }
+ }
//keepalive/reconnect processing
if(last_maintenance && mm_timestamp() - last_maintenance >= MQTT_KEEPALIVE * 1000){
- //TODO run reconnects
+ if(mqtt_maintenance()){
+ return 1;
+ }
last_maintenance = mm_timestamp();
}