aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends/maweb.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/maweb.c')
-rw-r--r--backends/maweb.c137
1 files changed, 105 insertions, 32 deletions
diff --git a/backends/maweb.c b/backends/maweb.c
index 6861d75..97d4cea 100644
--- a/backends/maweb.c
+++ b/backends/maweb.c
@@ -163,19 +163,29 @@ static int maweb_configure_instance(instance* inst, char* option, char* value){
LOGPF("Invalid host specified for instance %s", inst->name);
return 1;
}
- free(data->host);
- data->host = strdup(host);
- free(data->port);
- data->port = NULL;
- if(port){
- data->port = strdup(port);
+
+ data->host = realloc(data->host, (data->hosts + 1) * sizeof(char*));
+ data->port = realloc(data->port, (data->hosts + 1) * sizeof(char*));
+
+ if(!data->host || !data->port){
+ LOG("Failed to allocate memory");
+ return 1;
}
+
+ data->host[data->hosts] = strdup(host);
+ data->port[data->hosts] = port ? strdup(port) : NULL;
+ if(!data->host[data->hosts] || (port && !data->port[data->hosts])){
+ LOG("Failed to allocate memory");
+ free(data->host[data->hosts]);
+ free(data->port[data->hosts]);
+ return 1;
+ }
+
+ data->hosts++;
return 0;
}
else if(!strcmp(option, "user")){
- free(data->user);
- data->user = strdup(value);
- return 0;
+ return mmbackend_strdup(&data->user, value);
}
else if(!strcmp(option, "password")){
#ifndef MAWEB_NO_LIBSSL
@@ -222,6 +232,7 @@ static int maweb_instance(instance* inst){
}
data->fd = -1;
+ data->state = ws_closed;
data->buffer = calloc(MAWEB_RECV_CHUNK, sizeof(uint8_t));
if(!data->buffer){
LOG("Failed to allocate memory");
@@ -340,6 +351,9 @@ static int maweb_send_frame(instance* inst, maweb_operation op, uint8_t* payload
if(mmbackend_send(data->fd, frame_header, header_bytes)
|| mmbackend_send(data->fd, payload, len)){
+ LOGPF("Failed to send on instance %s, assuming connection failure", inst->name);
+ data->state = ws_closed;
+ data->login = 0;
return 1;
}
@@ -556,7 +570,7 @@ static int maweb_request_playbacks(instance* inst){
item_types,
view,
data->session);
- rv |= maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer));
+ maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer));
DBGPF("Poll request: %s", xmit_buffer);
updates_inflight++;
}
@@ -629,19 +643,30 @@ static int maweb_handle_message(instance* inst, char* payload, size_t payload_le
}
static int maweb_connect(instance* inst){
+ int rv = 1;
maweb_instance_data* data = (maweb_instance_data*) inst->impl;
- if(!data->host){
- return 1;
+ if(!data->host || !data->host[data->next_host]){
+ LOGPF("Invalid host configuration on instance %s, host %" PRIsize_t, inst->name, data->next_host + 1);
+ goto bail;
}
//unregister old fd from core
if(data->fd >= 0){
mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL);
+ close(data->fd);
+ data->fd = -1;
}
+ data->state = ws_closed;
+ data->login = 0;
+
+ LOGPF("Connecting to host %" PRIsize_t " of %" PRIsize_t " on %s", data->next_host + 1, data->hosts, inst->name);
+
+ data->fd = mmbackend_socket(data->host[data->next_host],
+ data->port[data->next_host] ? data->port[data->next_host] : MAWEB_DEFAULT_PORT,
+ SOCK_STREAM, 0, 0, 1);
- data->fd = mmbackend_socket(data->host, data->port ? data->port : MAWEB_DEFAULT_PORT, SOCK_STREAM, 0, 0);
if(data->fd < 0){
- return 1;
+ goto bail;
}
data->state = ws_new;
@@ -654,15 +679,20 @@ static int maweb_connect(instance* inst){
|| mmbackend_send_str(data->fd, "Sec-WebSocket-Key: rbEQrXMEvCm4ZUjkj6juBQ==\r\n")
|| mmbackend_send_str(data->fd, "\r\n")){
LOG("Failed to communicate with peer");
- return 1;
+ goto bail;
}
//register new fd
if(mm_manage_fd(data->fd, BACKEND_NAME, 1, (void*) inst)){
LOG("Failed to register FD");
- return 1;
+ goto bail;
}
- return 0;
+
+ rv = 0;
+bail:
+ data->next_host++;
+ data->next_host %= data->hosts;
+ return rv;
}
static ssize_t maweb_handle_lines(instance* inst, ssize_t bytes_read){
@@ -693,6 +723,19 @@ static ssize_t maweb_handle_lines(instance* inst, ssize_t bytes_read){
return data->offset + begin;
}
+static int maweb_establish(instance* inst){
+ maweb_instance_data* data = (maweb_instance_data*) inst->impl;
+ size_t start = data->next_host;
+
+ do{
+ if(!maweb_connect(inst)){
+ break;
+ }
+ } while(data->next_host != start);
+
+ return data->state != ws_closed ? 0 : 1;
+}
+
static ssize_t maweb_handle_ws(instance* inst, ssize_t bytes_read){
maweb_instance_data* data = (maweb_instance_data*) inst->impl;
size_t header_length = 2;
@@ -766,7 +809,7 @@ static int maweb_handle_fd(instance* inst){
data->buffer = realloc(data->buffer, (data->allocated + MAWEB_RECV_CHUNK) * sizeof(uint8_t));
if(!data->buffer){
LOG("Failed to allocate memory");
- return 1;
+ return -1;
}
data->allocated += MAWEB_RECV_CHUNK;
bytes_left += MAWEB_RECV_CHUNK;
@@ -774,14 +817,12 @@ static int maweb_handle_fd(instance* inst){
bytes_read = recv(data->fd, data->buffer + data->offset, bytes_left - 1, 0);
if(bytes_read < 0){
- LOGPF("Failed to receive: %s", strerror(errno));
- //TODO close, reopen
+ LOGPF("Failed to receive on %s: %s", inst->name, mmbackend_socket_strerror(errno));
return 1;
}
else if(bytes_read == 0){
- //client closed connection
- //TODO try to reopen
- return 0;
+ //client closed connection, try to reopen the connection
+ return 1;
}
do{
@@ -801,7 +842,6 @@ static int maweb_handle_fd(instance* inst){
if(bytes_handled < 0){
bytes_handled = data->offset + bytes_read;
data->offset = 0;
- //TODO close, reopen
LOG("Failed to handle incoming data");
return 1;
}
@@ -947,6 +987,12 @@ static int maweb_keepalive(){
snprintf(xmit_buffer, sizeof(xmit_buffer), "{\"session\":%" PRIu64 "}", data->session);
maweb_send_frame(inst[u], ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer));
}
+ else if(data->state == ws_closed){
+ //try to reconnect to any remote
+ if(maweb_establish(inst[u])){
+ LOGPF("Failed to reconnect to any host on %s, will retry in %d seconds", inst[u]->name, MAWEB_CONNECTION_KEEPALIVE / 1000);
+ }
+ }
}
free(inst);
@@ -981,7 +1027,18 @@ static int maweb_handle(size_t num, managed_fd* fds){
int rv = 0;
for(n = 0; n < num; n++){
- rv |= maweb_handle_fd((instance*) fds[n].impl);
+ rv = maweb_handle_fd((instance*) fds[n].impl);
+ //try to reconnect soft failures
+ if(rv == 1 && maweb_establish((instance*) fds[n].impl)){
+ //keepalive will retry periodically
+ LOGPF("Failed to reconnect with any configured host on instance %s", ((instance*) fds[n].impl)->name);
+ }
+ else if(rv){
+ //propagate critical failures
+ return rv;
+ }
+ //errors handled
+ rv = 0;
}
//FIXME all keepalive processing allocates temporary buffers, this might an optimization target
@@ -1003,8 +1060,13 @@ static int maweb_start(size_t n, instance** inst){
maweb_instance_data* data = NULL;
for(u = 0; u < n; u++){
- //sort channels
data = (maweb_instance_data*) inst[u]->impl;
+ if(!data->hosts){
+ LOGPF("No hosts configured on instance %s", inst[u]->name);
+ return 1;
+ }
+
+ //sort channels
qsort(data->channel, data->channels, sizeof(maweb_channel_data), channel_comparator);
//re-set channel identifiers
@@ -1012,10 +1074,10 @@ static int maweb_start(size_t n, instance** inst){
data->channel[p].chan->ident = p;
}
- if(maweb_connect(inst[u])){
- LOGPF("Failed to open connection for instance %s", inst[u]->name);
- free(inst);
- return 1;
+ //try to connect to any available host
+ if(maweb_establish(inst[u])){
+ //do not return failure here, keepalive will periodically try to reconnect
+ LOGPF("Failed to connect to any host configured on instance %s", inst[u]->name);
}
}
@@ -1027,15 +1089,26 @@ static int maweb_start(size_t n, instance** inst){
}
static int maweb_shutdown(size_t n, instance** inst){
- size_t u;
+ size_t u, p;
maweb_instance_data* data = NULL;
for(u = 0; u < n; u++){
data = (maweb_instance_data*) inst[u]->impl;
+
+ for(p = 0; p < data->hosts; p++){
+ //one of these might have failed to allocate
+ if(data->host){
+ free(data->host[p]);
+ }
+ if(data->port){
+ free(data->port[p]);
+ }
+ }
free(data->host);
data->host = NULL;
free(data->port);
data->port = NULL;
+
free(data->user);
data->user = NULL;
free(data->pass);
@@ -1048,7 +1121,7 @@ static int maweb_shutdown(size_t n, instance** inst){
data->buffer = NULL;
data->offset = data->allocated = 0;
- data->state = ws_new;
+ data->state = ws_closed;
free(data->channel);
data->channel = NULL;