1 
2 
/**
 * Vibe handles the reception of data from a websocket with a blocking wait, which is great for a req/rep
 * protocol, but complicated if one also wants to handle, at the same time, a server-side push protocol or
 * parallel tasks.
 *
 * This module takes care of isolating the websocket, forwarding the received data to a task, and acting as a
 * proxy for the sends.
 */
10 
11 module mars.websocket;
12 
13 
14 import vibe.core.concurrency;
15 import vibe.core.core;
16 import vibe.core.log;
17 import vibe.core.task;
18 import vibe.http.websockets;
19 
/// Tag carried by `SocketData` / `SocketBinaryData` messages.
enum Flow
{
    received,
    toSend,
    connectionLost
}

/// Text payload paired with its `Flow` tag.
struct SocketData
{
    Flow flow;
    string data;
}

/// Binary payload paired with its `Flow` tag.
struct SocketBinaryData
{
    Flow flow;
    immutable(ubyte)[] data;
}

/// Text payload as read by `Proxy.receive`.
struct HandlerData
{
    string data;
}

/// Binary payload as read by `Proxy.receiveBinary`.
struct HandlerBinaryData
{
    immutable(ubyte)[] data;
}

/// Kind of data expected from the websocket.
enum ReceiveMode
{
    text,
    binary
}
27 
/**
 * Entry point of the task that is handling the websocket connection with the client that has just joined us
 * (the 'Client to Service' channel).
 *
 * Logs the opening of the channel, yields once, then hands the socket over to `protoHelo`, which identifies
 * the client type and starts processing it. On exit a success or failure line is logged.
 *
 * Params:
 *     socket = the websocket opened by the web client.
 */
void handleWebSocketConnectionClientToService(scope WebSocket socket)
{
    import mars.protohelo : protoHelo;

    logInfo("mars - C ... S - a webclient has opened the 'Client to Service' channel - socket:%s", &socket);
    scope(success) logInfo("mars - C ... S - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket);
    scope(failure) logError("mars - C ... S - exiting the websocket handler task for a failure! the socket will be disposed socket:%s", &socket);

    vibe.core.core.yield();

    // Identify the client type and start processing it ...
    protoHelo(socket);
}
50 
/**
 * Entry point of the task that is handling the connection that allows the service to push messages to the
 * web client (the 'Service to Client' channel).
 *
 * First the client opens the connection that it uses to send messages to the service, THEN this one.
 *
 * The first text frame received on the socket is taken as the client identifier. The matching client is
 * looked up in `marsServer`; if it is not found, an error is logged and the handler returns. Otherwise the
 * socket is wired to the client and this task blocks until it receives a `string` message, which is treated
 * as the terminate signal.
 *
 * Params:
 *     socket = the websocket opened by the web client.
 */
void handleWebSocketConnectionServiceToClient(scope WebSocket socket)
{
    import mars.protomars : MarsProxyStoC;
    import mars.server : marsServer;

    logInfo("mars - S ... C - a webclient has opened the 'Service to Client' channel - socket:%s", &socket);
    scope(success) logInfo("mars - S ... C - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket);
    scope(failure) logError("mars - S ... C - exiting the websocket handler task for a failure! the socket will be disposed - socket:%s", &socket);

    string clientId = socket.receiveText();
    logInfo("mars - S ... C - received the client identifier:%s", clientId);

    // expose this task to the marsClient, so that it can push request to the web client
    assert(marsServer !is null);
    auto client = marsServer.getClient(clientId);
    if( client is null ){
        logError("mars - S ... C - can't find the mars client with id %s in the server registered clients", clientId);
        return;
    }
    client.wireSocket(MarsProxyStoC!WebSocket(socket, clientId), Task.getThis);

    // Keep this task (and therefore the scoped socket) alive until someone sends us a string message.
    logInfo("mars - S ... C - waiting for termination");
    string terminate = receiveOnly!string();
    logInfo("mars - S ... C - received the terminate signal:%s", terminate);
}
83 
/**
 * Proxy used in place of a direct websocket: outgoing data is sent as a message to a dispatcher task, and
 * incoming data is read from the message queue of the calling task.
 *
 * Each instance gets a sequence identifier taken from a counter shared by all instances.
 */
struct Proxy {
    
    import std.experimental.logger : logInfo = log, trace;
    alias logError = logInfo;

    /**
     * Params:
     *     dispatcher  = task that receives the `SocketData` / `SocketBinaryData` messages built by `send`.
     *     receiveMode = pointer to the receive mode that `switchToBinaryMode` overwrites; it is
     *                   dereferenced later, so it must stay valid for as long as the proxy is used.
     */
    this(Task dispatcher, ReceiveMode* receiveMode)
    { 
        import core.atomic : atomicOp;

        this.dispatcher = dispatcher;
        this.receiveMode = receiveMode;
        // Take the identifier from the result of a single atomic increment: reading `sequence` first and
        // incrementing it in a separate step would let two concurrent constructions get the same id.
        // atomicOp returns the value after the increment, hence the `- 1` to keep the first id equal to 1.
        this.seqIdentifier = atomicOp!"+="(sequence, 1) - 1;
    }

    /// Sets the pointed-to receive mode to `ReceiveMode.binary`.
    void switchToBinaryMode() { *receiveMode = ReceiveMode.binary; }

    /// Blocks until a `HandlerData` message is received by the calling task and returns its text.
    string receive() {
        auto data = receiveOnly!HandlerData();
        return data.data;
    }

    /// Blocks until a `HandlerBinaryData` message is received by the calling task and returns its bytes.
    immutable(ubyte)[] receiveBinary() {
        auto data = receiveOnly!HandlerBinaryData();
        return data.data;
    }
    
    /// Sends the text to the dispatcher task, tagged with `Flow.toSend`.
    void send(string data) {
        //trace("tracing...");
        dispatcher.send(SocketData(Flow.toSend, data));
        //trace("tracing...");
    }

    /// Sends the bytes to the dispatcher task, tagged with `Flow.toSend`.
    void send(immutable(ubyte)[] data) {
        //trace("tracing...");
        dispatcher.send(SocketBinaryData(Flow.toSend, data));
        //trace("tracing...");
    }

    private {
        Task dispatcher;
        ReceiveMode* receiveMode; 

        // Identifier assigned at construction from `sequence`.
        int seqIdentifier;
        // Next identifier to hand out; shared by all instances and only updated atomically.
        static shared int sequence = 1;
    }
}