From de23df891940464080297cd9b17324d8ee4601d7 Mon Sep 17 00:00:00 2001
From: cbdev <cb@cbcdn.com>
Date: Sun, 19 May 2019 17:53:23 +0200
Subject: Implement basic peer connection handling

---
 builtins.c  | 33 ++++++++++++++++++---
 websocksy.c | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 websocksy.h | 29 +++++++++++++++++--
 ws_proto.c  | 24 ++++++++++++++--
 4 files changed, 169 insertions(+), 12 deletions(-)

diff --git a/builtins.c b/builtins.c
index 2e3d285..b75dcd0 100644
--- a/builtins.c
+++ b/builtins.c
@@ -1,11 +1,36 @@
 static ws_peer_info default_peer = {
-	0
+	.transport = peer_tcp_client,
+	.host = "localhost",
+	.port = "5900"
 };
 
+//TODO backend configuration
 ws_peer_info backend_defaultpeer_query(char* endpoint, size_t protocols, char** protocol, size_t headers, ws_http_header* header, websocket* ws){
 	//return a copy of the default peer
-	ws_peer_info peer = {
-		0
-	};
+	ws_peer_info peer = default_peer;
+	peer.host = (default_peer.host) ? strdup(default_peer.host) : NULL;
+	peer.port = (default_peer.port) ? strdup(default_peer.port) : NULL;
+
+	//TODO backend protocol discovery
+	peer.protocol = protocols;
 	return peer;
 }
+
+int64_t framing_auto(uint8_t* data, size_t length, size_t last_read, ws_operation* opcode, void** framing_data, char* config){
+	//TODO implement auto framer
+	return length;
+}
+
+int64_t framing_binary(uint8_t* data, size_t length, size_t last_read, ws_operation* opcode, void** framing_data, char* config){
+	return length;
+}
+
+int64_t framing_separator(uint8_t* data, size_t length, size_t last_read, ws_operation* opcode, void** framing_data, char* config){
+	//TODO implement separator framer
+	return length;
+}
+
+int64_t framing_newline(uint8_t* data, size_t length, size_t last_read, ws_operation* opcode, void** framing_data, char* config){
+	//TODO implement separator framer
+	return length;
+}
diff --git a/websocksy.c b/websocksy.c
index 714c3f5..e4f7886 100644
--- a/websocksy.c
+++ b/websocksy.c
@@ -16,6 +16,7 @@
  * - backend config
  * - framing config / per peer?
  * - per-connection framing state
+ * - framing function discovery / registry
  */
 
 /*
@@ -53,8 +54,46 @@ static struct {
 };
 
 int connect_peer(websocket* ws){
+	int rv = 1;
+
 	ws->peer = config.backend.query(ws->request_path, ws->protocols, ws->protocol, ws->headers, ws->header, ws);
-	return 0;
+	if(!ws->peer.host || !ws->peer.port){
+		//no peer provided
+		return 1;
+	}
+
+	//assign default framing function if none provided
+	if(!ws->peer.framing){
+		ws->peer.framing = framing_auto;
+	}
+
+	switch(ws->peer.transport){
+		case peer_tcp_client:
+			ws->peer_fd = network_socket(ws->peer.host, ws->peer.port, SOCK_STREAM, 0);
+			break;
+		case peer_udp_client:
+			ws->peer_fd = network_socket(ws->peer.host, ws->peer.port, SOCK_DGRAM, 0);
+			break;
+		case peer_tcp_server:
+			//TODO implement tcp server mode
+			fprintf(stderr, "TCP Server mode not yet implemented\n");
+			rv = 1;
+			break;
+		case peer_udp_server:
+			ws->peer_fd = network_socket(ws->peer.host, ws->peer.port, SOCK_DGRAM, 1);
+			break;
+		case peer_fifo_tx:
+		case peer_fifo_rx:
+		case peer_unix:
+		default:
+			//TODO implement other peer modes
+			fprintf(stderr, "Peer connection mode not yet implemented\n");
+			rv = 1;
+			break;
+	}
+
+	rv = (ws->peer_fd == -1) ? 1 : 0;
+	return rv;
 }
 
 #include "ws_proto.c"
@@ -82,8 +121,58 @@ int args_parse(int argc, char** argv){
 }
 
 int ws_peer_data(websocket* ws){
-	//TODO
-	return -1;
+	ssize_t bytes_read, bytes_left = sizeof(ws->peer_buffer) - ws->peer_buffer_offset;
+	int64_t bytes_framed;
+	//default to a binary frame
+	ws_operation opcode = ws_frame_binary;
+
+	bytes_read = recv(ws->peer_fd, ws->peer_buffer + ws->peer_buffer_offset, bytes_left - 1, 0);
+	if(bytes_read < 0){
+		fprintf(stderr, "Failed to receive from peer: %s\n", strerror(errno));
+		ws_close(ws, ws_close_unexpected, "Peer connection failed");
+		return 0;
+	}
+	else if(bytes_read == 0){
+		//peer closed connection
+		ws_close(ws, ws_close_normal, "Peer closed connection");
+		return 0;
+	}
+
+	ws->peer_buffer[ws->peer_buffer_offset + bytes_read] = 0;
+
+	do{
+		//call the framing function
+		bytes_framed = ws->peer.framing(ws->peer_buffer, ws->peer_buffer_offset + bytes_read, bytes_read, &opcode, &(ws->peer_framing_data), ws->peer.framing_config);
+		if(bytes_framed > 0){
+			if(bytes_framed > ws->peer_buffer_offset + bytes_read){
+				ws_close(ws, ws_close_unexpected, "Internal error");
+				fprintf(stderr, "Overrun by framing function, have %lu + %lu bytes, framed %lu\n", ws->peer_buffer_offset, bytes_read, bytes_framed);
+				return 0;
+			}
+			//send the indicated n bytes to the websocket peer
+			if(ws_send_frame(ws, opcode, ws->peer_buffer, bytes_framed)){
+				return 1;
+			}
+			//copy back
+			memmove(ws->peer_buffer, ws->peer_buffer + bytes_framed, (ws->peer_buffer_offset + bytes_read) - bytes_framed);
+			//this should not actually happen, but it might with some weird framing functions/protocols
+			if(bytes_framed < ws->peer_buffer_offset){
+				ws->peer_buffer_offset -= bytes_framed;
+			}
+			else{
+				bytes_framed -= ws->peer_buffer_offset;
+				bytes_read -= bytes_framed;
+				ws->peer_buffer_offset = 0;
+			}
+		}
+		else if(bytes_framed < 0){
+			//TODO handle framing errors
+		}
+	}
+	while(bytes_framed && (ws->peer_buffer_offset + bytes_read) > 0);
+
+	ws->peer_buffer_offset += bytes_read;
+	return 0;
 }
 
 int main(int argc, char** argv){
diff --git a/websocksy.h b/websocksy.h
index 97cc338..a3206f7 100644
--- a/websocksy.h
+++ b/websocksy.h
@@ -9,6 +9,8 @@
 
 /* HTTP/WS read buffer size & limit */
 #define WS_MAX_LINE 16384
+/* Peer read buffer size / proxy packet limit */
+#define PEER_BUFFER_SIZE 16384
 /* Maximum number of HTTP headers to accept */
 #define WS_HEADER_LIMIT 10
 
@@ -69,27 +71,45 @@ typedef struct /*_ws_http_header*/ {
  * protocol-dependent, this functionality needs to be user-extendable.
  *
  * We do however provide some default framing functions:
- * 	* binary: Always forward all reads as binary frames
  * 	* auto: Based on the content, forward every read result as binary/text
+ * 	* binary: Always forward all reads as binary frames
  * 	* separator: Separate binary frames on a sequence of bytes
  * 	* newline: Forward text frames separated by newlines (\r\n)
  *
  * The separator function is called once for every succesful read from the peer socket and called
  * again when it indicates a frame boundary but there is still data in the buffer.
+ * The `framing_data` pointer can be used to store protocol-dependent data on a per-connection basis.
+ * If the pointer is nonzero when the connection is terminated, the function will be called with a
+ * NULL `data` pointer as an indication the any allocation within `framing_data` is to be freed.
+ * The return value is the number of bytes to be sent to the peer.
+ */
+typedef int64_t (*ws_framing)(uint8_t* data, size_t length, size_t last_read, ws_operation* opcode, void** framing_data, char* config);
+
+/*
+ * Modes of peer connection establishment
  */
-typedef int64_t (*ws_framing)(void);
+typedef enum {
+	peer_tcp_client,
+	peer_udp_client,
+	peer_tcp_server,
+	peer_udp_server,
+	peer_fifo_tx,
+	peer_fifo_rx,
+	peer_unix
+} peer_transport;
 
 /*
  * Peer connection model
  */
 typedef struct /*_ws_peer_info*/ {
 	/* Peer protocol data */
-	int transport;
+	peer_transport transport;
 	char* host;
 	char* port;
 
 	/* Framing function for this peer */
 	ws_framing framing;
+	char* framing_config;
 
 	/* WebSocket subprotocol indication index*/
 	size_t protocol;
@@ -122,6 +142,9 @@ typedef struct /*_web_socket*/ {
 	/* Peer data */
 	ws_peer_info peer;
 	int peer_fd;
+	uint8_t peer_buffer[PEER_BUFFER_SIZE];
+	size_t peer_buffer_offset;
+	void* peer_framing_data;
 } websocket;
 
 /*
diff --git a/ws_proto.c b/ws_proto.c
index 35d9933..54da838 100644
--- a/ws_proto.c
+++ b/ws_proto.c
@@ -28,6 +28,11 @@ int ws_close(websocket* ws, ws_close_reason code, char* reason){
 	if(ws->peer_fd >= 0){
 		close(ws->peer_fd);
 		ws->peer_fd = -1;
+		//clean up framing data
+		if(ws->peer_framing_data){
+			ws->peer.framing(NULL, 0, 0, NULL, &(ws->peer_framing_data), ws->peer.framing_config);
+			ws->peer_framing_data = NULL;
+		}
 	}
 
 	for(p = 0; p < ws->headers; p++){
@@ -45,6 +50,7 @@ int ws_close(websocket* ws, ws_close_reason code, char* reason){
 	ws->protocol = NULL;
 
 	ws->read_buffer_offset = 0;
+	ws->peer_buffer_offset = 0;
 
 	free(ws->request_path);
 	ws->request_path = NULL;
@@ -57,6 +63,7 @@ int ws_close(websocket* ws, ws_close_reason code, char* reason){
 
 	free(ws->peer.host);
 	free(ws->peer.port);
+	free(ws->peer.framing_config);
 	ws->peer = empty_peer;
 
 	return 0;
@@ -365,7 +372,13 @@ size_t ws_frame(websocket* ws){
 		case ws_frame_text:
 			fprintf(stderr, "Text payload: %.*s\n", (int) payload_length, (char*) payload);
 		case ws_frame_binary:
-			//TODO forward to peer
+			//forward to peer
+			if(ws->peer_fd >= 0){
+				fprintf(stderr, "WS -> Peer %lu bytes\n", payload_length);
+				if(network_send(ws->peer_fd, payload, payload_length)){
+					ws_close(ws, ws_close_unexpected, "Failed to forward");
+				}
+			}
 			break;
 		case ws_frame_close:
 			ws_close(ws, ws_close_normal, "Client requested termination");
@@ -375,13 +388,20 @@ size_t ws_frame(websocket* ws){
 		case ws_frame_pong:
 			break;
 		default:
-			//TODO unknown frame type received
+			//unknown frame type received
+			fprintf(stderr, "Unknown WebSocket opcode %02X in frame\n", WS_GET_OP(ws->read_buffer[0]));
+			ws_close(ws, ws_close_proto, "Invalid opcode");
 			break;
 	}
 
 	return ((payload - ws->read_buffer) + payload_length);
 }
 
+int ws_send_frame(websocket* ws, ws_operation opcode, uint8_t* data, size_t len){
+	fprintf(stderr, "Peer -> WS %lu bytes\n", len);
+	return 0;
+}
+
 int ws_data(websocket* ws){
 	ssize_t bytes_read, n, bytes_left = sizeof(ws->read_buffer) - ws->read_buffer_offset;
 	int rv = 0;
-- 
cgit v1.2.3