diff options
Diffstat (limited to 'backends/maweb.c')
-rw-r--r-- | backends/maweb.c | 114 |
1 files changed, 75 insertions, 39 deletions
diff --git a/backends/maweb.c b/backends/maweb.c index 97d4cea..8b878b0 100644 --- a/backends/maweb.c +++ b/backends/maweb.c @@ -1,4 +1,5 @@ #define BACKEND_NAME "maweb" +//#define DEBUG #include <string.h> #include <unistd.h> @@ -15,14 +16,11 @@ #define WS_FLAG_FIN 0x80 #define WS_FLAG_MASK 0x80 -/* - * TODO handle peer close/unregister/reopen and fallback connections - */ +static void maweb_disconnect(instance* inst); static uint64_t last_keepalive = 0; -static uint64_t update_interval = 50; +static uint64_t update_interval = 0; static uint64_t last_update = 0; -static uint64_t updates_inflight = 0; static uint64_t quiet_mode = 0; static maweb_command_key cmdline_keys[] = { @@ -136,7 +134,10 @@ static int channel_comparator(const void* raw_a, const void* raw_b){ } static uint32_t maweb_interval(){ - return update_interval - (last_update % update_interval); + if(update_interval){ + return update_interval - (last_update % update_interval); + } + return 0; } static int maweb_configure(char* option, char* value){ @@ -248,7 +249,7 @@ static int maweb_instance(instance* inst){ static channel* maweb_channel(instance* inst, char* spec, uint8_t flags){ maweb_instance_data* data = (maweb_instance_data*) inst->impl; maweb_channel_data chan = { - 0 + .in = -1 //this hack allows the initial data request to push events even for zero'ed channels }; char* next_token = NULL; channel* channel_ref = NULL; @@ -352,8 +353,7 @@ static int maweb_send_frame(instance* inst, maweb_operation op, uint8_t* payload if(mmbackend_send(data->fd, frame_header, header_bytes) || mmbackend_send(data->fd, payload, len)){ LOGPF("Failed to send on instance %s, assuming connection failure", inst->name); - data->state = ws_closed; - data->login = 0; + maweb_disconnect(inst); return 1; } @@ -423,6 +423,7 @@ static int maweb_process_playback(instance* inst, int64_t page, maweb_channel_ty } static int maweb_process_playbacks(instance* inst, int64_t page, char* payload, size_t payload_length){ + maweb_instance_data* data = (maweb_instance_data*) inst->impl; size_t base_offset = json_obj_offset(payload, "itemGroups"), group_offset, subgroup_offset, item_offset; uint64_t group = 0, subgroup, item, metatype; @@ -466,8 +467,9 @@ static int maweb_process_playbacks(instance* inst, int64_t page, char* payload, } group++; } - updates_inflight--; - DBGPF("Playback message processing done, %" PRIu64 " updates inflight", updates_inflight); + + data->updates_inflight--; + DBGPF("Playback message processing done, %" PRIu64 " updates inflight on %s", data->updates_inflight, inst->name); return 0; } @@ -479,9 +481,9 @@ static int maweb_request_playbacks(instance* inst){ char item_indices[1024] = "[300,400,500]", item_counts[1024] = "[16,16,16]", item_types[1024] = "[3,3,3]"; size_t page_index = 0, view = 3, channel = 0, offsets[3], channel_offset, channels; - if(updates_inflight){ + if(data->updates_inflight){ if(quiet_mode < 1){ - LOGPF("Skipping update request, %" PRIu64 " updates still inflight - consider raising the interval time", updates_inflight); + LOGPF("Skipping update request on %s, %" PRIu64 " updates still inflight - consider raising the interval time", inst->name, data->updates_inflight); } return 0; } @@ -572,15 +574,16 @@ static int maweb_request_playbacks(instance* inst){ data->session); maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer)); DBGPF("Poll request: %s", xmit_buffer); - updates_inflight++; + data->updates_inflight++; } - DBGPF("Poll request handling done, %" PRIu64 " updates requested", updates_inflight); + DBGPF("Poll request handling done, %" PRIu64 " updates requested on %s", data->updates_inflight, inst->name); return rv; } static int maweb_handle_message(instance* inst, char* payload, size_t payload_length){ char xmit_buffer[MAWEB_XMIT_CHUNK]; + int64_t session = 0; char* field; maweb_instance_data* data = (maweb_instance_data*) inst->impl; @@ -591,31 +594,50 @@ static int maweb_handle_message(instance* inst, char* payload, size_t payload_le if(json_obj_bool(payload, "result", 0)){ LOG("Login successful"); data->login = 1; + + //initially request playbacks + if(!update_interval){ + maweb_request_playbacks(inst); + } } else{ - LOG("Login failed"); data->login = 0; + + if(data->hosts > 1){ + LOGPF("Console login failed on %s, will try again with the next host", inst->name); + maweb_disconnect(inst); + } + else{ + LOGPF("Console login failed on %s", inst->name); + } + return 0; } } if(!strncmp(field, "playbacks", 9)){ if(maweb_process_playbacks(inst, json_obj_int(payload, "iPage", 0), payload, payload_length)){ LOG("Failed to handle/request input data"); } + + //request playbacks again if configured + if(!update_interval && data->login && !data->updates_inflight){ + maweb_request_playbacks(inst); + } return 0; } } DBGPF("Incoming message (%" PRIsize_t "): %s", payload_length, payload); if(json_obj(payload, "session") == JSON_NUMBER){ - data->session = json_obj_int(payload, "session", data->session); - if(data->session < 0){ - LOG("Login failed"); - data->login = 0; - return 0; + session = json_obj_int(payload, "session", data->session); + if(session < 0){ + LOG("Invalid web remote session identifier received, closing connection"); + maweb_disconnect(inst); + return 0; } - if(quiet_mode < 2){ - LOGPF("Session id is now %" PRId64, data->session); + if(data->session != session){ + LOGPF("Web remote session ID changed from %" PRId64 " to %" PRId64 "", data->session, session); } + data->session = session; } if(json_obj_bool(payload, "forceLogin", 0)){ @@ -642,6 +664,30 @@ static int maweb_handle_message(instance* inst, char* payload, size_t payload_le return 0; } +static void maweb_disconnect(instance* inst){ + maweb_instance_data* data = (maweb_instance_data*) inst->impl; + char xmit_buffer[MAWEB_XMIT_CHUNK]; + + if(data->fd){ + //close the session if one is active + if(data->session > 0){ + snprintf(xmit_buffer, sizeof(xmit_buffer), "{\"requestType\":\"close\",\"session\":%" PRIu64 "}", data->session); + maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer)); + } + + mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); + close(data->fd); + } + + data->fd = -1; + data->state = ws_closed; + data->login = 0; + data->session = -1; + data->peer_type = peer_unidentified; + data->offset = 0; + data->updates_inflight = 0; +} + static int maweb_connect(instance* inst){ int rv = 1; maweb_instance_data* data = (maweb_instance_data*) inst->impl; @@ -650,14 +696,8 @@ static int maweb_connect(instance* inst){ goto bail; } - //unregister old fd from core - if(data->fd >= 0){ - mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL); - close(data->fd); - data->fd = -1; - } - data->state = ws_closed; - data->login = 0; + //close old connection and reset state + maweb_disconnect(inst); LOGPF("Connecting to host %" PRIsize_t " of %" PRIsize_t " on %s", data->next_host + 1, data->hosts, inst->name); @@ -1047,7 +1087,7 @@ static int maweb_handle(size_t num, managed_fd* fds){ last_keepalive = mm_timestamp(); } - if(last_update && mm_timestamp() - last_update >= update_interval){ + if(update_interval && last_update && mm_timestamp() - last_update >= update_interval){ rv |= maweb_poll(); last_update = mm_timestamp(); } @@ -1071,7 +1111,7 @@ static int maweb_start(size_t n, instance** inst){ //re-set channel identifiers for(p = 0; p < data->channels; p++){ - data->channel[p].chan->ident = p; + mm_channel_update(data->channel[p].chan, p); } //try to connect to any available host @@ -1114,14 +1154,10 @@ static int maweb_shutdown(size_t n, instance** inst){ free(data->pass); data->pass = NULL; - close(data->fd); - data->fd = -1; - + maweb_disconnect(inst[u]); free(data->buffer); data->buffer = NULL; - - data->offset = data->allocated = 0; - data->state = ws_closed; + data->allocated = 0; free(data->channel); data->channel = NULL; |