aboutsummaryrefslogtreecommitdiffhomepage
path: root/backends/maweb.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/maweb.c')
-rw-r--r--backends/maweb.c114
1 files changed, 75 insertions, 39 deletions
diff --git a/backends/maweb.c b/backends/maweb.c
index 97d4cea..8b878b0 100644
--- a/backends/maweb.c
+++ b/backends/maweb.c
@@ -1,4 +1,5 @@
#define BACKEND_NAME "maweb"
+//#define DEBUG
#include <string.h>
#include <unistd.h>
@@ -15,14 +16,11 @@
#define WS_FLAG_FIN 0x80
#define WS_FLAG_MASK 0x80
-/*
- * TODO handle peer close/unregister/reopen and fallback connections
- */
+static void maweb_disconnect(instance* inst);
static uint64_t last_keepalive = 0;
-static uint64_t update_interval = 50;
+static uint64_t update_interval = 0;
static uint64_t last_update = 0;
-static uint64_t updates_inflight = 0;
static uint64_t quiet_mode = 0;
static maweb_command_key cmdline_keys[] = {
@@ -136,7 +134,10 @@ static int channel_comparator(const void* raw_a, const void* raw_b){
}
static uint32_t maweb_interval(){
- return update_interval - (last_update % update_interval);
+ if(update_interval){
+ return update_interval - (last_update % update_interval);
+ }
+ return 0;
}
static int maweb_configure(char* option, char* value){
@@ -248,7 +249,7 @@ static int maweb_instance(instance* inst){
static channel* maweb_channel(instance* inst, char* spec, uint8_t flags){
maweb_instance_data* data = (maweb_instance_data*) inst->impl;
maweb_channel_data chan = {
- 0
+ .in = -1 //this hack allows the initial data request to push events even for zero'ed channels
};
char* next_token = NULL;
channel* channel_ref = NULL;
@@ -352,8 +353,7 @@ static int maweb_send_frame(instance* inst, maweb_operation op, uint8_t* payload
if(mmbackend_send(data->fd, frame_header, header_bytes)
|| mmbackend_send(data->fd, payload, len)){
LOGPF("Failed to send on instance %s, assuming connection failure", inst->name);
- data->state = ws_closed;
- data->login = 0;
+ maweb_disconnect(inst);
return 1;
}
@@ -423,6 +423,7 @@ static int maweb_process_playback(instance* inst, int64_t page, maweb_channel_ty
}
static int maweb_process_playbacks(instance* inst, int64_t page, char* payload, size_t payload_length){
+ maweb_instance_data* data = (maweb_instance_data*) inst->impl;
size_t base_offset = json_obj_offset(payload, "itemGroups"), group_offset, subgroup_offset, item_offset;
uint64_t group = 0, subgroup, item, metatype;
@@ -466,8 +467,9 @@ static int maweb_process_playbacks(instance* inst, int64_t page, char* payload,
}
group++;
}
- updates_inflight--;
- DBGPF("Playback message processing done, %" PRIu64 " updates inflight", updates_inflight);
+
+ data->updates_inflight--;
+ DBGPF("Playback message processing done, %" PRIu64 " updates inflight on %s", data->updates_inflight, inst->name);
return 0;
}
@@ -479,9 +481,9 @@ static int maweb_request_playbacks(instance* inst){
char item_indices[1024] = "[300,400,500]", item_counts[1024] = "[16,16,16]", item_types[1024] = "[3,3,3]";
size_t page_index = 0, view = 3, channel = 0, offsets[3], channel_offset, channels;
- if(updates_inflight){
+ if(data->updates_inflight){
if(quiet_mode < 1){
- LOGPF("Skipping update request, %" PRIu64 " updates still inflight - consider raising the interval time", updates_inflight);
+ LOGPF("Skipping update request on %s, %" PRIu64 " updates still inflight - consider raising the interval time", inst->name, data->updates_inflight);
}
return 0;
}
@@ -572,15 +574,16 @@ static int maweb_request_playbacks(instance* inst){
data->session);
maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer));
DBGPF("Poll request: %s", xmit_buffer);
- updates_inflight++;
+ data->updates_inflight++;
}
- DBGPF("Poll request handling done, %" PRIu64 " updates requested", updates_inflight);
+ DBGPF("Poll request handling done, %" PRIu64 " updates requested on %s", data->updates_inflight, inst->name);
return rv;
}
static int maweb_handle_message(instance* inst, char* payload, size_t payload_length){
char xmit_buffer[MAWEB_XMIT_CHUNK];
+ int64_t session = 0;
char* field;
maweb_instance_data* data = (maweb_instance_data*) inst->impl;
@@ -591,31 +594,50 @@ static int maweb_handle_message(instance* inst, char* payload, size_t payload_le
if(json_obj_bool(payload, "result", 0)){
LOG("Login successful");
data->login = 1;
+
+ //initially request playbacks
+ if(!update_interval){
+ maweb_request_playbacks(inst);
+ }
}
else{
- LOG("Login failed");
data->login = 0;
+
+ if(data->hosts > 1){
+ LOGPF("Console login failed on %s, will try again with the next host", inst->name);
+ maweb_disconnect(inst);
+ }
+ else{
+ LOGPF("Console login failed on %s", inst->name);
+ }
+ return 0;
}
}
if(!strncmp(field, "playbacks", 9)){
if(maweb_process_playbacks(inst, json_obj_int(payload, "iPage", 0), payload, payload_length)){
LOG("Failed to handle/request input data");
}
+
+ //request playbacks again if configured
+ if(!update_interval && data->login && !data->updates_inflight){
+ maweb_request_playbacks(inst);
+ }
return 0;
}
}
DBGPF("Incoming message (%" PRIsize_t "): %s", payload_length, payload);
if(json_obj(payload, "session") == JSON_NUMBER){
- data->session = json_obj_int(payload, "session", data->session);
- if(data->session < 0){
- LOG("Login failed");
- data->login = 0;
- return 0;
+ session = json_obj_int(payload, "session", data->session);
+ if(session < 0){
+ LOG("Invalid web remote session identifier received, closing connection");
+ maweb_disconnect(inst);
+ return 0;
}
- if(quiet_mode < 2){
- LOGPF("Session id is now %" PRId64, data->session);
+ if(data->session != session){
+ LOGPF("Web remote session ID changed from %" PRId64 " to %" PRId64 "", data->session, session);
}
+ data->session = session;
}
if(json_obj_bool(payload, "forceLogin", 0)){
@@ -642,6 +664,30 @@ static int maweb_handle_message(instance* inst, char* payload, size_t payload_le
return 0;
}
+static void maweb_disconnect(instance* inst){
+ maweb_instance_data* data = (maweb_instance_data*) inst->impl;
+ char xmit_buffer[MAWEB_XMIT_CHUNK];
+
+ if(data->fd){
+ //close the session if one is active
+ if(data->session > 0){
+ snprintf(xmit_buffer, sizeof(xmit_buffer), "{\"requestType\":\"close\",\"session\":%" PRIu64 "}", data->session);
+ maweb_send_frame(inst, ws_text, (uint8_t*) xmit_buffer, strlen(xmit_buffer));
+ }
+
+ mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL);
+ close(data->fd);
+ }
+
+ data->fd = -1;
+ data->state = ws_closed;
+ data->login = 0;
+ data->session = -1;
+ data->peer_type = peer_unidentified;
+ data->offset = 0;
+ data->updates_inflight = 0;
+}
+
static int maweb_connect(instance* inst){
int rv = 1;
maweb_instance_data* data = (maweb_instance_data*) inst->impl;
@@ -650,14 +696,8 @@ static int maweb_connect(instance* inst){
goto bail;
}
- //unregister old fd from core
- if(data->fd >= 0){
- mm_manage_fd(data->fd, BACKEND_NAME, 0, NULL);
- close(data->fd);
- data->fd = -1;
- }
- data->state = ws_closed;
- data->login = 0;
+ //close old connection and reset state
+ maweb_disconnect(inst);
LOGPF("Connecting to host %" PRIsize_t " of %" PRIsize_t " on %s", data->next_host + 1, data->hosts, inst->name);
@@ -1047,7 +1087,7 @@ static int maweb_handle(size_t num, managed_fd* fds){
last_keepalive = mm_timestamp();
}
- if(last_update && mm_timestamp() - last_update >= update_interval){
+ if(update_interval && last_update && mm_timestamp() - last_update >= update_interval){
rv |= maweb_poll();
last_update = mm_timestamp();
}
@@ -1071,7 +1111,7 @@ static int maweb_start(size_t n, instance** inst){
//re-set channel identifiers
for(p = 0; p < data->channels; p++){
- data->channel[p].chan->ident = p;
+ mm_channel_update(data->channel[p].chan, p);
}
//try to connect to any available host
@@ -1114,14 +1154,10 @@ static int maweb_shutdown(size_t n, instance** inst){
free(data->pass);
data->pass = NULL;
- close(data->fd);
- data->fd = -1;
-
+ maweb_disconnect(inst[u]);
free(data->buffer);
data->buffer = NULL;
-
- data->offset = data->allocated = 0;
- data->state = ws_closed;
+ data->allocated = 0;
free(data->channel);
data->channel = NULL;