module mars.protomars;

import vibe.core.core;
import vibe.core.log;

import mars.client;
import mars.server;
import mars.msg;

import mars.protoauth;
import mars.protocallmethod;
import mars.protoinsertvaluerequest;
import mars.protodeleterecordrequest;
import mars.protoupd;
import mars.protosub;

/**
 * Runs the receive loop for one client connection.
 *
 * Wraps `socket_` in a `MarsProxy`, waits (yielding) until `client.socketWired`
 * is true, notifies `marsServer` that the protocol is ready, then reads message
 * headers one at a time and dispatches on the message type until
 * `MsgType.aborting` is returned by the proxy.
 *
 * Params:
 *   client  = the client this connection belongs to; `client.id` is used in logs.
 *   socket_ = the underlying socket; it must offer whatever `MarsProxy!S` calls
 *             on it (`receiveBinary` and `send`).
 */
void protoMars(S)(MarsClient* client, S socket_)
{
    // ... we are switching to binary msgpack ...
    //socket_.switchToBinaryMode();
    auto socket = MarsProxy!S(socket_, client.id);

    // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ...
    while( ! client.socketWired ) vibe.core.core.yield;

    // ... now the protocol between client and server is fully operative, inform the server
    assert(marsServer !is null);
    marsServer.onMarsProtocolReady(client);

    while(true)
    {
        auto msgType = socket.receiveType();
        if( msgType == MsgType.aborting) break;

        switch(msgType) with(MsgType)
        {
            case authenticationRequest:
                logInfo("mars - S<--%s - received an authenticationRequest", client.id);
                protoAuth(client, socket);
                break;

            case discardAuthenticationRequest:
                logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id);
                protoDeauth(client, socket);
                break;

            //case syncOperationReply:
            //    logInfo("mars - S<--%s - received a syncOperationReply", client.id);
            //    break;

            // The *Reply cases below are only logged here; no handler is invoked.
            case importValuesReply:
                logInfo("mars - S<--%s - received an importValuesReply", client.id);
                break;

            case insertValuesReply:
                logInfo("mars - S<--%s - received an insertValuesReply", client.id);
                break;

            case updateValuesReply:
                logInfo("mars - S<--%s - received an updateValuesReply", client.id);
                break;

            case callServerMethodRequest:
                logInfo("mars - S<--%s - received a callServerMethodRequest", client.id);
                protoCallServerMathod(client, socket);
                break;

            case insertValuesRequest:
                logInfo("mars - S<--%s - received an insertValueRequest", client.id);
                protoInsertValueRequest(client, socket);
                break;

            case deleteRecordRequest:
                logInfo("mars - S<--%s - received a deleteRecordRequest", client.id);
                protoDeleteRecordRequest(client, socket);
                break;

            case deleteRecordReply:
                logInfo("mars - S<--%s - received an deleteRecordReply", client.id);
                break;

            case optUpdateReq:
                logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id);
                protoOptUpdate(client, socket);
                break;

            case subscribeReq:
                // Two format specifiers, two arguments (a stray third argument was removed).
                logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId);
                protoSubscribe(client, socket);
                break;

            default:
                logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType);
                // NOTE(review): despite the "skipping!" text, this halts on any in-range
                // message type that has no case above — confirm this is intended.
                assert(false);
        }
    }

    // ... cleanup the client
    //client.wireSocket( typeof(socket).init );
}


/**
 * Framing helper around a socket of type `S`, used by `protoMars`.
 *
 * Every frame starts with an 8-byte prefix: the raw in-memory bytes of the
 * `int` message id followed by the raw in-memory bytes (first 4) of the
 * message type. The remainder is the msgpack-encoded payload (packed and
 * unpacked with field names).
 */
struct MarsProxy(S)
{
    import msgpack : pack, unpack;

    /// A decoded message of type `M` together with the id taken from its frame prefix.
    struct ReceivedMessage(M) {
        bool wrongMessageReceived = false;
        int messageId;

        M m; alias m this;
    }

    /// Stores the socket and the client id (the latter is only used in log lines).
    this(S s, string ci){ this.socket = s; this.clientId = ci; }

    /**
     * Sends `rep` as the reply to `req`, reusing `req.messageId` in the prefix.
     *
     * Throws: whatever `socket.send` throws; the exception is logged and rethrown.
     */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4]
                        ~ (cast(ubyte*)(&(rep.type)))[0 .. 4];
        ubyte[] packed = rep.pack!true();
        logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
            logInfo("mars - (a) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
    }

    /**
     * Sends `req` as a new request carrying the given `messageId`.
     *
     * Throws: whatever `socket.send` throws; the exception is logged and rethrown.
     */
    void sendRequest(A)(int messageId, A req){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4]
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
            logInfo("mars - (b) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
    }

    /**
     * Receives the next frame and decodes it as `M`.
     *
     * Returns: a message with `wrongMessageReceived == true` (and default
     * `messageId`/payload) when the received type differs from `M.type`;
     * otherwise the decoded message with the frame's message id.
     */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();
        if( msgType != M.type ) return ReceivedMessage!M(true);
        // binaryAs already builds the complete ReceivedMessage; return it directly
        // instead of re-wrapping its payload.
        return binaryAs!M();
    }

    /**
     * Decodes the payload stored by the last successful `receiveType` call as `M`.
     * The payload is unpacked exactly once.
     */
    ReceivedMessage!M binaryAs(M)(){
        auto msg = binary.unpack!(M, true);
        return ReceivedMessage!M(false, messageId, msg);
    }

    /**
     * Reads one binary frame and parses its 8-byte prefix.
     *
     * On success, stores the message id and the payload (bytes after the
     * prefix) in this proxy and returns the message type.
     *
     * Returns: `MsgType.aborting` when the frame is shorter than 8 bytes or
     * the type value lies outside `MsgType.min .. MsgType.max`.
     */
    MsgType receiveType(){
        auto data = socket.receiveBinary();
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgType.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        //logInfo("mars - message id %d of type %d", messageId, msgType);
        if( msgType < MsgType.min || msgType > MsgType.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgType.aborting;
        }

        binary = data[8 .. $];
        return cast(MsgType)msgType;
    }

    private {
        S socket;
        ubyte[] binary;     // payload of the last frame accepted by receiveType
        int messageId;      // message id of the last frame read by receiveType
        string clientId;
    }
}

/**
 * Framing helper with the same 8-byte prefix + msgpack payload layout as
 * `MarsProxy`, but reporting a dropped channel through status values
 * (`ReceivedMessage.status`, the `bool` result of `sendRequest`) for the
 * failure cases it recognises.
 */
struct MarsProxyStoC(S)
{
    import msgpack : pack, unpack;

    /// A decoded message of type `M` with a status code and the frame's message id.
    struct ReceivedMessage(M) {
        enum { success, wrongMessageReceived, channelDropped }
        int status;
        int messageId;

        M m; alias m this;
    }

    /// Stores the socket and the client id (the latter is only used in log lines).
    this(S s, string ci){ this.socket = s; this.clientId = ci; }

    /**
     * Sends `rep` as the reply to `req`, reusing `req.messageId` in the prefix.
     *
     * Throws: whatever `socket.send` throws; the exception is logged and rethrown.
     */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4]
                                   ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4];
        immutable(ubyte)[] packed = rep.pack!true().idup;
        logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
            logInfo("mars - (c) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
    }

    /**
     * Sends `req` as a new request carrying the given `messageId`.
     *
     * Returns: `true` when `socket.send` completed; `false` when it threw an
     * exception whose message is one of the connection-closed texts listed below.
     * Throws: any other exception from `socket.send`, after logging it.
     */
    bool sendRequest(A)(int messageId, A req){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4]
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length);
        try { socket.send( (prefix ~ packed).dup ); }
        catch(Exception e){
            // XXX libasync is raising a standard exception...
            logInfo("mars - catched during socket.send! the exception message is '%s'! trying to handle it", e.msg);
            if( e.msg == "The remote peer has closed the connection." ||
                e.msg == "WebSocket connection already actively closed." ||
                e.msg == "Remote hung up while writing to TCPConnection." || // vibe 0.7.31 vanilla
                e.msg == "Connection error while writing to TCPConnection.")
            {
                return false;
            }
            logInfo("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
        return true;
    }

    /**
     * Receives the next frame and decodes it as `M`.
     *
     * Returns: a message whose `status` is `channelDropped` when `receiveType`
     * reported `MsgType.aborting`, `wrongMessageReceived` when the type differs
     * from `M.type`, and `success` (with id and payload filled in) otherwise.
     */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();

        ReceivedMessage!M msg;
        if(msgType == MsgType.aborting) msg.status = msg.channelDropped;
        else if( msgType != M.type ) msg.status = msg.wrongMessageReceived;
        else {
            msg.status = msg.success;
            msg.messageId = messageId;
            msg.m = binaryAs!M;
        }
        return msg;
    }

    /**
     * Decodes the payload stored by the last successful `receiveType` call as `M`.
     *
     * When `M.sizeof == 1` the payload is not unpacked and a default `M` is
     * returned (presumably this targets field-less message structs — confirm).
     * Otherwise the payload is unpacked exactly once.
     */
    ReceivedMessage!M binaryAs(M)(){
        alias RM = ReceivedMessage!M;
        static if( M.sizeof == 1 ){
            return RM(RM.success, messageId, M());
        }
        else {
            auto msg = binary.unpack!(M, true);
            return RM(RM.success, messageId, msg);
        }
    }

    /**
     * Reads one binary frame and parses its 8-byte prefix.
     *
     * On success, stores the message id and the payload (bytes after the
     * prefix) in this proxy and returns the message type.
     *
     * Returns: `MsgType.aborting` when `receiveBinary` throws a
     * `WebSocketException`, when the frame is shorter than 8 bytes, or when the
     * type value lies outside `MsgType.min .. MsgType.max`.
     */
    MsgType receiveType(){
        import vibe.http.websockets : WebSocketException;

        ubyte[] data;
        try {
            data = socket.receiveBinary();
        }
        catch(WebSocketException e){
            logInfo("mars - S<--%s - connection closed while reading message", clientId);
            return MsgType.aborting; // XXX need a better message?
        }
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgType.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType);
        if( msgType < MsgType.min || msgType > MsgType.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgType.aborting;
        }

        binary = data[8 .. $];
        return cast(MsgType)msgType;
    }

    private {
        S socket;
        ubyte[] binary;     // payload of the last frame accepted by receiveType
        int messageId;      // message id of the last frame read by receiveType
        string clientId;
    }
}