/**
 * Vibe handles the reception of data from a websocket with a blocking wait: that is great for a req/rep
 * protocol, but it gets complicated when a server-side push protocol, or parallel tasks, must be handled
 * at the same time.
 *
 * This module takes care of isolating the websocket, forwarding the received data to a task, and acting
 * as a proxy for the sends.
 */

module mars.websocket;


import vibe.core.concurrency;
import vibe.core.core;
import vibe.core.log;
import vibe.core.task;
import vibe.http.websockets;

/// Tag carried by the socket data envelopes declared below.
enum Flow { received, toSend, connectionLost }
/// Text payload paired with a `Flow` tag.
struct SocketData { Flow flow; string data; }
/// Binary payload paired with a `Flow` tag.
struct SocketBinaryData { Flow flow; immutable(ubyte)[] data; }
/// Text payload without a flow tag.
struct HandlerData { string data; }
/// Binary payload without a flow tag.
struct HandlerBinaryData { immutable(ubyte)[] data; }

/// Kind of frames a receiver expects.
enum ReceiveMode { text, binary }

/**
 * Entry point of the task that is handling the websocket connection with the client that has just joined us.
 *
 * Wraps the socket in a `ResilientWebSocket` and hands it to `protoHelo`. Any `Exception` is logged and
 * rethrown.
 *
 * Params:
 *      webSocket = the websocket just opened by the web client.
 */
void handleWebSocketConnectionClientToService(scope WebSocket webSocket)
{
    logInfo("mars - C ... S - a webclient has opened the 'Client to Service' channel - socket:%s", &webSocket);
    scope(success) logInfo("mars - C ... S - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &webSocket);
    scope(failure) logError("mars - C ... S - exiting the websocket handler task for a failure! the socket will be disposed socket:%s", &webSocket);

    try {
        vibe.core.core.yield();

        // ... the HTTP request that established the web socket connection, let's extract the client address & session...
        // NOTE(review): neither value is used further in this function; they are kept because evaluating
        // them touches the request object. Confirm whether they can be dropped.
        string clientAddress = webSocket.request.clientAddress.toString();
        string sessionId = webSocket.request.session.id;

        // Identify the client type and start processing it ...
        import mars.protohelo : protoHelo;
        auto socket = ResilientWebSocket(webSocket);
        protoHelo(socket);
    }
    catch(Exception e){
        logError("mars - C ... S - catched throwable and retrowing:%s", e.toString);
        throw e;
    }
}

/**
 * Entry point of the task that is handling the connection that allow the service to push messages to the web client.
 *
 * First the client opens the connection that it uses to send messages to the service, THEN this one.
 *
 * The first text frame received is used as the client identifier. If no client with that identifier is
 * registered in `marsServer`, the function logs an error and returns. Otherwise the socket is wired to the
 * client and the task blocks until a `string` message is received. Any `Exception` is logged and rethrown.
 *
 * Params:
 *      webSocket = the websocket just opened by the web client.
 */
void handleWebSocketConnectionServiceToClient(scope WebSocket webSocket)
{
    import mars.server : marsServer;

    try {
        logInfo("mars - S ... C - a webclient has opened the 'Service to Client' channel - socket:%s", &webSocket);
        scope(success)
            logInfo("mars - S ... C - exiting the websocket handler task with success, the websocket will be disposed - websocket:%s", &webSocket);
        scope(failure)
            logError("mars - S ... C - exiting the websocket handler task for a failure! the websocket will be disposed - socket:%s", &webSocket);

        auto socket = ResilientWebSocket(webSocket);

        string clientId = socket.receiveText();
        logInfo("mars - S ... C - received the client identifier:%s", clientId);

        // expose this task to the marsClient, so that it can push request to the web client
        assert(marsServer !is null);
        auto client = marsServer.getClient(clientId);
        if( client is null ){
            logError("mars - S ... C - can't find the mars client with id %s in the server registered clients", clientId);
            return;
        }
        import mars.protomars : MarsProxyStoC;
        auto proxy = MarsProxyStoC!ResilientWebSocket(socket, clientId);
        client.wireSocket(proxy, Task.getThis);

        // Block this task until a string message is delivered to it.
        logInfo("mars - S ... C - waiting for termination");
        string terminate = receiveOnly!string();
        logInfo("mars - S ... C - received the terminate signal:%s", terminate);

    }
    catch(Exception e){
        logError("mars - S ... C - catched throwable and retrowing:%s", e.toString);
        throw e;
    }
}


/**
 * WebSocket resilient to network disconnections.
 *
 * The actual implementation of Ws imply a throw during a read/write if the connection goes down.
 * Every driver has it's own way of throwing, the actual one it's throwing a plain exception: we try to catch
 * and inspect the exception to detect if the WS was disconnected, reporting that fact in a clear way.
 */
struct ResilientWebSocket
{
    @disable this(this);

    /**
     * Send binary data over the websocket.
     *
     * Returns: `true` if the data was handed to the socket, `false` if the connection was detected as closed.
     * Throws: any `Exception` whose message is not one of the known disconnection messages.
     */
    bool send(ubyte[] data) {
        try {
            socket.send(data); // ..."throws WebSocketException if the connection is closed." ...
            return true;
        }
        catch(WebSocketException e){
            logInfo("mars - the websocket seems closed, data not sent");
            return false;
        }
        catch(Exception e){
            return onSendFailure(e);
        }
    }

    /**
     * Send text data over the websocket.
     *
     * Returns: `true` if the data was handed to the socket, `false` if the connection was detected as closed.
     * Throws: any `Exception` whose message is not one of the known disconnection messages.
     */
    bool send ( scope const(char)[] data) {
        try {
            socket.send(data); // ..."throws WebSocketException if the connection is closed." ...
            return true;
        }
        catch(WebSocketException e){
            logInfo("mars - the websocket seems closed, data not sent");
            return false;
        }
        catch(Exception e){
            return onSendFailure(e);
        }
    }

    /**
     * Receive a text frame from the websocket.
     *
     * Throws: any `Exception` raised by the underlying socket, after logging it.
     */
    string receiveText ( bool strict = true ){
        try {
            return socket.receiveText(strict);
        }
        catch(Exception e){
            logError("mars - catched during websocket receiveText! Rethrowing! msg:%s", e);
            throw e;
        }
    }

    /**
     * Receive a binary frame from the websocket.
     *
     * Returns: the received bytes, or an empty array when a `WebSocketException` is caught.
     * Throws: any other `Exception` raised by the underlying socket, after logging it.
     */
    ubyte[] receiveBinary ( bool strict = true ){
        try {
            return socket.receiveBinary(strict);
        }
        catch(WebSocketException e){
            logInfo("mars - the sebsocket seems closed, data not received");
            return [];
        }
        catch(Exception e){
            // No message is classified as a disconnection for reads: log and rethrow.
            logInfo("mars - catched during websocket.receiveBinary! the exception message is '%s'! trying to handle it", e.msg);
            logError("mars - catched during socket.receiveBinary! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
    }

    private {
        WebSocket socket;

        /// True when `msg` is one of the exception messages treated as "the peer is gone" while writing.
        static bool isPeerDisconnectMessage(string msg) {
            switch(msg){
                case "The remote peer has closed the connection.":
                case "WebSocket connection already actively closed.":
                case "Remote hung up while writing to TCPConnection.":
                case "Connection error while writing to TCPConnection.":
                case "Error writing data to socket.": // macOS, vibe 0.8.1, vibecore
                    return true;
                default:
                    return false;
            }
        }

        /**
         * Shared handling of a generic `Exception` caught during a send.
         *
         * Returns `false` when the message identifies a disconnection, otherwise logs and rethrows `e`.
         */
        static bool onSendFailure(Exception e) {
            logInfo("mars - catched during websocket.send! the exception message is '%s'! trying to handle it", e.msg);
            if( isPeerDisconnectMessage(e.msg) ){
                logWarn("mars - please classify the exception in websocket module!");
                return false;
            }
            logError("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
    }
}