1 
2 
/**
 * Vibe handles the reception of data from a websocket with a blocking wait: that is great for a req/rep
 * protocol, but awkward when a server-side push protocol, or parallel tasks, must be handled at the same time.
 *
 * This module takes care of isolating the websocket, forwarding the received data to a task and acting as a
 * proxy for the sends.
 */
10 
11 module mars.websocket;
12 
13 
14 import vibe.core.concurrency;
15 import vibe.core.core;
16 import vibe.core.log;
17 import vibe.core.task;
18 import vibe.http.websockets;
19 
/// Tag carried by the socket messages: data received, data to send, or connection lost.
enum Flow
{
    received,
    toSend,
    connectionLost,
}

/// Text message paired with its `Flow` tag.
struct SocketData
{
    Flow flow;
    string data;
}

/// Binary message paired with its `Flow` tag.
struct SocketBinaryData
{
    Flow flow;
    immutable(ubyte)[] data;
}

/// Text payload; this is the message type `Proxy.receive` waits for.
struct HandlerData
{
    string data;
}

/// Binary payload; this is the message type `Proxy.receiveBinary` waits for.
struct HandlerBinaryData
{
    immutable(ubyte)[] data;
}

/// Kind of data expected from the socket: text or binary.
enum ReceiveMode
{
    text,
    binary,
}
27 
/**
 * Entry point of the task that handles the 'Client to Service' websocket channel of a web client that has
 * just joined us.
 *
 * The handler logs the opening of the channel, yields once to the scheduler and then hands the socket over
 * to `protoHelo`. A log line is also emitted when the handler is left, both on normal return and on failure.
 *
 * Params:
 *     socket = the websocket opened by the web client.
 */
void handleWebSocketConnectionClientToService(scope WebSocket socket)
{
    import mars.protohelo : protoHelo;

    logInfo("mars - C ... S - a webclient has opened the 'Client to Service' channel - socket:%s", &socket);
    scope(success) logInfo("mars - C ... S - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket);
    scope(failure) logError("mars - C ... S - exiting the websocket handler task for a failure! the socket will be disposed socket:%s", &socket);

    vibe.core.core.yield();

    // The client address, the session id and a receive-mode flag used to be computed here but were never
    // read; they have been dropped. The HTTP request that established the connection is still reachable
    // through `socket.request` should they be needed again.

    // Identify the client type and start processing it; per the original note, control comes back here
    // once the client processing has terminated.
    protoHelo(socket);
}
53 
/**
 * Entry point of the task that handles the connection that allows the service to push messages to the
 * web client.
 *
 * First the client opens the connection that it uses to send messages to the service, THEN this one.
 *
 * The first text frame read from the socket is used as the client identifier. The matching client is looked
 * up in `marsServer` and the socket is wired to it through a `MarsProxyStoC`. The task then waits for a
 * `string` message before returning. If no client is registered under the received identifier, an error is
 * logged and the handler returns immediately.
 *
 * Params:
 *     socket = the websocket opened by the web client for the 'Service to Client' channel.
 */
void handleWebSocketConnectionServiceToClient(scope WebSocket socket)
{
    import mars.protomars : MarsProxyStoC;
    import mars.server : marsServer;

    logInfo("mars - S ... C - a webclient has opened the 'Service to Client' channel - socket:%s", &socket);
    scope(success) logInfo("mars - S ... C - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket);
    scope(failure) logError("mars - S ... C - exiting the websocket handler task for a failure! the socket will be disposed - socket:%s", &socket);

    string clientId = socket.receiveText();
    logInfo("mars - S ... C - received the client identifier:%s", clientId);

    // expose this task to the marsClient, so that it can push request to the web client
    assert(marsServer !is null);
    auto client = marsServer.getClient(clientId);
    if( client is null ){
        logError("mars - S ... C - can't find the mars client with id %s in the server registered clients", clientId);
        return;
    }
    client.wireSocket(MarsProxyStoC!WebSocket(socket, clientId));

    logInfo("mars - S ... C - waiting for termination");
    // Only the arrival of the message matters; its content is not used.
    receiveOnly!string();
}
85 
/**
 * Proxy between a protocol handler and the task that owns the websocket.
 *
 * Outgoing data is forwarded to the `dispatcher` task as `SocketData` / `SocketBinaryData` messages tagged
 * with `Flow.toSend`. Incoming data is obtained by waiting for `HandlerData` / `HandlerBinaryData` messages.
 */
struct Proxy {

    import std.experimental.logger : logInfo = log, trace;
    alias logError = logInfo;

    /**
     * Builds a proxy bound to a dispatcher task.
     *
     * Params:
     *     dispatcher  = the task that receives the messages produced by `send`.
     *     receiveMode = pointer to the receive-mode flag that `switchToBinaryMode` updates.
     */
    this(Task dispatcher, ReceiveMode* receiveMode)
    {
        import core.atomic : atomicOp;

        this.dispatcher = dispatcher;
        this.receiveMode = receiveMode;
        // Take the identifier and advance the shared counter in ONE atomic step: `atomicOp` returns the
        // updated value, so subtracting one yields the value the counter had before the increment. A plain
        // read followed by a separate increment could hand the same identifier to two proxies.
        this.seqIdentifier = atomicOp!"+="(sequence, 1) - 1;
    }

    /// Sets the pointed-to receive mode to `ReceiveMode.binary`.
    void switchToBinaryMode() { *receiveMode = ReceiveMode.binary; }

    /// Waits for a `HandlerData` message via `receiveOnly` and returns its text payload.
    string receive() {
        auto data = receiveOnly!HandlerData();
        return data.data;
    }

    /// Waits for a `HandlerBinaryData` message via `receiveOnly` and returns its binary payload.
    immutable(ubyte)[] receiveBinary() {
        auto data = receiveOnly!HandlerBinaryData();
        return data.data;
    }

    /// Forwards text `data` to the dispatcher task, tagged as `Flow.toSend`.
    void send(string data) {
        //trace("tracing...");
        dispatcher.send(SocketData(Flow.toSend, data));
        //trace("tracing...");
    }

    /// Forwards binary `data` to the dispatcher task, tagged as `Flow.toSend`.
    void send(immutable(ubyte)[] data) {
        //trace("tracing...");
        dispatcher.send(SocketBinaryData(Flow.toSend, data));
        //trace("tracing...");
    }

    private {
        Task dispatcher;            // destination of the outgoing messages
        ReceiveMode* receiveMode;   // flag updated by switchToBinaryMode

        int seqIdentifier;              // identifier taken from `sequence` at construction
        static shared int sequence = 1; // next identifier to hand out, shared by all the proxies
    }
}