1 2 3 module mars.protomars; 4 5 import vibe.core.core; 6 import vibe.core.log; 7 8 import mars.client; 9 import mars.server; 10 import mars.msg; 11 12 import mars.protoauth; 13 import mars.protocallmethod; 14 import mars.protoinsertvaluerequest; 15 import mars.protodeleterecordrequest; 16 import mars.protoupd; 17 import mars.protosub; 18 19 void protoMars(S)(MarsClient* client, ref S socket_) 20 { 21 // ... we are switching to binary msgpack ... 22 //socket_.switchToBinaryMode(); 23 auto socket = MarsProxy!S(socket_, client.id); 24 25 // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ... 26 while( ! client.socketWired ) vibe.core.core.yield; 27 28 // ... now the procol between client and server is fully operative, inform the server 29 assert(marsServer !is null); 30 marsServer.onMarsProtocolReady(client); 31 32 while(true) 33 { 34 auto msgType = socket.receiveType(); 35 if( msgType == MsgType.aborting) break; 36 37 switch(msgType) with(MsgType) 38 { 39 case authenticationRequest: 40 logInfo("mars - S<--%s - received an authenticationRequest", client.id); 41 protoAuth(client, socket); 42 break; 43 44 case discardAuthenticationRequest: 45 logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id); 46 protoDeauth(client, socket); 47 break; 48 49 //case syncOperationReply: 50 // logInfo("mars - S<--%s - received a syncOperationReply", client.id); 51 // break; 52 53 case importValuesReply: 54 logInfo("mars - S<--%s - received an importValuesReply", client.id); 55 break; 56 57 case insertValuesReply: 58 logInfo("mars - S<--%s - received an insertValuesReply", client.id); 59 break; 60 61 case updateValuesReply: 62 logInfo("mars - S<--%s - received an updateValuesReply", client.id); 63 break; 64 65 case callServerMethodRequest: 66 logInfo("mars - S<--%s - received a callServerMethodRequest", client.id); 67 protoCallServerMathod(client, socket); 68 break; 69 70 case insertValuesRequest: 71 logInfo("mars - S<--%s - received an insertValueRequest", client.id); 72 protoInsertValueRequest(client, socket); 73 break; 74 75 case deleteRecordRequest: 76 logInfo("mars - S<--%s - received a deleteRecordRequest", client.id); 77 protoDeleteRecordRequest(client, socket); 78 break; 79 80 case deleteRecordReply: 81 logInfo("mars - S<--%s - received an deleteRecordReply", client.id); 82 break; 83 84 case optUpdateReq: 85 logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id); 86 protoOptUpdate(client, socket); 87 break; 88 89 case subscribeReq: 90 logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId, client.id); 91 protoSubscribe(client, socket); 92 break; 93 94 case pingReq: 95 logInfo("mars - S<--%s - received a ping keep alive request", client.id); 96 break; 97 98 default: 99 logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType); 100 assert(false); 101 } 102 } 103 104 // ... cleanup the client 105 //client.wireSocket( typeof(socket).init ); 106 } 107 108 struct MarsProxy(S) 109 { 110 import msgpack : pack, unpack; 111 112 struct ReceivedMessage(M) { 113 bool wrongMessageReceived = false; 114 int messageId; 115 116 M m; alias m this; 117 } 118 119 this(ref S s, string ci){ this.socket = &s; this.clientId = ci; } 120 121 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 122 ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 123 ~ (cast(ubyte*)(&(rep.type)))[0 .. 4]; 124 ubyte[] packed = rep.pack!true(); 125 logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length); 126 try { socket.send(prefix ~ packed); } 127 catch(Exception e){ 128 logInfo("mars - (a) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 129 throw e; 130 } 131 } 132 133 void sendRequest(A)(int messageId, A req){ 134 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 135 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 136 immutable(ubyte)[] packed = req.pack!true().idup; 137 logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length); 138 try { socket.send(prefix ~ packed); } 139 catch(Exception e){ 140 logInfo("mars - (b) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 141 throw e; 142 } 143 144 } 145 146 ReceivedMessage!M receiveMsg(M)(){ 147 auto msgType = receiveType(); 148 if( msgType != M.type ) return ReceivedMessage!M(true); 149 auto rm = ReceivedMessage!M(false, messageId, binaryAs!M); 150 return rm; 151 } 152 153 ReceivedMessage!M binaryAs(M)(){ 154 auto msg = binary.unpack!(M, true); 155 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 156 } 157 158 MsgType receiveType(){ 159 auto data = socket.receiveBinary(); 160 if( data.length < 8 ){ 161 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 162 return MsgType.aborting; 163 } 164 //logInfo("mars - S<--%s - message data:%s", clientId, data); 165 messageId = * cast(int*)(data[0 .. 4].ptr); 166 int msgType = * cast(int*)(data[4 .. 8].ptr); 167 //logInfo("mars - message id %d of type %d", messageId, msgType); 168 if( msgType < MsgType.min || msgType > MsgType.max ){ 169 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 170 return MsgType.aborting; 171 } 172 173 binary = data[8 .. $]; 174 return cast(MsgType)msgType; 175 } 176 177 private { 178 S* socket; 179 ubyte[] binary; 180 int messageId; 181 string clientId; 182 } 183 } 184 185 struct MarsProxyStoC(S) 186 { 187 import msgpack : pack, unpack; 188 189 struct ReceivedMessage(M) { 190 enum { success, wrongMessageReceived, channelDropped } 191 int status; 192 int messageId; 193 194 M m; alias m this; 195 } 196 197 this(ref S s, string ci){ this.socket = &s; this.clientId = ci; } 198 199 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 200 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 201 ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4]; 202 immutable(ubyte)[] packed = rep.pack!true().idup; 203 logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length); 204 try { socket.send(prefix ~ packed); } 205 catch(Exception e){ 206 logInfo("mars - (c) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 207 throw e; 208 } 209 210 } 211 212 /** 213 Returns: true/false on success. */ 214 bool sendRequest(A)(int messageId, A req){ 215 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 216 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 217 immutable(ubyte)[] packed = req.pack!true().idup; 218 logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length); 219 try { socket.send( (prefix ~ packed).dup ); } 220 catch(Exception e){ 221 // XXX libasync is raising a standard exception... 222 logInfo("mars - catched during socket.send! the exception message is '%s'! trying to handle it", e.msg); 223 if( e.msg == "The remote peer has closed the connection." || 224 e.msg == "WebSocket connection already actively closed." || 225 e.msg == "Remote hung up while writing to TCPConnection." || // vibe 0.7.31 vanilla 226 e.msg == "Connection error while writing to TCPConnection.") 227 { 228 return false; 229 } 230 logInfo("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 231 throw e; 232 } 233 return true; 234 } 235 236 ReceivedMessage!M receiveMsg(M)(){ 237 auto msgType = receiveType(); 238 239 ReceivedMessage!M msg; 240 if(msgType == MsgTypeStoC.aborting) msg.status = msg.channelDropped; 241 else if( msgType != M.type ) msg.status = msg.wrongMessageReceived; 242 else { 243 msg.status = msg.success; 244 msg.messageId = messageId; 245 msg.m = binaryAs!M; 246 } 247 return msg; 248 } 249 250 ReceivedMessage!M binaryAs(M)(){ 251 import std.experimental.logger; 252 static if( M.sizeof == 1 ){ 253 return ReceivedMessage!M(false, messageId, M()); 254 } 255 else { 256 auto msg = binary.unpack!(M, true); 257 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 258 } 259 } 260 261 MsgTypeStoC receiveType(){ 262 import vibe.http.websockets : WebSocketException; 263 264 ubyte[] data; 265 try { 266 data = socket.receiveBinary(); 267 } 268 catch(WebSocketException e){ 269 logInfo("mars - S<--%s - connection closed while reading message", clientId); 270 return MsgTypeStoC.aborting; // XXX need a better message? 271 } 272 if( data.length < 8 ){ 273 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 274 return MsgTypeStoC.aborting; 275 } 276 //logInfo("mars - S<--%s - message data:%s", clientId, data); 277 messageId = * cast(int*)(data[0 .. 4].ptr); 278 int msgType = * cast(int*)(data[4 .. 8].ptr); 279 logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType); 280 if( msgType < MsgType.min || msgType > MsgType.max ){ 281 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 282 return MsgTypeStoC.aborting; 283 } 284 285 binary = data[8 .. $]; 286 return cast(MsgTypeStoC)msgType; 287 } 288 289 private { 290 S* socket; 291 ubyte[] binary; 292 int messageId; 293 string clientId; 294 } 295 }