1 
2 
/**
 * Vibe handles the reception of data from a websocket with a blocking wait, which is great for a req/rep protocol,
 * but awkward if you also want to handle, at the same time, a server-side push protocol or parallel tasks.
 *
 * This module takes care of isolating the websocket, forwarding the received data to a task, and acting as a proxy
 * for the sends.
 */
10 
11 module mars.websocket;
12 
13 
14 import vibe.core.concurrency;
15 import vibe.core.core;
16 import vibe.core.log;
17 import vibe.core.task;
18 import vibe.http.websockets;
19 
/// Kind of event carried by a message exchanged with the task that owns the websocket.
enum Flow
{
    received,       /// data that arrived from the websocket
    toSend,         /// data that has to be written to the websocket
    connectionLost  /// the websocket connection is gone
}

/// Text-mode message tagged with its `Flow`.
struct SocketData
{
    Flow flow;
    string data;
}

/// Binary-mode message tagged with its `Flow`.
struct SocketBinaryData
{
    Flow flow;
    immutable(ubyte)[] data;
}

/// Text payload as delivered to the protocol handler (see `Proxy.receive`).
struct HandlerData
{
    string data;
}

/// Binary payload as delivered to the protocol handler (see `Proxy.receiveBinary`).
struct HandlerBinaryData
{
    immutable(ubyte)[] data;
}

/// Whether the websocket traffic is currently treated as text or as binary frames.
enum ReceiveMode
{
    text,
    binary
}
/**
 * Entry point of the task that handles the client-to-service ("S <-- C") websocket connection of a web client that
 * has just joined us.
 *
 * The socket is handed directly to `protoHelo`; this function returns once `protoHelo` returns.
 *
 * The dispatcher / receiver task pair that used to sit between the websocket and the protocol code (see `Proxy`)
 * is disabled: its code is kept below, commented out, for reference.
 *
 * Params:
 *   socket = the websocket connection opened by the client.
 */
void handleWebSocketConnectionClientToService(scope WebSocket socket)
{
    logInfo("the webclient connected the S <-- C channel");
    vibe.core.core.yield();

    // ... the HTTP request that established the web socket connection, let's extract the client address & session...
    // NOTE(review): these locals are only referenced by the disabled task code below.
    string clientAddress = socket.request.clientAddress.toString();
    string sessionId = socket.request.session.id;

    // ... we can receive text and binary data, and we start with text ...
    ReceiveMode receiveMode = ReceiveMode.text;

    /+
    // Task that receives and dispatch data to/for the socket
    void dataDispatcher(Task receiver)
    {
        //logInfo("task dataDispatcher starting for sessionid %s", sessionId);
        scope(exit) logInfo("task dataDispatcher terminating for sessionid %s", sessionId);
        try {
            while(true)
            {
                if( receiveMode == ReceiveMode.text ){
                    auto socketData = receiveOnly!SocketData();
                    final switch(socketData.flow) with(Flow) {
                        case connectionLost:
                            logInfo("mars - connection lost received by data dispatcher, so terminating");
                            return;
                        case received:
                            receiver.send(HandlerData(socketData.data));
                            break;
                        case toSend:
                            //logInfo("mars - dispatcher sending data via websocket: %s", socketData.data);
                            socket.send(socketData.data);
                            break;
                    }
                }
                else {
                    auto socketData = receiveOnly!SocketBinaryData();
                    final switch(socketData.flow) with(Flow) {
                        case connectionLost:
                            //logInfo("mars - connection lost received by web socket receiver, so terminating");
                            import mars.msg : MarsMsgType = MsgType;
                            int messageId = 0; auto msgType = MarsMsgType.aborting;
                            immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(messageId)))[0 .. 4] 
                                                           ~ (cast(immutable(ubyte)*)(&(msgType)))[0 .. 4];
                            //trace("sending forged abort to the protocol", prefix);
                            // XXX I have not figured out why, but with the prefix alone it is received mangled ...
                            receiver.send(HandlerBinaryData( (prefix ~ prefix).idup ));
                            //trace("sending forged abort done, returning and exiting the task");
                            
                            return;
                        case received:
                            receiver.send(HandlerBinaryData(socketData.data));
                            break;
                        case toSend:
                            //logInfo("mars - dispatcher sending binary data via websocket: length %d", socketData.data.length);
                            socket.send(socketData.data.dup);
                            break;
                    }
                }
            }
        }
        catch(Exception e){
            logError("mars - task dataDispatcher exception!");
            logError(e.toString());
        }
    }
    +/

    /+
    // Task that receives from the websocket and dispatch to the above task
    void wsReceiver(Task receiver)
    {
        //logInfo("task wsReceiver starting");
        //scope(exit) logInfo("task wsReceiver terminating");
        try {
            while( socket.waitForData() )
            {
                if( receiveMode == ReceiveMode.text ) {
                    string data = socket.receiveText(true);
                    //logInfo("mars - received data from websocket:%s", data);
                    receiver.send(SocketData(Flow.received, data));
                }
                else {
                    immutable(ubyte)[] data = socket.receiveBinary().idup;
                    //logInfo("mars - received binary data from websocket with length %d", data.length);
                    receiver.send(SocketBinaryData(Flow.received, data));
                }
            }
            logInfo("mars - task websocket receiver connection lost!");
            // inform the other task that the connection is lost
            if( receiveMode == ReceiveMode.binary ){
                receiver.send(SocketBinaryData(Flow.connectionLost));
            } else {
                receiver.send(SocketData(Flow.connectionLost));
            }
            //logInfo("mars - task websocket receiver exiting");
        }
        catch(Exception e){
            logError("mars - task wsReceiver exception!");
            logError(e.toString());
        }
    }
    +/

    /+
    // Activate the tasks for this client ....

    // As we are passing getThis as parameter, THIS task will be notified about data received by the websocket, so the 
    // Proxy shoud be used by this task ...
    auto dataDispatcherTask = runTask( &dataDispatcher, Task.getThis );
    // Start the task that will send the websocket data to the dispatcher task ... 
    auto wsReceiverTask = runTask( &wsReceiver, dataDispatcherTask );
    +/  

    // Identify the client type and start processing it ...
    import mars.protohelo : protoHelo;
    //protoHelo(Proxy(dataDispatcherTask, &receiveMode));
    protoHelo(socket);

    // ... we have terminated the client process.
    // The message used to end with a "%d" task counter whose argument (dataDispatcherTask.taskCounter()) was
    // commented out together with the dispatcher task. A format specifier without a matching argument makes the
    // formatting fail, so the specifier is dropped until the counter is available again.
    logInfo("mars - exiting the task that handled the websocket");
}
152 
/**
 * Entry point of the task that handles the service-to-client ("S --> C") push channel of a web client.
 *
 * The first text message read from the socket is used as the client identifier. When `marsServer` knows a client
 * with that identifier, the socket is wired to it wrapped in a `MarsProxyStoC`, and the task then blocks until a
 * `string` message is delivered to it. When the client is unknown, an error is logged and the function returns
 * right away.
 *
 * Params:
 *   socket = the websocket connection opened by the client for the push channel.
 */
void handleWebSocketConnectionServiceToClient(scope WebSocket socket)
{
    import mars.protomars : MarsProxyStoC;
    import mars.server : marsServer;

    logInfo("the webclient connected the S --> C channel");
    string identifier = socket.receiveText();
    logInfo("received the client identifier:%s", identifier);

    // expose this task to the marsClient, so that it can push request to the web client
    assert(marsServer !is null);
    auto marsClient = marsServer.getClient(identifier);

    if( marsClient !is null ){
        marsClient.wireSocket(MarsProxyStoC!WebSocket(socket, identifier));

        logInfo("push channel waiting for termination");
        // Park this task until a string message arrives; its content is not inspected.
        receiveOnly!string();
        logInfo("push channel terminating");
    }
    else {
        logError("mars - can't find the mars client with id %s in the server registered clients", identifier);
    }
}
177 
/**
 * Handle that stands in for the raw websocket.
 *
 * Outgoing data is not written to the socket directly: it is sent, tagged `Flow.toSend`, as a message to the
 * dispatcher task. Incoming data is read from the calling task's own message queue, where it is expected to arrive
 * as `HandlerData` / `HandlerBinaryData`.
 *
 * NOTE(review): the proxy keeps the `ReceiveMode` pointer it is given, so the pointed-to variable must outlive the
 * proxy - confirm at the construction site.
 */
struct Proxy {

    // NOTE(review): not referenced by the code of this struct; kept because `logError` is a member alias that
    // code outside of this file may rely on.
    import std.experimental.logger : logInfo = log, trace;
    alias logError = logInfo;

    /**
     * Params:
     *   dispatcher  = task that receives the `Flow.toSend` messages produced by `send`.
     *   receiveMode = location of the receive mode flag, written by `switchToBinaryMode`.
     */
    this(Task dispatcher, ReceiveMode* receiveMode)
    { 
        import core.atomic : atomicOp;

        this.dispatcher = dispatcher;
        this.receiveMode = receiveMode;
        // Take the identifier and advance the shared counter in ONE atomic step: a plain read followed by a
        // separate atomic increment lets two concurrently constructed proxies observe the same value.
        // atomicOp yields the value after the addition, hence the "- 1" to get the value this proxy claimed.
        this.seqIdentifier = atomicOp!"+="(sequence, 1) - 1;
    }

    /// Sets the shared receive mode flag to `ReceiveMode.binary`.
    void switchToBinaryMode() { *receiveMode = ReceiveMode.binary; }

    /// Waits for a `HandlerData` message addressed to the calling task and returns its text payload.
    string receive() {
        auto data = receiveOnly!HandlerData();
        return data.data;
    }

    /// Waits for a `HandlerBinaryData` message addressed to the calling task and returns its binary payload.
    immutable(ubyte)[] receiveBinary() {
        auto data = receiveOnly!HandlerBinaryData();
        return data.data;
    }
    
    /// Asks the dispatcher task to send `data` as text.
    void send(string data) {
        dispatcher.send(SocketData(Flow.toSend, data));
    }

    /// Asks the dispatcher task to send `data` as binary.
    void send(immutable(ubyte)[] data) {
        dispatcher.send(SocketBinaryData(Flow.toSend, data));
    }

    private {
        Task dispatcher;            // task the outgoing messages are sent to
        ReceiveMode* receiveMode;   // flag written by switchToBinaryMode

        int seqIdentifier;               // value of `sequence` claimed by this instance
        static shared int sequence = 1;  // next identifier to hand out, shared by all proxies
    }
}