1 2 3 module mars.protomars; 4 5 import vibe.core.core; 6 import vibe.core.log; 7 8 import mars.client; 9 import mars.server; 10 import mars.msg; 11 12 import mars.protoauth; 13 import mars.protocallmethod; 14 import mars.protoinsertvaluerequest; 15 import mars.protodeleterecordrequest; 16 import mars.protoupd; 17 import mars.protosub; 18 19 void protoMars(S)(MarsClient* client, ref S socket_) 20 { 21 // ... we are switching to binary msgpack ... 22 //socket_.switchToBinaryMode(); 23 auto socket = MarsProxy!S(socket_, client.id); 24 25 // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ... 26 while( ! client.socketWired ) vibe.core.core.yield; 27 28 // ... now the procol between client and server is fully operative, inform the server 29 assert(marsServer !is null); 30 marsServer.onMarsProtocolReady(client); 31 32 while(true) 33 { 34 auto msgType = socket.receiveType(); 35 if( msgType == MsgType.aborting) break; 36 37 switch(msgType) with(MsgType) 38 { 39 case authenticationRequest: 40 logInfo("mars - S<--%s - received an authenticationRequest", client.id); 41 protoAuth(client, socket); 42 break; 43 44 case discardAuthenticationRequest: 45 logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id); 46 protoDeauth(client, socket); 47 break; 48 49 //case syncOperationReply: 50 // logInfo("mars - S<--%s - received a syncOperationReply", client.id); 51 // break; 52 53 case importValuesReply: 54 logInfo("mars - S<--%s - received an importValuesReply", client.id); 55 break; 56 57 case insertValuesReply: 58 logInfo("mars - S<--%s - received an insertValuesReply", client.id); 59 break; 60 61 case updateValuesReply: 62 logInfo("mars - S<--%s - received an updateValuesReply", client.id); 63 break; 64 65 case callServerMethodRequest: 66 logInfo("mars - S<--%s - received a callServerMethodRequest", client.id); 67 protoCallServerMathod(client, socket); 68 break; 69 70 case insertValuesRequest: 71 logInfo("mars - S<--%s - received an insertValueRequest", client.id); 72 protoInsertValueRequest(client, socket); 73 break; 74 75 case deleteRecordRequest: 76 logInfo("mars - S<--%s - received a deleteRecordRequest", client.id); 77 protoDeleteRecordRequest(client, socket); 78 break; 79 80 case deleteRecordReply: 81 logInfo("mars - S<--%s - received an deleteRecordReply", client.id); 82 break; 83 84 case optUpdateReq: 85 logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id); 86 protoOptUpdate(client, socket); 87 break; 88 89 case pesUpdateReq: 90 logInfo("mars - S<--%s - received an update originating from an pessimistic client update", client.id); 91 protoPesUpdate(client, socket); 92 break; 93 94 case subscribeReq: 95 logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId, client.id); 96 protoSubscribe(client, socket); 97 break; 98 99 case pingReq: 100 logInfo("mars - S<--%s - received a ping keep alive request", client.id); 101 break; 102 103 default: 104 logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType); 105 assert(false); 106 } 107 } 108 109 // ... cleanup the client 110 //client.wireSocket( typeof(socket).init ); 111 } 112 113 struct MarsProxy(S) 114 { 115 import msgpack : pack, unpack; 116 117 struct ReceivedMessage(M) { 118 bool wrongMessageReceived = false; 119 int messageId; 120 121 M m; alias m this; 122 } 123 124 this(ref S s, string ci){ this.socket = &s; this.clientId = ci; } 125 126 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 127 ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 128 ~ (cast(ubyte*)(&(rep.type)))[0 .. 4]; 129 ubyte[] packed = rep.pack!true(); 130 logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length); 131 try { socket.send(prefix ~ packed); } 132 catch(Exception e){ 133 logInfo("mars - (a) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 134 throw e; 135 } 136 } 137 138 void sendRequest(A)(int messageId, A req){ 139 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 140 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 141 immutable(ubyte)[] packed = req.pack!true().idup; 142 logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length); 143 try { socket.send(prefix ~ packed); } 144 catch(Exception e){ 145 logInfo("mars - (b) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 146 throw e; 147 } 148 149 } 150 151 ReceivedMessage!M receiveMsg(M)(){ 152 auto msgType = receiveType(); 153 if( msgType != M.type ) return ReceivedMessage!M(true); 154 auto rm = ReceivedMessage!M(false, messageId, binaryAs!M); 155 return rm; 156 } 157 158 ReceivedMessage!M binaryAs(M)(){ 159 auto msg = binary.unpack!(M, true); 160 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 161 } 162 163 MsgType receiveType(){ 164 auto data = socket.receiveBinary(); 165 if( data.length < 8 ){ 166 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 167 return MsgType.aborting; 168 } 169 //logInfo("mars - S<--%s - message data:%s", clientId, data); 170 messageId = * cast(int*)(data[0 .. 4].ptr); 171 int msgType = * cast(int*)(data[4 .. 8].ptr); 172 //logInfo("mars - message id %d of type %d", messageId, msgType); 173 if( msgType < MsgType.min || msgType > MsgType.max ){ 174 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 175 return MsgType.aborting; 176 } 177 178 binary = data[8 .. $]; 179 return cast(MsgType)msgType; 180 } 181 182 private { 183 S* socket; 184 ubyte[] binary; 185 int messageId; 186 string clientId; 187 } 188 } 189 190 struct MarsProxyStoC(S) 191 { 192 import msgpack : pack, unpack; 193 194 struct ReceivedMessage(M) { 195 enum { success, wrongMessageReceived, channelDropped } 196 int status; 197 int messageId; 198 199 M m; alias m this; 200 } 201 202 this(ref S s, string ci){ this.socket = &s; this.clientId = ci; } 203 204 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 205 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 206 ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4]; 207 immutable(ubyte)[] packed = rep.pack!true().idup; 208 logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length); 209 try { socket.send(prefix ~ packed); } 210 catch(Exception e){ 211 logInfo("mars - (c) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 212 throw e; 213 } 214 215 } 216 217 /** 218 Returns: true/false on success. */ 219 bool sendRequest(A)(int messageId, A req){ 220 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 221 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 222 immutable(ubyte)[] packed = req.pack!true().idup; 223 logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length); 224 try { socket.send( (prefix ~ packed).dup ); } 225 catch(Exception e){ 226 // XXX libasync is raising a standard exception... 227 logInfo("mars - catched during socket.send! the exception message is '%s'! trying to handle it", e.msg); 228 if( e.msg == "The remote peer has closed the connection." || 229 e.msg == "WebSocket connection already actively closed." || 230 e.msg == "Remote hung up while writing to TCPConnection." || // vibe 0.7.31 vanilla 231 e.msg == "Connection error while writing to TCPConnection.") 232 { 233 return false; 234 } 235 logInfo("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg); 236 throw e; 237 } 238 return true; 239 } 240 241 ReceivedMessage!M receiveMsg(M)(){ 242 auto msgType = receiveType(); 243 244 ReceivedMessage!M msg; 245 if(msgType == MsgTypeStoC.aborting) msg.status = msg.channelDropped; 246 else if( msgType != M.type ) msg.status = msg.wrongMessageReceived; 247 else { 248 msg.status = msg.success; 249 msg.messageId = messageId; 250 msg.m = binaryAs!M; 251 } 252 return msg; 253 } 254 255 ReceivedMessage!M binaryAs(M)(){ 256 import std.experimental.logger; 257 static if( M.sizeof == 1 ){ 258 return ReceivedMessage!M(false, messageId, M()); 259 } 260 else { 261 auto msg = binary.unpack!(M, true); 262 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 263 } 264 } 265 266 MsgTypeStoC receiveType(){ 267 import vibe.http.websockets : WebSocketException; 268 269 ubyte[] data; 270 try { 271 data = socket.receiveBinary(); 272 } 273 catch(WebSocketException e){ 274 logInfo("mars - S<--%s - connection closed while reading message", clientId); 275 return MsgTypeStoC.aborting; // XXX need a better message? 276 } 277 if( data.length < 8 ){ 278 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 279 return MsgTypeStoC.aborting; 280 } 281 //logInfo("mars - S<--%s - message data:%s", clientId, data); 282 messageId = * cast(int*)(data[0 .. 4].ptr); 283 int msgType = * cast(int*)(data[4 .. 8].ptr); 284 logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType); 285 if( msgType < MsgType.min || msgType > MsgType.max ){ 286 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 287 return MsgTypeStoC.aborting; 288 } 289 290 binary = data[8 .. $]; 291 return cast(MsgTypeStoC)msgType; 292 } 293 294 private { 295 S* socket; 296 ubyte[] binary; 297 int messageId; 298 string clientId; 299 } 300 }