1 2 3 /** 4 * Vibe gestire la ricezione di dati da websocket con una wait bloccante, cosa ottima per un protocollo req/rep, ma 5 * complicata se si vuol gestire in contemporanea anche un protocollo push lato server, o task paralleli. 6 * 7 * Questo modulo si occupa di isolare il websocket, inviando i dati ricevuti ad un task, e facendo da proxy per i 8 * send. 9 */ 10 11 module mars.websocket; 12 13 14 import vibe.core.concurrency; 15 import vibe.core.core; 16 import vibe.core.log; 17 import vibe.core.task; 18 import vibe.http.websockets; 19 20 enum Flow { received, toSend, connectionLost } 21 struct SocketData { Flow flow; string data; } 22 struct SocketBinaryData { Flow flow; immutable(ubyte)[] data; } 23 struct HandlerData { string data; } 24 struct HandlerBinaryData { immutable(ubyte)[] data; } 25 26 enum ReceiveMode { text, binary } 27 28 /** 29 * Entry point of the task that is handling the websocket connection with the client that has just joined us. */ 30 void handleWebSocketConnectionClientToService(scope WebSocket socket) 31 { 32 logInfo("mars - C ... S - a webclient has opened the 'Client to Service' channel - socket:%s", &socket); 33 scope(success) logInfo("mars - C ... S - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket); 34 scope(failure) logError("mars - C ... S - exiting the websocket handler task for a failure! the socket will be disposed socket:%s", &socket); 35 36 vibe.core.core.yield(); 37 38 // ... the HTTP request that established the web socket connection, let's extract the client address & session... 39 string clientAddress = socket.request.clientAddress.toString(); 40 string sessionId = socket.request.session.id; 41 42 // ... we can receive text and binary data, and we start with text ... 43 ReceiveMode receiveMode = ReceiveMode.text; 44 45 // Identify the client type and start processing it ... 46 import mars.protohelo : protoHelo; 47 //protoHelo(Proxy(dataDispatcherTask, &receiveMode)); 48 protoHelo(socket); 49 50 // ... we have terminated the client process, 51 52 } 53 54 /** 55 * Entry point of the task that is handling the connection that allow the service to push messages to the web client. 56 * 57 * First the client opens the connection that it uses to send messages to the service, THEN this one. 58 */ 59 void handleWebSocketConnectionServiceToClient(scope WebSocket socket) 60 { 61 import mars.server : marsServer; 62 63 logInfo("mars - S ... C - a webclient has opened the 'Service to Client' channel - socket:%s", &socket); 64 scope(success) logInfo("mars - S ... C - exiting the websocket handler task with success, the socket will be disposed - socket:%s", &socket); 65 scope(failure) logError("mars - S ... C - exiting the websocket handler task for a failure! the socket will be disposed - socket:%s", &socket); 66 67 import mars.server : marsServer; 68 69 string clientId = socket.receiveText(); 70 logInfo("mars - S ... C - received the client identifier:%s", clientId); 71 72 // expose this task to the marsClient, so that it can push request to the web client 73 assert(marsServer !is null); 74 auto client = marsServer.getClient(clientId); 75 if( client is null ){ 76 logError("mars - S ... C - can't find the mars client with id %s in the server registered clients", clientId); 77 return; 78 } 79 import mars.protomars : MarsProxyStoC; 80 client.wireSocket(MarsProxyStoC!WebSocket(socket, clientId)); 81 82 logInfo("mars - S ... C - waiting for termination"); 83 string terminate = receiveOnly!string(); 84 } 85 86 struct Proxy { 87 88 import std.experimental.logger : logInfo = log, trace; 89 alias logError = logInfo; 90 91 this(Task dispatcher, ReceiveMode* receiveMode) 92 { 93 import core.atomic : atomicOp; 94 95 this.dispatcher = dispatcher; 96 this.receiveMode = receiveMode; 97 this.seqIdentifier = sequence; 98 atomicOp!"+="(sequence, 1); 99 } 100 101 void switchToBinaryMode() { *receiveMode = ReceiveMode.binary; } 102 103 string receive() { 104 auto data = receiveOnly!HandlerData(); 105 return data.data; 106 } 107 108 immutable(ubyte)[] receiveBinary() { 109 auto data = receiveOnly!HandlerBinaryData(); 110 return data.data; 111 } 112 113 void send(string data) { 114 //trace("tracing..."); 115 dispatcher.send(SocketData(Flow.toSend, data)); 116 //trace("tracing..."); 117 } 118 119 void send(immutable(ubyte)[] data) { 120 //trace("tracing..."); 121 dispatcher.send(SocketBinaryData(Flow.toSend, data)); 122 //trace("tracing..."); 123 } 124 125 private { 126 Task dispatcher; 127 ReceiveMode* receiveMode; 128 129 int seqIdentifier; 130 static shared int sequence = 1; 131 } 132 }