1 
2 
3 module mars.protomars;
4 
5 import vibe.core.core;
6 import vibe.core.log;
7 
8 import mars.client;
9 import mars.server;
10 import mars.msg;
11 
12 import mars.protoauth;
13 import mars.protocallmethod;
14 import mars.protoinsertvaluerequest;
15 import mars.protodeleterecordrequest;
16 import mars.protoupd;
17 import mars.protosub;
18 
/**
Runs the server side of the mars binary protocol for one connected client.

The function wraps `socket_` in a `MarsProxy`, yields until
`client.socketWired` becomes true, informs `marsServer` through
`onMarsProtocolReady`, and then dispatches every incoming message on its
`MsgType` until `receiveType` reports `MsgType.aborting`.

Request messages are forwarded to the matching `proto*` handler; reply
messages are only logged here.

Params:
    client  = the client served by this connection; `client.id` tags the log lines
              and is handed to the proxy
    socket_ = the transport wrapped by `MarsProxy`
*/
void protoMars(S)(MarsClient* client, S socket_)
{
    // ... we are switching to binary msgpack ...
    //socket_.switchToBinaryMode();
    auto socket = MarsProxy!S(socket_, client.id);

    // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ...
    while( ! client.socketWired ) vibe.core.core.yield;

    // ... now the protocol between client and server is fully operative, inform the server
    assert(marsServer !is null);
    marsServer.onMarsProtocolReady(client);

    while(true)
    {
        // receiveType also stores the message id and the payload inside the proxy,
        // so the handlers below can decode the body of the message just received.
        auto msgType = socket.receiveType();
        if( msgType == MsgType.aborting) break;

        switch(msgType) with(MsgType)
        {
            case authenticationRequest:
                logInfo("mars - S<--%s - received an authenticationRequest", client.id);
                protoAuth(client, socket);
                break;

            case discardAuthenticationRequest:
                logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id);
                protoDeauth(client, socket);
                break;

            //case syncOperationReply:
            //    logInfo("mars - S<--%s - received a syncOperationReply", client.id);
            //    break;

            case importValuesReply:
                logInfo("mars - S<--%s - received an importValuesReply", client.id);
                break;

            case insertValuesReply:
                logInfo("mars - S<--%s - received an insertValuesReply", client.id);
                break;

            case updateValuesReply:
                logInfo("mars - S<--%s - received an updateValuesReply", client.id);
                break;

            case callServerMethodRequest:
                logInfo("mars - S<--%s - received a callServerMethodRequest", client.id);
                protoCallServerMathod(client, socket);
                break;

            case insertValuesRequest:
                logInfo("mars - S<--%s - received an insertValueRequest", client.id);
                protoInsertValueRequest(client, socket);
                break;

            case deleteRecordRequest:
                logInfo("mars - S<--%s - received a deleteRecordRequest", client.id);
                protoDeleteRecordRequest(client, socket);
                break;

            case deleteRecordReply:
                logInfo("mars - S<--%s - received an deleteRecordReply", client.id);
                break;

            case optUpdateReq:
                logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id);
                protoOptUpdate(client, socket);
                break;

            case subscribeReq:
                // two placeholders, two arguments: the original passed client.id a second time
                logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId);
                protoSubscribe(client, socket);
                break;

            default:
                logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType);
                // NOTE(review): the log says "skipping" but the assert halts on any in-range yet
                // unhandled message type sent by the peer -- confirm this is intended.
                assert(false);
        }
    }

    // ... cleanup the client
    //client.wireSocket( typeof(socket).init );
}
103 
104 
/**
Thin framing layer over a socket of type `S`.

Every frame written or read by this proxy is laid out as:
$(UL
    $(LI bytes 0 .. 4: the message id, copied as the raw in-memory bytes of an `int`)
    $(LI bytes 4 .. 8: the message type, copied the same way)
    $(LI bytes 8 .. $: the msgpack payload, packed/unpacked with field names)
)

`receiveType` reads one frame and keeps its id and payload in the proxy;
`binaryAs` / `receiveMsg` then decode that stored payload.
*/
struct MarsProxy(S)
{
    import msgpack : pack, unpack;

    /// A decoded message of type `M` together with the id of the frame that carried it.
    struct ReceivedMessage(M) {
        bool wrongMessageReceived = false; /// true when the frame type did not match `M.type`
        int messageId;

        M m; alias m this;
    }

    /// Binds the proxy to socket `s`; `ci` is the client id used in log lines.
    this(S s, string ci){ this.socket = s; this.clientId = ci; }

    /**
    Sends `rep` as the reply to `req`, reusing the message id of the request.

    Any exception raised by `socket.send` is logged and rethrown.
    */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        // assumes rep.type occupies (at least) 4 bytes -- only the first 4 are sent
        ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 
                                   ~ (cast(ubyte*)(&(rep.type)))[0 .. 4];
        ubyte[] packed = rep.pack!true();
        logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
             logInfo("mars - (a) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
             throw e;
        }
    }

    /**
    Sends `req` as a new request framed with the given `messageId`.

    Any exception raised by `socket.send` is logged and rethrown.
    */
    void sendRequest(A)(int messageId, A req){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
             logInfo("mars - (b) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
             throw e;
        }

    }

    /**
    Reads the next frame and decodes it as `M`.

    Returns: the decoded message, or a `ReceivedMessage!M` with
    `wrongMessageReceived` set when the frame type differs from `M.type`
    (this includes the `MsgType.aborting` result of `receiveType`).
    */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();
        if( msgType != M.type ) return ReceivedMessage!M(true);
        // binaryAs already yields (false, messageId, payload); no need to re-wrap it
        return binaryAs!M();
    }

    /**
    Decodes the payload stored by the last successful `receiveType` as `M`.

    Returns: the decoded message tagged with the stored message id.
    */
    ReceivedMessage!M binaryAs(M)(){
        // unpack once: the original decoded the same buffer twice and dropped the first result
        return ReceivedMessage!M(false, messageId, binary.unpack!(M, true));
    }

    /**
    Reads one binary frame from the socket and parses its 8-byte header.

    On success the message id and the payload are stored in the proxy.
    Exceptions raised by `socket.receiveBinary()` are not caught here.

    Returns: the frame's message type, or `MsgType.aborting` when the frame is
    shorter than 8 bytes or its type lies outside `MsgType.min .. MsgType.max`.
    */
    MsgType receiveType(){
        auto data = socket.receiveBinary();
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgType.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        //logInfo("mars - message id %d of type %d", messageId, msgType);
        // range-check before the cast so an arbitrary int from the peer never becomes a MsgType
        if( msgType < MsgType.min || msgType > MsgType.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgType.aborting;
        }

        binary = data[8 .. $];
        return cast(MsgType)msgType;
    }
        
    private {
        S socket;
        ubyte[] binary;   // payload of the last frame accepted by receiveType
        int messageId;    // id of the last frame whose header was parsed by receiveType
        string clientId;  // only used to tag log lines
    }
}
181 
/**
Framing layer over a socket of type `S` that reports a dropped channel
through return values instead of exceptions where it can.

Every frame is laid out as:
$(UL
    $(LI bytes 0 .. 4: the message id, copied as the raw in-memory bytes of an `int`)
    $(LI bytes 4 .. 8: the message type, copied the same way)
    $(LI bytes 8 .. $: the msgpack payload, packed/unpacked with field names)
)

`receiveType` reads one frame and keeps its id and payload in the proxy;
`binaryAs` / `receiveMsg` then decode that stored payload.
*/
struct MarsProxyStoC(S)
{
    import msgpack : pack, unpack;

    /// A decoded message of type `M`, the id of its frame, and the outcome of the receive.
    struct ReceivedMessage(M) {
        enum { success, wrongMessageReceived, channelDropped }
        int status; /// one of `success`, `wrongMessageReceived`, `channelDropped`
        int messageId;
        
        M m; alias m this;
    }

    /// Binds the proxy to socket `s`; `ci` is the client id used in log lines.
    this(S s, string ci){ this.socket = s; this.clientId = ci; }

    /**
    Sends `rep` as the reply to `req`, reusing the message id of the request.

    Any exception raised by `socket.send` is logged and rethrown.
    */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        // assumes rep.type occupies (at least) 4 bytes -- only the first 4 are sent
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4];
        immutable(ubyte)[] packed = rep.pack!true().idup;
        logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
             logInfo("mars - (c) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
             throw e;
        }

    }

    /**
    Sends `req` as a new request framed with the given `messageId`.

    Returns: true when the frame was sent; false when `socket.send` threw an
    exception whose message is one of the known "connection closed" texts
    listed below. Any other exception is logged and rethrown.
    */
    bool sendRequest(A)(int messageId, A req){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length);
        try { socket.send( (prefix ~ packed).dup ); }
        catch(Exception e){
            // XXX libasync is raising a standard exception...
            // ...so a closed connection can only be recognised by the exception text.
            logInfo("mars - catched during socket.send! the exception message is '%s'! trying to handle it", e.msg);
            if( e.msg == "The remote peer has closed the connection." || 
                e.msg == "WebSocket connection already actively closed." ||
                e.msg == "Remote hung up while writing to TCPConnection." || // vibe 0.7.31 vanilla
                e.msg == "Connection error while writing to TCPConnection.")
            {
                return false;
            }
            logInfo("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
        return true;
    }

    /**
    Reads the next frame and decodes it as `M`.

    Returns: a message whose `status` is `channelDropped` when `receiveType`
    reported `MsgType.aborting`, `wrongMessageReceived` when the frame type
    differs from `M.type`, and `success` (with `messageId` and the decoded
    payload filled in) otherwise.
    */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();

        ReceivedMessage!M msg;
        if(msgType == MsgType.aborting) msg.status = msg.channelDropped;
        else if( msgType != M.type ) msg.status = msg.wrongMessageReceived;
        else {
            msg.status = msg.success;
            msg.messageId = messageId;
            // binaryAs returns a ReceivedMessage!M; `alias m this` extracts the payload
            msg.m =  binaryAs!M;
        }
        return msg;
    }

    /**
    Decodes the payload stored by the last successful `receiveType` as `M`.

    When `M.sizeof == 1` nothing is unpacked and a default-constructed `M` is
    returned.

    Returns: the message tagged with the stored message id and status `success`.
    */
    ReceivedMessage!M binaryAs(M)(){
        static if( M.sizeof == 1 ){
            // `success` spelled out: the original passed `false`, which converts to the same 0
            return ReceivedMessage!M(ReceivedMessage!M.success, messageId, M());
        }
        else {
            // unpack once: the original decoded the same buffer twice and dropped the first result
            return ReceivedMessage!M(ReceivedMessage!M.success, messageId, binary.unpack!(M, true));
        }
    }

    /**
    Reads one binary frame from the socket and parses its 8-byte header.

    On success the message id and the payload are stored in the proxy.

    Returns: the frame's message type, or `MsgType.aborting` when
    `socket.receiveBinary()` throws a `WebSocketException`, when the frame is
    shorter than 8 bytes, or when its type lies outside
    `MsgType.min .. MsgType.max`.
    */
    MsgType receiveType(){
        import vibe.http.websockets : WebSocketException;

        ubyte[] data;
        try {  
            data = socket.receiveBinary(); 
        }
        catch(WebSocketException e){
            logInfo("mars - S<--%s - connection closed while reading message", clientId);
            return MsgType.aborting; // XXX need a better message?
        }
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgType.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType);
        // range-check before the cast so an arbitrary int from the peer never becomes a MsgType
        if( msgType < MsgType.min || msgType > MsgType.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgType.aborting;
        }

        binary = data[8 .. $];
        return cast(MsgType)msgType;
    }
    
    private {
        S socket;
        ubyte[] binary;   // payload of the last frame accepted by receiveType
        int messageId;    // id of the last frame whose header was parsed by receiveType
        string clientId;  // only used to tag log lines
    }
}