1 2 3 module mars.protomars; 4 5 import vibe.core.core; 6 import vibe.core.log; 7 8 import mars.client; 9 import mars.server; 10 import mars.msg; 11 12 import mars.protoauth; 13 import mars.protocallmethod; 14 import mars.protoinsertvaluerequest; 15 import mars.protodeleterecordrequest; 16 import mars.protoupd; 17 import mars.protosub; 18 19 void protoMars(S)(MarsClient* client, S socket_) 20 { 21 // ... we are switching to binary msgpack ... 22 //socket_.switchToBinaryMode(); 23 auto socket = MarsProxy!S(socket_, client.id); 24 25 // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ... 26 while( ! client.socketWired ) vibe.core.core.yield; 27 28 // ... now the procol between client and server is fully operative, inform the server 29 assert(marsServer !is null); 30 marsServer.onMarsProtocolReady(client); 31 32 while(true) 33 { 34 auto msgType = socket.receiveType(); 35 if( msgType == MsgType.aborting) break; 36 37 switch(msgType) with(MsgType) 38 { 39 case authenticationRequest: 40 logInfo("mars - S<--%s - received an authenticationRequest", client.id); 41 protoAuth(client, socket); 42 break; 43 44 case discardAuthenticationRequest: 45 logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id); 46 protoDeauth(client, socket); 47 break; 48 49 //case syncOperationReply: 50 // logInfo("mars - S<--%s - received a syncOperationReply", client.id); 51 // break; 52 53 case importValuesReply: 54 logInfo("mars - S<--%s - received an importValuesReply", client.id); 55 break; 56 57 case insertValuesReply: 58 logInfo("mars - S<--%s - received an insertValuesReply", client.id); 59 break; 60 61 case updateValuesReply: 62 logInfo("mars - S<--%s - received an updateValuesReply", client.id); 63 break; 64 65 case callServerMethodRequest: 66 logInfo("mars - S<--%s - received a callServerMethodRequest", client.id); 67 protoCallServerMathod(client, socket); 68 break; 69 70 case insertValuesRequest: 71 logInfo("mars - S<--%s - received an insertValueRequest", client.id); 72 protoInsertValueRequest(client, socket); 73 break; 74 75 case deleteRecordRequest: 76 logInfo("mars - S<--%s - received a deleteRecordRequest", client.id); 77 protoDeleteRecordRequest(client, socket); 78 break; 79 80 case deleteRecordReply: 81 logInfo("mars - S<--%s - received an deleteRecordReply", client.id); 82 break; 83 84 case optUpdateReq: 85 logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id); 86 protoOptUpdate(client, socket); 87 break; 88 89 case subscribeReq: 90 logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId, client.id); 91 protoSubscribe(client, socket); 92 break; 93 94 default: 95 logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType); 96 assert(false); 97 } 98 } 99 100 // ... cleanup the client 101 //client.wireSocket( typeof(socket).init ); 102 } 103 104 105 struct MarsProxy(S) 106 { 107 import msgpack : pack, unpack; 108 109 struct ReceivedMessage(M) { 110 bool wrongMessageReceived = false; 111 int messageId; 112 113 M m; alias m this; 114 } 115 116 this(S s, string ci){ this.socket = s; this.clientId = ci; } 117 118 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 119 ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 120 ~ (cast(ubyte*)(&(rep.type)))[0 .. 4]; 121 ubyte[] packed = rep.pack!true(); 122 logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length); 123 socket.send(prefix ~ packed); 124 } 125 126 void sendRequest(A)(int messageId, A req){ 127 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 128 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 129 immutable(ubyte)[] packed = req.pack!true().idup; 130 logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length); 131 socket.send(prefix ~ packed); 132 } 133 134 ReceivedMessage!M receiveMsg(M)(){ 135 auto msgType = receiveType(); 136 if( msgType != M.type ) return ReceivedMessage!M(true); 137 auto rm = ReceivedMessage!M(false, messageId, binaryAs!M); 138 return rm; 139 } 140 141 ReceivedMessage!M binaryAs(M)(){ 142 auto msg = binary.unpack!(M, true); 143 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 144 } 145 146 MsgType receiveType(){ 147 auto data = socket.receiveBinary(); 148 if( data.length < 8 ){ 149 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 150 return MsgType.aborting; 151 } 152 //logInfo("mars - S<--%s - message data:%s", clientId, data); 153 messageId = * cast(int*)(data[0 .. 4].ptr); 154 int msgType = * cast(int*)(data[4 .. 8].ptr); 155 //logInfo("mars - message id %d of type %d", messageId, msgType); 156 if( msgType < MsgType.min || msgType > MsgType.max ){ 157 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 158 return MsgType.aborting; 159 } 160 161 binary = data[8 .. $]; 162 return cast(MsgType)msgType; 163 } 164 165 private { 166 S socket; 167 ubyte[] binary; 168 int messageId; 169 string clientId; 170 } 171 } 172 173 struct MarsProxyStoC(S) 174 { 175 import msgpack : pack, unpack; 176 177 struct ReceivedMessage(M) { 178 enum { success, wrongMessageReceived, channelDropped } 179 int status; 180 int messageId; 181 182 M m; alias m this; 183 } 184 185 this(S s, string ci){ this.socket = s; this.clientId = ci; } 186 187 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 188 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 189 ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4]; 190 immutable(ubyte)[] packed = rep.pack!true().idup; 191 logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length); 192 socket.send(prefix ~ packed); 193 } 194 195 /** 196 Returns: true/false on success. */ 197 bool sendRequest(A)(int messageId, A req){ 198 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 199 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 200 immutable(ubyte)[] packed = req.pack!true().idup; 201 logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length); 202 try { socket.send( (prefix ~ packed).dup ); } 203 catch(Exception e){ 204 // XXX libasync is raising a standard exception... 205 if( e.msg == "The remote peer has closed the connection." || 206 e.msg == "WebSocket connection already actively closed.") 207 { 208 return false; 209 } 210 throw e; 211 } 212 return true; 213 } 214 215 ReceivedMessage!M receiveMsg(M)(){ 216 auto msgType = receiveType(); 217 218 ReceivedMessage!M msg; 219 if(msgType == MsgType.aborting) msg.status = msg.channelDropped; 220 else if( msgType != M.type ) msg.status = msg.wrongMessageReceived; 221 else { 222 msg.status = msg.success; 223 msg.messageId = messageId; 224 msg.m = binaryAs!M; 225 } 226 return msg; 227 } 228 229 ReceivedMessage!M binaryAs(M)(){ 230 import std.experimental.logger; 231 static if( M.sizeof == 1 ){ 232 return ReceivedMessage!M(false, messageId, M()); 233 } 234 else { 235 auto msg = binary.unpack!(M, true); 236 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 237 } 238 } 239 240 MsgType receiveType(){ 241 import vibe.http.websockets : WebSocketException; 242 243 ubyte[] data; 244 try { 245 data = socket.receiveBinary(); 246 } 247 catch(WebSocketException e){ 248 logInfo("mars - S<--%s - connection closed while reading message", clientId); 249 return MsgType.aborting; // XXX need a better message? 250 } 251 if( data.length < 8 ){ 252 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 253 return MsgType.aborting; 254 } 255 //logInfo("mars - S<--%s - message data:%s", clientId, data); 256 messageId = * cast(int*)(data[0 .. 4].ptr); 257 int msgType = * cast(int*)(data[4 .. 8].ptr); 258 logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType); 259 if( msgType < MsgType.min || msgType > MsgType.max ){ 260 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 261 return MsgType.aborting; 262 } 263 264 binary = data[8 .. $]; 265 return cast(MsgType)msgType; 266 } 267 268 private { 269 S socket; 270 ubyte[] binary; 271 int messageId; 272 string clientId; 273 } 274 }