1 2 3 /** 4 * Vibe gestire la ricezione di dati da websocket con una wait bloccante, cosa ottima per un protocollo req/rep, ma 5 * complicata se si vuol gestire in contemporanea anche un protocollo push lato server, o task paralleli. 6 * 7 * Questo modulo si occupa di isolare il websocket, inviando i dati ricevuti ad un task, e facendo da proxy per i 8 * send. 9 */ 10 11 module mars.websocket; 12 13 14 import vibe.core.concurrency; 15 import vibe.core.core; 16 import vibe.core.log; 17 import vibe.core.task; 18 import vibe.http.websockets; 19 20 enum Flow { received, toSend, connectionLost } 21 struct SocketData { Flow flow; string data; } 22 struct SocketBinaryData { Flow flow; immutable(ubyte)[] data; } 23 struct HandlerData { string data; } 24 struct HandlerBinaryData { immutable(ubyte)[] data; } 25 26 enum ReceiveMode { text, binary } 27 28 /** 29 * Entry point of the task that is handling the websocket connection with the client that has just joined us. */ 30 void handleWebSocketConnectionClientToService(scope WebSocket socket) 31 { 32 logInfo("mars - C ... S - a webclient has opened the 'Client to Service' channel - socket:%s", &socket); 33 scope(success) logInfo("mars - C ... S - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket); 34 scope(failure) logError("mars - C ... S - exiting the websocket handler task for a failure! the socket will be disposed socket:%s", &socket); 35 36 vibe.core.core.yield(); 37 38 // ... the HTTP request that established the web socket connection, let's extract the client address & session... 39 string clientAddress = socket.request.clientAddress.toString(); 40 string sessionId = socket.request.session.id; 41 42 // ... we can receive text and binary data, and we start with text ... 43 ReceiveMode receiveMode = ReceiveMode.text; 44 45 // Identify the client type and start processing it ... 46 import mars.protohelo : protoHelo; 47 //protoHelo(Proxy(dataDispatcherTask, &receiveMode)); 48 protoHelo(socket); 49 } 50 51 /** 52 * Entry point of the task that is handling the connection that allow the service to push messages to the web client. 53 * 54 * First the client opens the connection that it uses to send messages to the service, THEN this one. 55 */ 56 void handleWebSocketConnectionServiceToClient(scope WebSocket socket) 57 { 58 import mars.server : marsServer; 59 60 logInfo("mars - S ... C - a webclient has opened the 'Service to Client' channel - socket:%s", &socket); 61 scope(success) logInfo("mars - S ... C - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket); 62 scope(failure) logError("mars - S ... C - exiting the websocket handler task for a failure! the socket will be disposed - socket:%s", &socket); 63 64 import mars.server : marsServer; 65 66 string clientId = socket.receiveText(); 67 logInfo("mars - S ... C - received the client identifier:%s", clientId); 68 69 // expose this task to the marsClient, so that it can push request to the web client 70 assert(marsServer !is null); 71 auto client = marsServer.getClient(clientId); 72 if( client is null ){ 73 logError("mars - S ... C - can't find the mars client with id %s in the server registered clients", clientId); 74 return; 75 } 76 import mars.protomars : MarsProxyStoC; 77 client.wireSocket(MarsProxyStoC!WebSocket(socket, clientId), Task.getThis); 78 79 logInfo("mars - S ... C - waiting for termination"); 80 string terminate = receiveOnly!string(); 81 logInfo("mars - S ... C - received the terminate signal:%s", terminate); 82 } 83 84 struct Proxy { 85 86 import std.experimental.logger : logInfo = log, trace; 87 alias logError = logInfo; 88 89 this(Task dispatcher, ReceiveMode* receiveMode) 90 { 91 import core.atomic : atomicOp; 92 93 this.dispatcher = dispatcher; 94 this.receiveMode = receiveMode; 95 this.seqIdentifier = sequence; 96 atomicOp!"+="(sequence, 1); 97 } 98 99 void switchToBinaryMode() { *receiveMode = ReceiveMode.binary; } 100 101 string receive() { 102 auto data = receiveOnly!HandlerData(); 103 return data.data; 104 } 105 106 immutable(ubyte)[] receiveBinary() { 107 auto data = receiveOnly!HandlerBinaryData(); 108 return data.data; 109 } 110 111 void send(string data) { 112 //trace("tracing..."); 113 dispatcher.send(SocketData(Flow.toSend, data)); 114 //trace("tracing..."); 115 } 116 117 void send(immutable(ubyte)[] data) { 118 //trace("tracing..."); 119 dispatcher.send(SocketBinaryData(Flow.toSend, data)); 120 //trace("tracing..."); 121 } 122 123 private { 124 Task dispatcher; 125 ReceiveMode* receiveMode; 126 127 int seqIdentifier; 128 static shared int sequence = 1; 129 } 130 }