/**
 * Vibe handles the reception of data from a websocket with a blocking wait: that is great for a req/rep protocol,
 * but complicated when a server-side push protocol, or parallel tasks, must be handled at the same time.
 *
 * This module takes care of isolating the websocket, sending the received data to a task, and acting as a proxy
 * for the sends.
 */

module mars.websocket;


import vibe.core.concurrency;
import vibe.core.core;
import vibe.core.log;
import vibe.core.task;
import vibe.http.websockets;

/// Meaning of a message addressed to the dispatcher task: data read from the websocket, data to be written to
/// the websocket, or the notification that the connection is gone.
enum Flow { received, toSend, connectionLost }

/// Text message addressed to the dispatcher task.
struct SocketData { Flow flow; string data; }

/// Binary message addressed to the dispatcher task.
struct SocketBinaryData { Flow flow; immutable(ubyte)[] data; }

/// Text payload delivered to the task that consumes websocket data (see `Proxy.receive`).
struct HandlerData { string data; }

/// Binary payload delivered to the task that consumes websocket data (see `Proxy.receiveBinary`).
struct HandlerBinaryData { immutable(ubyte)[] data; }

/// Kind of frames expected on the websocket.
enum ReceiveMode { text, binary }

/**
 * Entry point of the task that is handling the websocket connection with the client that has just joined us.
 *
 * The dispatcher/receiver task pair that used to sit between the socket and the protocol is currently disabled
 * (kept below inside nested comments): the socket is handed directly to `protoHelo`, and this function returns
 * when `protoHelo` returns.
 *
 * Params:
 *      socket = the websocket of the client-to-service channel.
 */
void handleWebSocketConnectionClientToService(scope WebSocket socket)
{
    logInfo("the webclient connected the S <-- C channel");
    vibe.core.core.yield();

    // ... the HTTP request that established the web socket connection, let's extract the client address & session...
    // NOTE: these two values are referenced only by the disabled tasks below.
    string clientAddress = socket.request.clientAddress.toString();
    string sessionId = socket.request.session.id;

    // ... we can receive text and binary data, and we start with text ...
    // NOTE: referenced only by the disabled tasks and by the disabled Proxy construction below.
    ReceiveMode receiveMode = ReceiveMode.text;

    /+
    // Task that receives and dispatch data to/for the socket
    void dataDispatcher(Task receiver)
    {
        //logInfo("task dataDispatcher starting for sessionid %s", sessionId);
        scope(exit) logInfo("task dataDispatcher terminating for sessionid %s", sessionId);
        try {
            while(true)
            {
                if( receiveMode == ReceiveMode.text ){
                    auto socketData = receiveOnly!SocketData();
                    final switch(socketData.flow) with(Flow) {
                        case connectionLost:
                            logInfo("mars - connection lost received by data dispatcher, so terminating");
                            return;
                        case received:
                            receiver.send(HandlerData(socketData.data));
                            break;
                        case toSend:
                            //logInfo("mars - dispatcher sending data via websocket: %s", socketData.data);
                            socket.send(socketData.data);
                            break;
                    }
                }
                else {
                    auto socketData = receiveOnly!SocketBinaryData();
                    final switch(socketData.flow) with(Flow) {
                        case connectionLost:
                            //logInfo("mars - connection lost received by web socket receiver, so terminating");
                            import mars.msg : MarsMsgType = MsgType;
                            int messageId = 0; auto msgType = MarsMsgType.aborting;
                            immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(messageId)))[0 .. 4]
                                                       ~ (cast(immutable(ubyte)*)(&(msgType)))[0 .. 4];
                            //trace("sending forged abort to the protocol", prefix);
                            // XXX I have not understood why, but with the prefix alone it is received garbled ...
                            receiver.send(HandlerBinaryData( (prefix ~ prefix).idup ));
                            //trace("sending forged abort done, returning and exiting the task");

                            return;
                        case received:
                            receiver.send(HandlerBinaryData(socketData.data));
                            break;
                        case toSend:
                            //logInfo("mars - dispatcher sending binary data via websocket: length %d", socketData.data.length);
                            socket.send(socketData.data.dup);
                            break;
                    }
                }
            }
        }
        catch(Exception e){
            logError("mars - task dataDispatcher exception!");
            logError(e.toString());
        }
    }
    +/

    /+
    // Task that receives from the websocket and dispatch to the above task
    void wsReceiver(Task receiver)
    {
        //logInfo("task wsReceiver starting");
        //scope(exit) logInfo("task wsReceiver terminating");
        try {
            while( socket.waitForData() )
            {
                if( receiveMode == ReceiveMode.text ) {
                    string data = socket.receiveText(true);
                    //logInfo("mars - received data from websocket:%s", data);
                    receiver.send(SocketData(Flow.received, data));
                }
                else {
                    immutable(ubyte)[] data = socket.receiveBinary().idup;
                    //logInfo("mars - received binary data from websocket with length %d", data.length);
                    receiver.send(SocketBinaryData(Flow.received, data));
                }
            }
            logInfo("mars - task websocket receiver connection lost!");
            // inform the other task that the connection is lost
            if( receiveMode == ReceiveMode.binary ){
                receiver.send(SocketBinaryData(Flow.connectionLost));
            } else {
                receiver.send(SocketData(Flow.connectionLost));
            }
            //logInfo("mars - task websocket receiver exiting");
        }
        catch(Exception e){
            logError("mars - task wsReceiver exception!");
            logError(e.toString());
        }
    }
    +/

    /+
    // Activate the tasks for this client ....

    // As we are passing getThis as parameter, THIS task will be notified about data received by the websocket, so the
    // Proxy shoud be used by this task ...
    auto dataDispatcherTask = runTask( &dataDispatcher, Task.getThis );
    // Start the task that will send the websocket data to the dispatcher task ...
    auto wsReceiverTask = runTask( &wsReceiver, dataDispatcherTask );
    +/

    // Identify the client type and start processing it ...
    import mars.protohelo : protoHelo;
    //protoHelo(Proxy(dataDispatcherTask, &receiveMode));
    protoHelo(socket);

    // ... we have terminated the client process.
    // The running-task counter came from the disabled dispatcher task (dataDispatcherTask.taskCounter()); with
    // that argument gone, the "%d" placeholder had no value to format, so it has been removed from the message.
    logInfo("mars - exiting the task that handled the websocker");
}

/**
 * Entry point of the task that is handling the websocket connection with the client that has just joined us.
 *
 * This is the service-to-client (push) channel. The first text frame is read as the client identifier; the
 * matching client is looked up in `marsServer` and the socket is wired to it through a `MarsProxyStoC`. The
 * task then waits for a `string` message before returning.
 *
 * Params:
 *      socket = the websocket of the service-to-client channel.
 */
void handleWebSocketConnectionServiceToClient(scope WebSocket socket)
{
    import mars.server : marsServer;

    logInfo("the webclient connected the S --> C channel");
    string clientId = socket.receiveText();
    logInfo("received the client identifier:%s", clientId);

    // expose this task to the marsClient, so that it can push request to the web client
    assert(marsServer !is null);
    auto client = marsServer.getClient(clientId);
    if( client is null ){
        logError("mars - can't find the mars client with id %s in the server registered clients", clientId);
        return;
    }
    import mars.protomars : MarsProxyStoC;
    client.wireSocket(MarsProxyStoC!WebSocket(socket, clientId));

    // Returning would end the handler of this scoped socket, so wait here until some task sends us a string;
    // the content of that string is not inspected.
    logInfo("push channel waiting for termination");
    string terminate = receiveOnly!string();
    logInfo("push channel terminating");
}

/**
 * Facade over the dispatcher task: outgoing data is forwarded to the dispatcher as `Flow.toSend` messages, while
 * incoming data is read from the mailbox of the calling task as `HandlerData` / `HandlerBinaryData` messages.
 *
 * The only construction site visible in this module is currently commented out.
 */
struct Proxy {

    import std.experimental.logger : logInfo = log, trace;
    alias logError = logInfo;

    /**
     * Params:
     *      dispatcher  = the task that owns the websocket and performs the real sends.
     *      receiveMode = pointer to the receive mode variable flipped by `switchToBinaryMode`; the pointer is
     *                    stored as-is, so the pointed variable must outlive this proxy.
     */
    this(Task dispatcher, ReceiveMode* receiveMode)
    {
        import core.atomic : atomicOp;

        this.dispatcher = dispatcher;
        this.receiveMode = receiveMode;
        // Take the identifier and advance the shared counter in ONE atomic step: atomicOp returns the updated
        // value, so the identifier owned by this instance is that value minus one. Reading `sequence` with a
        // separate plain load would let two concurrent constructions pick up the same identifier.
        this.seqIdentifier = atomicOp!"+="(sequence, 1) - 1;
    }

    /// Switches the variable referenced by `receiveMode` to `ReceiveMode.binary`.
    void switchToBinaryMode() { *receiveMode = ReceiveMode.binary; }

    /// Waits for a `HandlerData` message addressed to the calling task and returns its text payload.
    string receive() {
        auto data = receiveOnly!HandlerData();
        return data.data;
    }

    /// Waits for a `HandlerBinaryData` message addressed to the calling task and returns its binary payload.
    immutable(ubyte)[] receiveBinary() {
        auto data = receiveOnly!HandlerBinaryData();
        return data.data;
    }

    /// Asks the dispatcher task to send `data` as text.
    void send(string data) {
        //trace("tracing...");
        dispatcher.send(SocketData(Flow.toSend, data));
        //trace("tracing...");
    }

    /// Asks the dispatcher task to send `data` as binary.
    void send(immutable(ubyte)[] data) {
        //trace("tracing...");
        dispatcher.send(SocketBinaryData(Flow.toSend, data));
        //trace("tracing...");
    }

    private {
        Task dispatcher;
        ReceiveMode* receiveMode;

        // Identifier of this proxy, taken from `sequence` at construction time.
        int seqIdentifier;
        // Next identifier to hand out; shared between threads, so it is only touched through core.atomic.
        static shared int sequence = 1;
    }
}