1 
2 
3 module mars.protomars;
4 
5 import vibe.core.core;
6 import vibe.core.log;
7 
8 import mars.client;
9 import mars.server;
10 import mars.msg;
11 
12 import mars.protoauth;
13 import mars.protocallmethod;
14 import mars.protoinsertvaluerequest;
15 import mars.protodeleterecordrequest;
16 import mars.protoupd;
17 import mars.protosub;
18 
19 void protoMars(S)(MarsClient* client, ref S socket_)
20 {
21     // ... we are switching to binary msgpack ...
22     //socket_.switchToBinaryMode();
23     auto socket = MarsProxy!S(socket_, client.id);
24     
25     // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ...
26     while( ! client.socketWired ) vibe.core.core.yield; 
27 
28     // ... now the procol between client and server is fully operative, inform the server
29     assert(marsServer !is null);
30     marsServer.onMarsProtocolReady(client);
31 
32     while(true)
33     {
34         auto msgType = socket.receiveType();
35         if( msgType == MsgType.aborting) break;
36 
37         switch(msgType) with(MsgType)
38         {
39             case authenticationRequest:
40                 logInfo("mars - S<--%s - received an authenticationRequest", client.id);
41                 protoAuth(client, socket);
42                 break;
43 
44             case discardAuthenticationRequest:
45                 logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id);
46                 protoDeauth(client, socket);
47                 break;
48 
49             //case syncOperationReply:
50             //    logInfo("mars - S<--%s - received a syncOperationReply", client.id);
51             //    break;
52 
53             case importValuesReply:
54                 logInfo("mars - S<--%s - received an importValuesReply", client.id);
55                 break;
56 
57             case insertValuesReply:
58                 logInfo("mars - S<--%s - received an insertValuesReply", client.id);
59                 break;
60 
61             case updateValuesReply:
62                 logInfo("mars - S<--%s - received an updateValuesReply", client.id);
63                 break;
64 
65             case callServerMethodRequest:
66                 logInfo("mars - S<--%s - received a callServerMethodRequest", client.id);
67                 protoCallServerMathod(client, socket);
68                 break;
69 
70             case insertValuesRequest:
71                 logInfo("mars - S<--%s - received an insertValueRequest", client.id);
72                 protoInsertValueRequest(client, socket);
73                 break;
74 
75             case deleteRecordRequest:
76                 logInfo("mars - S<--%s - received a deleteRecordRequest", client.id);
77                 protoDeleteRecordRequest(client, socket);
78                 break;
79 
80             case deleteRecordReply:
81                 logInfo("mars - S<--%s - received an deleteRecordReply", client.id);
82                 break;
83 
84             case optUpdateReq:
85                 logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id);
86                 protoOptUpdate(client, socket);
87                 break;
88 
89             case subscribeReq:
90                 logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId, client.id);
91                 protoSubscribe(client, socket);
92                 break;
93 
94             case pingReq:
95                 logInfo("mars - S<--%s - received a ping keep alive request", client.id);
96                 break;
97 
98             default:
99                 logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType);
100                 assert(false);
101         }
102     }
103 
104     // ... cleanup the client
105     //client.wireSocket( typeof(socket).init );
106 }
107 
108 struct MarsProxy(S)
109 {
110     import msgpack : pack, unpack;
111 
112     struct ReceivedMessage(M) {
113         bool wrongMessageReceived = false;
114         int messageId;
115         
116         M m; alias m this;
117     }
118 
119     this(ref S s, string ci){ this.socket = &s; this.clientId = ci; }
120 
121     void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
122         ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 
123                                    ~ (cast(ubyte*)(&(rep.type)))[0 .. 4];
124         ubyte[] packed = rep.pack!true();
125         logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length);
126         try { socket.send(prefix ~ packed); }
127         catch(Exception e){
128              logInfo("mars - (a) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
129              throw e;
130         }
131     }
132 
133     void sendRequest(A)(int messageId, A req){
134         immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
135                                    ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
136         immutable(ubyte)[] packed = req.pack!true().idup;
137         logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length);
138         try { socket.send(prefix ~ packed); }
139         catch(Exception e){
140              logInfo("mars - (b) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
141              throw e;
142         }
143 
144     }
145 
146     ReceivedMessage!M receiveMsg(M)(){
147         auto msgType = receiveType();
148         if( msgType != M.type ) return ReceivedMessage!M(true);
149         auto rm = ReceivedMessage!M(false, messageId, binaryAs!M);
150         return rm;
151     }
152 
153     ReceivedMessage!M binaryAs(M)(){
154         auto msg = binary.unpack!(M, true);
155         return ReceivedMessage!M(false, messageId, binary.unpack!(M, true));
156     }
157 
158     MsgType receiveType(){
159         auto data = socket.receiveBinary();
160         if( data.length < 8 ){
161             logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
162             return MsgType.aborting;
163         }
164         //logInfo("mars - S<--%s - message data:%s", clientId, data);
165         messageId = * cast(int*)(data[0 .. 4].ptr);
166         int msgType = * cast(int*)(data[4 .. 8].ptr);
167         //logInfo("mars - message id %d of type %d", messageId, msgType);
168         if( msgType < MsgType.min || msgType > MsgType.max ){
169             logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
170             return MsgType.aborting;
171         }
172 
173         binary = data[8 .. $];
174         return cast(MsgType)msgType;
175     }
176         
177     private {
178         S* socket;
179         ubyte[] binary;
180         int messageId;
181         string clientId;
182     }
183 }
184 
185 struct MarsProxyStoC(S)
186 {
187     import msgpack : pack, unpack;
188 
189     struct ReceivedMessage(M) {
190         enum { success, wrongMessageReceived, channelDropped }
191         int status;
192         int messageId;
193         
194         M m; alias m this;
195     }
196 
197     this(ref S s, string ci){ this.socket = &s; this.clientId = ci; }
198 
199     void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
200         immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 
201                                    ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4];
202         immutable(ubyte)[] packed = rep.pack!true().idup;
203         logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length);
204         try { socket.send(prefix ~ packed); }
205         catch(Exception e){
206              logInfo("mars - (c) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
207              throw e;
208         }
209 
210     }
211 
212     /**
213     Returns: true/false on success. */
214     bool sendRequest(A)(int messageId, A req){
215         immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
216                                    ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
217         immutable(ubyte)[] packed = req.pack!true().idup;
218         logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length);
219         try { socket.send( (prefix ~ packed).dup ); }
220         catch(Exception e){
221             // XXX libasync is raising a standard exception...
222             logInfo("mars - catched during socket.send! the exception message is '%s'! trying to handle it", e.msg);
223             if( e.msg == "The remote peer has closed the connection." || 
224                 e.msg == "WebSocket connection already actively closed." ||
225                 e.msg == "Remote hung up while writing to TCPConnection." || // vibe 0.7.31 vanilla
226                 e.msg == "Connection error while writing to TCPConnection.")
227             {
228                 return false;
229             }
230             logInfo("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
231             throw e;
232         }
233         return true;
234     }
235 
236     ReceivedMessage!M receiveMsg(M)(){
237         auto msgType = receiveType();
238 
239         ReceivedMessage!M msg;
240         if(msgType == MsgTypeStoC.aborting) msg.status = msg.channelDropped;
241         else if( msgType != M.type ) msg.status = msg.wrongMessageReceived;
242         else {
243             msg.status = msg.success;
244             msg.messageId = messageId;
245             msg.m =  binaryAs!M;
246         }
247         return msg;
248     }
249 
250     ReceivedMessage!M binaryAs(M)(){
251         import std.experimental.logger;
252         static if( M.sizeof == 1 ){
253             return ReceivedMessage!M(false, messageId, M());
254         }
255         else {
256             auto msg = binary.unpack!(M, true);
257             return ReceivedMessage!M(false, messageId, binary.unpack!(M, true));
258         }
259     }
260 
261     MsgTypeStoC receiveType(){
262         import vibe.http.websockets : WebSocketException;
263 
264         ubyte[] data;
265         try {  
266             data = socket.receiveBinary(); 
267         }
268         catch(WebSocketException e){
269             logInfo("mars - S<--%s - connection closed while reading message", clientId);
270             return MsgTypeStoC.aborting; // XXX need a better message?
271         }
272         if( data.length < 8 ){
273             logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
274             return MsgTypeStoC.aborting;
275         }
276         //logInfo("mars - S<--%s - message data:%s", clientId, data);
277         messageId = * cast(int*)(data[0 .. 4].ptr);
278         int msgType = * cast(int*)(data[4 .. 8].ptr);
279         logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType);
280         if( msgType < MsgType.min || msgType > MsgType.max ){
281             logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
282             return MsgTypeStoC.aborting;
283         }
284 
285         binary = data[8 .. $];
286         return cast(MsgTypeStoC)msgType;
287     }
288     
289     private {
290         S* socket;
291         ubyte[] binary;
292         int messageId;
293         string clientId;
294     }
295 }