diff options
Diffstat (limited to 'backends/maweb.c')
| -rw-r--r-- | backends/maweb.c | 137 | 
1 files changed, 105 insertions, 32 deletions
diff --git a/backends/maweb.c b/backends/maweb.c index 6861d75..97d4cea 100644 --- a/backends/maweb.c +++ b/backends/maweb.c @@ -163,19 +163,29 @@ static int maweb_configure_instance(instance* inst, char* option, char* value){  			LOGPF("Invalid host specified for instance %s", inst->name);  			return 1;  		} -		free(data->host); -		data->host = strdup(host); -		free(data->port); -		data->port = NULL; -		if(port){ -			data->port = strdup(port); + +		data->host = realloc(data->host, (data->hosts + 1) * sizeof(char*)); +		data->port = realloc(data->port, (data->hosts + 1) * sizeof(char*)); + +		if(!data->host || !data->port){ +			LOG("Failed to allocate memory"); +			return 1;  		} + +		data->host[data->hosts] = strdup(host); +		data->port[data->hosts] = port ? strdup(port) : NULL; +		if(!data->host[data->hosts] || (port && !data->port[data->hosts])){ +			LOG("Failed to allocate memory"); +			free(data->host[data->hosts]); +			free(data->port[data->hosts]); +			return 1; +		} + +		data->hosts++;  		return 0;  	}  	else if(!strcmp(option, "user")){ -		free(data->user); -		data->user = strdup(value); -		return 0; +		return mmbackend_strdup(&data->user, value);  	}  	else if(!strcmp(option, "password")){  		#ifndef MAWEB_NO_LIBSSL @@ -222,6 +232,7 @@ static int maweb_instance(instance* inst){  	}  	data->fd = -1; +	data->state = ws_closed;  	data->buffer = calloc(MAWEB_RECV_CHUNK, sizeof(uint8_t));  	if(!data->buffer){  		LOG("Failed to allocate memory"); @@ -340,6 +351,9 @@ static int maweb_send_frame(instance* inst, maweb_operation op, uint8_t* payload  	if(mmbackend_send(data->fd, frame_header, header_bytes)  			|| mmbackend_send(data->fd, payload, len)){ +		LOGPF("Failed to send on instance %s, assuming connection failure", inst->name); +		data->state = ws_closed; +		data->login = 0;  		return 1;  	} @@ -556,7 +570,7 @@ static int maweb_request_playbacks(instance* inst){  				item_types,  				view,  				data->session); -		rv |= maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer)); +		maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer));  		DBGPF("Poll request: %s", xmit_buffer);  		updates_inflight++;  	} @@ -629,19 +643,30 @@ static int maweb_handle_message(instance* inst, char* payload, size_t payload_le  }  static int maweb_connect(instance* inst){ +	int rv = 1;  	maweb_instance_data* data = (maweb_instance_data*) inst->impl; -	if(!data->host){ -		return 1; +	if(!data->host || !data->host[data->next_host]){ +		LOGPF("Invalid host configuration on instance %s, host %" PRIsize_t, inst->name, data->next_host + 1); +		goto bail;  	}  	//unregister old fd from core  	if(data->fd >= 0){  		mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); +		close(data->fd); +		data->fd = -1;  	} +	data->state = ws_closed; +	data->login = 0; + +	LOGPF("Connecting to host %" PRIsize_t " of %" PRIsize_t " on %s", data->next_host + 1, data->hosts, inst->name); + +	data->fd = mmbackend_socket(data->host[data->next_host], +			data->port[data->next_host] ? data->port[data->next_host] : MAWEB_DEFAULT_PORT, +			SOCK_STREAM, 0, 0, 1); -	data->fd = mmbackend_socket(data->host, data->port ? data->port : MAWEB_DEFAULT_PORT, SOCK_STREAM, 0, 0);  	if(data->fd < 0){ -		return 1; +		goto bail;  	}  	data->state = ws_new; @@ -654,15 +679,20 @@ static int maweb_connect(instance* inst){  			|| mmbackend_send_str(data->fd, "Sec-WebSocket-Key: rbEQrXMEvCm4ZUjkj6juBQ==\r\n")  			|| mmbackend_send_str(data->fd, "\r\n")){  		LOG("Failed to communicate with peer"); -		return 1; +		goto bail;  	}  	//register new fd  	if(mm_manage_fd(data->fd, BACKEND_NAME, 1, (void*) inst)){  		LOG("Failed to register FD"); -		return 1; +		goto bail;  	} -	return 0; + +	rv = 0; +bail: +	data->next_host++; +	data->next_host %= data->hosts; +	return rv;  }  static ssize_t maweb_handle_lines(instance* inst, ssize_t bytes_read){ @@ -693,6 +723,19 @@ static ssize_t maweb_handle_lines(instance* inst, ssize_t bytes_read){  	return data->offset + begin;  } +static int maweb_establish(instance* inst){ +	maweb_instance_data* data = (maweb_instance_data*) inst->impl; +	size_t start = data->next_host; + +	do{ +		if(!maweb_connect(inst)){ +			break; +		} +	} while(data->next_host != start); + +	return data->state != ws_closed ? 0 : 1; +} +  static ssize_t maweb_handle_ws(instance* inst, ssize_t bytes_read){  	maweb_instance_data* data = (maweb_instance_data*) inst->impl;  	size_t header_length = 2; @@ -766,7 +809,7 @@ static int maweb_handle_fd(instance* inst){  		data->buffer = realloc(data->buffer, (data->allocated + MAWEB_RECV_CHUNK) * sizeof(uint8_t));  		if(!data->buffer){  			LOG("Failed to allocate memory"); -			return 1; +			return -1;  		}  		data->allocated += MAWEB_RECV_CHUNK;  		bytes_left += MAWEB_RECV_CHUNK; @@ -774,14 +817,12 @@ static int maweb_handle_fd(instance* inst){  	bytes_read = recv(data->fd, data->buffer + data->offset, bytes_left - 1, 0);  	if(bytes_read < 0){ -		LOGPF("Failed to receive: %s", strerror(errno)); -		//TODO close, reopen +		LOGPF("Failed to receive on %s: %s", inst->name, mmbackend_socket_strerror(errno));  		return 1;  	}  	else if(bytes_read == 0){ -		//client closed connection -		//TODO try to reopen -		return 0; +		//client closed connection, try to reopen the connection +		return 1;  	}  	do{ @@ -801,7 +842,6 @@ static int maweb_handle_fd(instance* inst){  		if(bytes_handled < 0){  			bytes_handled = data->offset + bytes_read;  			data->offset = 0; -			//TODO close, reopen  			LOG("Failed to handle incoming data");  			return 1;  		} @@ -947,6 +987,12 @@ static int maweb_keepalive(){  			snprintf(xmit_buffer, sizeof(xmit_buffer), "{\"session\":%" PRIu64 "}", data->session);  			maweb_send_frame(inst[u], ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer));  		} +		else if(data->state == ws_closed){ +			//try to reconnect to any remote +			if(maweb_establish(inst[u])){ +				LOGPF("Failed to reconnect to any host on %s, will retry in %d seconds", inst[u]->name, MAWEB_CONNECTION_KEEPALIVE / 1000); +			} +		}  	}  	free(inst); @@ -981,7 +1027,18 @@ static int maweb_handle(size_t num, managed_fd* fds){  	int rv = 0;  	for(n = 0; n < num; n++){ -		rv |= maweb_handle_fd((instance*) fds[n].impl); +		rv = maweb_handle_fd((instance*) fds[n].impl); +		//try to reconnect soft failures +		if(rv == 1 && maweb_establish((instance*) fds[n].impl)){ +			//keepalive will retry periodically +			LOGPF("Failed to reconnect with any configured host on instance %s", ((instance*) fds[n].impl)->name); +		} +		else if(rv){ +			//propagate critical failures +			return rv; +		} +		//errors handled +		rv = 0;  	}  	//FIXME all keepalive processing allocates temporary buffers, this might an optimization target @@ -1003,8 +1060,13 @@ static int maweb_start(size_t n, instance** inst){  	maweb_instance_data* data = NULL;  	for(u = 0; u < n; u++){ -		//sort channels  		data = (maweb_instance_data*) inst[u]->impl; +		if(!data->hosts){ +			LOGPF("No hosts configured on instance %s", inst[u]->name); +			return 1; +		} + +		//sort channels  		qsort(data->channel, data->channels, sizeof(maweb_channel_data), channel_comparator);  		//re-set channel identifiers @@ -1012,10 +1074,10 @@ static int maweb_start(size_t n, instance** inst){  			data->channel[p].chan->ident = p;  		} -		if(maweb_connect(inst[u])){ -			LOGPF("Failed to open connection for instance %s", inst[u]->name); -			free(inst); -			return 1; +		//try to connect to any available host +		if(maweb_establish(inst[u])){ +			//do not return failure here, keepalive will periodically try to reconnect +			LOGPF("Failed to connect to any host configured on instance %s", inst[u]->name);  		}  	} @@ -1027,15 +1089,26 @@ static int maweb_start(size_t n, instance** inst){  }  static int maweb_shutdown(size_t n, instance** inst){ -	size_t u; +	size_t u, p;  	maweb_instance_data* data = NULL;  	for(u = 0; u < n; u++){  		data = (maweb_instance_data*) inst[u]->impl; + +		for(p = 0; p < data->hosts; p++){ +			//one of these might have failed to allocate +			if(data->host){ +				free(data->host[p]); +			} +			if(data->port){ +				free(data->port[p]); +			} +		}  		free(data->host);  		data->host = NULL;  		free(data->port);  		data->port = NULL; +  		free(data->user);  		data->user = NULL;  		free(data->pass); @@ -1048,7 +1121,7 @@ static int maweb_shutdown(size_t n, instance** inst){  		data->buffer = NULL;  		data->offset = data->allocated = 0; -		data->state = ws_new; +		data->state = ws_closed;  		free(data->channel);  		data->channel = NULL;  | 
