1 
2 
/**
 * Vibe handles the reception of data from a websocket with a blocking wait: that is great for a req/rep protocol,
 * but it gets complicated when a server-side push protocol, or parallel tasks, must be handled at the same time.
 *
 * This module takes care of isolating the websocket, forwarding the received data to a task and acting as a proxy
 * for the sends.
 */
10 
11 module mars.websocket;
12 
13 
14 import vibe.core.concurrency;
15 import vibe.core.core;
16 import vibe.core.log;
17 import vibe.core.task;
18 import vibe.http.websockets;
19 
20 enum Flow { received, toSend, connectionLost }
21 struct SocketData { Flow flow; string data; }
22 struct SocketBinaryData { Flow flow; immutable(ubyte)[] data; }
23 struct HandlerData { string data; }
24 struct HandlerBinaryData { immutable(ubyte)[] data; }
25 
26 enum ReceiveMode { text, binary }
27 
28 /**
29  * Entry point of the task that is handling the websocket connection with the client that has just joined us. */
30 void handleWebSocketConnectionClientToService(scope WebSocket webSocket)
31 {
32     logInfo("mars - C ... S - a webclient has opened the 'Client to Service' channel - socket:%s", &webSocket);
33     scope(success) logInfo("mars - C ... S - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &webSocket);
34     scope(failure) logError("mars - C ... S - exiting the websocket handler task for a failure! the socket will be disposed socket:%s", &webSocket);
35 
36     try {
37     vibe.core.core.yield();
38 
39     // ... the HTTP request that established the web socket connection, let's extract the client address & session...
40     string clientAddress = webSocket.request.clientAddress.toString();
41     string sessionId = webSocket.request.session.id;
42 
43     // ... we can receive text and binary data, and we start with text ...
44     ReceiveMode receiveMode = ReceiveMode.text; 
45 
46     // Identify the client type and start processing it ...
47     import mars.protohelo : protoHelo;
48     auto socket = ResilientWebSocket(webSocket);
49     protoHelo(socket);
50     } catch(Exception e){ logError("mars - C ... S - catched throwable and retrowing:%s", e.toString); throw e; }
51 }
52 
53 /**
54  * Entry point of the task that is handling the connection that allow the service to push messages to the web client.
55  *
56  * First the client opens the connection that it uses to send messages to the service, THEN this one.
57  */
58 void handleWebSocketConnectionServiceToClient(scope WebSocket webSocket)
59 {
60     import mars.server : marsServer;
61 
62     try {
63     logInfo("mars - S ... C - a webclient has opened the 'Service to Client' channel - socket:%s", &webSocket);
64     scope(success)
65         logInfo("mars - S ... C - exiting the websocket handler task with success, the websocket will be disposed - websocket:%s", &webSocket);
66     scope(failure)
67         logError("mars - S ... C - exiting the websocket handler task for a failure! the websocket will be disposed - socket:%s", &webSocket);
68 
69     auto socket = ResilientWebSocket(webSocket);
70 
71     import mars.server : marsServer;
72 
73     string clientId = socket.receiveText();
74     logInfo("mars - S ... C - received the client identifier:%s", clientId);
75 
76     // expose this task to the marsClient, so that it can push request to the web client
77     assert(marsServer !is null);
78     auto client = marsServer.getClient(clientId);
79     if( client is null ){
80         logError("mars - S ... C - can't find the mars client with id %s in the server registered clients", clientId);
81         return;
82     }
83     import mars.protomars : MarsProxyStoC;
84     auto proxy = MarsProxyStoC!ResilientWebSocket(socket, clientId);
85     client.wireSocket(proxy, Task.getThis);
86 
87     logInfo("mars - S ... C - waiting for termination"); string terminate = receiveOnly!string();
88     logInfo("mars - S ... C - received the terminate signal:%s", terminate);
89 
90     } catch(Exception e){ logError("mars - S ... C - catched throwable and retrowing:%s", e.toString); throw e; }
91 }
92 
93 
94 /**
95  * WebSocket resilient to network disconnections.
96  *
97  * The actual implementation of Ws imply a throw during a read/write if the connection goes down.
98  * Every driver has it's own way of throwing, the actual one it's throwing a plain exception: we try to catch
99  * and inspect the exception to detect if the WS was disconnected, reporting that fact in a clear way.
100  */
101 struct ResilientWebSocket
102 {
103     @disable this(this);
104 
105     /// send data over the websocket. returns true/false if sent or not.
106     bool send(ubyte[] data) {
107         bool sent = true;
108         try {
109             socket.send(data); // ..."throws WebSocketException if the connection is closed." ...
110         }
111         catch(WebSocketException e){
112             logInfo("mars - the websocket seems closed, data not sent");
113             sent = false;
114         }
115         catch(Exception e){
116             logInfo("mars - catched during websocket.send! the exception message is '%s'! trying to handle it", e.msg);
117             switch(e.msg){
118                 case "The remote peer has closed the connection.":
119                 case "WebSocket connection already actively closed.":
120                 case "Remote hung up while writing to TCPConnection.":
121                 case "Connection error while writing to TCPConnection.":
122                 case "Error writing data to socket.": // macOS, vibe 0.8.1, vibecore
123                     logWarn("mars - please classify the exception in websocket module!");
124                     sent = false;
125                     break;
126                 default:
127                     logError("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
128                     throw e;
129             }
130         }
131         return sent;
132     }
133     bool send ( scope const(char)[] data) {
134         bool sent = true;
135         try {
136             socket.send(data); // ..."throws WebSocketException if the connection is closed." ...
137         }
138         catch(WebSocketException e){
139             logInfo("mars - the websocket seems closed, data not sent");
140             sent = false;
141         }
142         catch(Exception e){
143             logInfo("mars - catched during websocket.send! the exception message is '%s'! trying to handle it", e.msg);
144             switch(e.msg){
145                 case "The remote peer has closed the connection.":
146                 case "WebSocket connection already actively closed.":
147                 case "Remote hung up while writing to TCPConnection.":
148                 case "Connection error while writing to TCPConnection.":
149                 case "Error writing data to socket.": // macOS, vibe 0.8.1, vibecore
150                     logWarn("mars - please classify the exception in websocket module!");
151                     sent = false;
152                     break;
153                 default:
154                     logError("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
155                     throw e;
156             }
157         }
158         return sent;
159     }
160 
161     string receiveText ( bool strict = true ){
162         try {
163             return socket.receiveText(strict);
164         }
165         catch(Exception e){
166             logError("mars - catched during websocket receiveText! Rethrowing! msg:%s", e);
167             throw e;
168         }
169     }
170 
171     private {
172         WebSocket socket;
173     }
174 
175     ubyte[] receiveBinary ( bool strict = true ){
176         try {
177             return socket.receiveBinary(strict);
178         }
179         catch(WebSocketException e){
180             logInfo("mars - the sebsocket seems closed, data not received");
181             return [];
182         }
183         catch(Exception e){
184             logInfo("mars - catched during websocket.receiveBinary! the exception message is '%s'! trying to handle it", e.msg);
185             switch(e.msg){
186                 default:
187                     logError("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
188                     throw e;
189             }
190         }
191     }
192 }
193