1 2 3 module mars.protomars; 4 5 import vibe.core.core; 6 import vibe.core.log; 7 8 import mars.client; 9 import mars.server; 10 import mars.msg; 11 12 import mars.protoauth; 13 import mars.protocallmethod; 14 import mars.protoinsertvaluerequest; 15 import mars.protodeleterecordrequest; 16 17 void protoMars(S)(MarsClient* client, S socket_) 18 { 19 // ... we are switching to binary msgpack ... 20 //socket_.switchToBinaryMode(); 21 auto socket = MarsProxy!S(socket_, client.id); 22 23 // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ... 24 while( ! client.socketWired ) vibe.core.core.yield; 25 26 // ... now the procol between client and server is fully operative, inform the server 27 assert(marsServer !is null); 28 marsServer.onMarsProtocolReady(client); 29 30 while(true) 31 { 32 auto msgType = socket.receiveType(); 33 if( msgType == MsgType.aborting) break; 34 35 switch(msgType) with(MsgType) 36 { 37 case authenticationRequest: 38 logInfo("mars - S<--%s - received an authenticationRequest", client.id); 39 protoAuth(client, socket); 40 break; 41 42 case discardAuthenticationRequest: 43 logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id); 44 protoDeauth(client, socket); 45 break; 46 47 //case syncOperationReply: 48 // logInfo("mars - S<--%s - received a syncOperationReply", client.id); 49 // break; 50 51 case importValuesReply: 52 logInfo("mars - S<--%s - received an importValuesReply", client.id); 53 break; 54 55 case insertValuesReply: 56 logInfo("mars - S<--%s - received an insertValuesReply", client.id); 57 break; 58 59 case updateValuesReply: 60 logInfo("mars - S<--%s - received an updateValuesReply", client.id); 61 break; 62 63 case callServerMethodRequest: 64 logInfo("mars - S<--%s - received a callServerMethodRequest", client.id); 65 protoCallServerMathod(client, socket); 66 break; 67 68 case insertValuesRequest: 69 logInfo("mars - S<--%s - received an insertValueRequest", client.id); 70 protoInsertValueRequest(client, socket); 71 break; 72 73 case deleteRecordRequest: 74 logInfo("mars - S<--%s - received a deleteRecordRequest", client.id); 75 protoDeleteRecordRequest(client, socket); 76 break; 77 78 case deleteRecordReply: 79 logInfo("mars - S<--%s - received an deleteRecordReply", client.id); 80 break; 81 82 default: 83 logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType); 84 assert(false); 85 } 86 } 87 88 // ... cleanup the client 89 //client.wireSocket( typeof(socket).init ); 90 } 91 92 93 struct MarsProxy(S) 94 { 95 import msgpack : pack, unpack; 96 97 struct ReceivedMessage(M) { 98 bool wrongMessageReceived = false; 99 int messageId; 100 101 M m; alias m this; 102 } 103 104 this(S s, string ci){ this.socket = s; this.clientId = ci; } 105 106 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 107 ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 108 ~ (cast(ubyte*)(&(rep.type)))[0 .. 4]; 109 ubyte[] packed = rep.pack!true(); 110 logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length); 111 socket.send(prefix ~ packed); 112 } 113 114 void sendRequest(A)(int messageId, A req){ 115 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 116 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 117 immutable(ubyte)[] packed = req.pack!true().idup; 118 logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length); 119 socket.send(prefix ~ packed); 120 } 121 122 ReceivedMessage!M receiveMsg(M)(){ 123 auto msgType = receiveType(); 124 if( msgType != M.type ) return ReceivedMessage!M(true); 125 auto rm = ReceivedMessage!M(false, messageId, binaryAs!M); 126 return rm; 127 } 128 129 ReceivedMessage!M binaryAs(M)(){ 130 auto msg = binary.unpack!(M, true); 131 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 132 } 133 134 MsgType receiveType(){ 135 auto data = socket.receiveBinary(); 136 if( data.length < 8 ){ 137 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 138 return MsgType.aborting; 139 } 140 //logInfo("mars - S<--%s - message data:%s", clientId, data); 141 messageId = * cast(int*)(data[0 .. 4].ptr); 142 int msgType = * cast(int*)(data[4 .. 8].ptr); 143 //logInfo("mars - message id %d of type %d", messageId, msgType); 144 if( msgType < MsgType.min || msgType > MsgType.max ){ 145 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 146 return MsgType.aborting; 147 } 148 149 binary = data[8 .. $]; 150 return cast(MsgType)msgType; 151 } 152 153 private { 154 S socket; 155 ubyte[] binary; 156 int messageId; 157 string clientId; 158 } 159 } 160 161 struct MarsProxyStoC(S) 162 { 163 import msgpack : pack, unpack; 164 165 struct ReceivedMessage(M) { 166 enum { success, wrongMessageReceived, channelDropped } 167 int status; 168 int messageId; 169 170 M m; alias m this; 171 } 172 173 this(S s, string ci){ this.socket = s; this.clientId = ci; } 174 175 void sendReply(Q, A)(ReceivedMessage!Q req, A rep){ 176 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 177 ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4]; 178 immutable(ubyte)[] packed = rep.pack!true().idup; 179 logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length); 180 socket.send(prefix ~ packed); 181 } 182 183 /** 184 Returns: true/false on success. */ 185 bool sendRequest(A)(int messageId, A req){ 186 immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 187 ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4]; 188 immutable(ubyte)[] packed = req.pack!true().idup; 189 logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length); 190 try { socket.send( (prefix ~ packed).dup ); } 191 catch(Exception e){ 192 // XXX libasync is raising a standard exception... 193 if( e.msg == "The remote peer has closed the connection." || 194 e.msg == "WebSocket connection already actively closed.") 195 { 196 return false; 197 } 198 throw e; 199 } 200 return true; 201 } 202 203 ReceivedMessage!M receiveMsg(M)(){ 204 auto msgType = receiveType(); 205 206 ReceivedMessage!M msg; 207 if(msgType == MsgType.aborting) msg.status = msg.channelDropped; 208 else if( msgType != M.type ) msg.status = msg.wrongMessageReceived; 209 else { 210 msg.status = msg.success; 211 msg.messageId = messageId; 212 msg.m = binaryAs!M; 213 } 214 return msg; 215 } 216 217 ReceivedMessage!M binaryAs(M)(){ 218 import std.experimental.logger; 219 static if( M.sizeof == 1 ){ 220 return ReceivedMessage!M(false, messageId, M()); 221 } 222 else { 223 auto msg = binary.unpack!(M, true); 224 return ReceivedMessage!M(false, messageId, binary.unpack!(M, true)); 225 } 226 } 227 228 MsgType receiveType(){ 229 import vibe.http.websockets : WebSocketException; 230 231 ubyte[] data; 232 try { 233 data = socket.receiveBinary(); 234 } 235 catch(WebSocketException e){ 236 logInfo("mars - S<--%s - connection closed while reading message", clientId); 237 return MsgType.aborting; // XXX need a better message? 238 } 239 if( data.length < 8 ){ 240 logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length); 241 return MsgType.aborting; 242 } 243 //logInfo("mars - S<--%s - message data:%s", clientId, data); 244 messageId = * cast(int*)(data[0 .. 4].ptr); 245 int msgType = * cast(int*)(data[4 .. 8].ptr); 246 logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType); 247 if( msgType < MsgType.min || msgType > MsgType.max ){ 248 logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType); 249 return MsgType.aborting; 250 } 251 252 binary = data[8 .. $]; 253 return cast(MsgType)msgType; 254 } 255 256 private { 257 S socket; 258 ubyte[] binary; 259 int messageId; 260 string clientId; 261 } 262 }