1 
2 
3 module mars.protomars;
4 
5 import vibe.core.core;
6 import vibe.core.log;
7 
8 import mars.client;
9 import mars.server;
10 import mars.msg;
11 
12 import mars.protoauth;
13 import mars.protocallmethod;
14 import mars.protoinsertvaluerequest;
15 import mars.protodeleterecordrequest;
16 import mars.protoupd;
17 import mars.protosub;
18 
/**
 * Runs the receive/dispatch loop of the mars protocol for one client.
 *
 * The raw socket is wrapped in a `MarsProxy`. The function then yields until
 * `client.socketWired` is true, notifies `marsServer` through
 * `onMarsProtocolReady`, and finally reads one message type at a time,
 * handing the proxy to the matching `proto*` handler. The loop ends when
 * `receiveType` returns `MsgType.aborting`.
 *
 * Params:
 *   client  = the client served by this connection
 *   socket_ = the underlying socket; the proxy stores a pointer to it, so it
 *             must stay valid for the whole call
 */
void protoMars(S)(MarsClient* client, ref S socket_)
{
    // ... we are switching to binary msgpack ...
    //socket_.switchToBinaryMode();
    auto socket = MarsProxy!S(socket_, client.id);

    // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ...
    while( ! client.socketWired ) vibe.core.core.yield;

    // ... now the protocol between client and server is fully operative, inform the server
    assert(marsServer !is null);
    marsServer.onMarsProtocolReady(client);

    while(true)
    {
        auto msgType = socket.receiveType();
        // the proxy reports malformed frames and unknown types as 'aborting'
        if( msgType == MsgType.aborting) break;

        switch(msgType) with(MsgType)
        {
            case authenticationRequest:
                logInfo("mars - S<--%s - received an authenticationRequest", client.id);
                protoAuth(client, socket);
                break;

            case discardAuthenticationRequest:
                logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id);
                protoDeauth(client, socket);
                break;

            //case syncOperationReply:
            //    logInfo("mars - S<--%s - received a syncOperationReply", client.id);
            //    break;

            // ... the *Reply messages below are only logged here, no handler is invoked
            case importValuesReply:
                logInfo("mars - S<--%s - received an importValuesReply", client.id);
                break;

            case insertValuesReply:
                logInfo("mars - S<--%s - received an insertValuesReply", client.id);
                break;

            case updateValuesReply:
                logInfo("mars - S<--%s - received an updateValuesReply", client.id);
                break;

            case callServerMethodRequest:
                logInfo("mars - S<--%s - received a callServerMethodRequest", client.id);
                protoCallServerMathod(client, socket);
                break;

            case insertValuesRequest:
                logInfo("mars - S<--%s - received an insertValueRequest", client.id);
                protoInsertValueRequest(client, socket);
                break;

            case deleteRecordRequest:
                logInfo("mars - S<--%s - received a deleteRecordRequest", client.id);
                protoDeleteRecordRequest(client, socket);
                break;

            case deleteRecordReply:
                logInfo("mars - S<--%s - received an deleteRecordReply", client.id);
                break;

            case optUpdateReq:
                logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id);
                protoOptUpdate(client, socket);
                break;

            case pesUpdateReq:
                logInfo("mars - S<--%s - received an update originating from an pessimistic client update", client.id);
                protoPesUpdate(client, socket);
                break;

            case subscribeReq:
                // two format specifiers, two arguments (a stray third argument was dropped)
                logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId);
                protoSubscribe(client, socket);
                break;

            case pingReq:
                logInfo("mars - S<--%s - received a ping keep alive request", client.id);
                break;

            default:
                logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType);
                // NOTE(review): the log says "skipping" but this halts instead; a peer
                // sending a valid-but-unhandled type can trigger it — confirm intent.
                assert(false);
        }
    }

    // ... cleanup the client
    //client.wireSocket( typeof(socket).init );
}
112 
/**
 * Message framing helper around a socket of type `S`.
 *
 * Every frame handled here is an 8 byte prefix followed by a msgpack payload:
 * the first 4 bytes are the message id, the next 4 the message type, both
 * copied as the raw in-memory bytes of an `int`-sized value. The proxy keeps
 * the id and payload of the last frame read by `receiveType`, so `binaryAs`
 * can decode it afterwards.
 */
struct MarsProxy(S)
{
    import msgpack : pack, unpack;

    /// A decoded message `m` plus the id of the frame it arrived in.
    /// `wrongMessageReceived` is set when the frame had a different type than requested.
    struct ReceivedMessage(M) {
        bool wrongMessageReceived = false;
        int messageId;
        
        M m; alias m this;
    }

    /// Stores a pointer to `s` (which must outlive the proxy) and the client id used in log lines.
    this(ref S s, string ci){ this.socket = &s; this.clientId = ci; }

    /**
     * Sends `rep` framed with the message id of `req`.
     *
     * Exceptions raised by `socket.send` are logged and rethrown.
     */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 
                                   ~ (cast(ubyte*)(&(rep.type)))[0 .. 4];
        ubyte[] packed = rep.pack!true();
        logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
             logInfo("mars - (a) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
             throw e;
        }
    }

    /**
     * Sends `req` framed with the given `messageId`.
     *
     * Exceptions raised by `socket.send` are logged and rethrown.
     */
    void sendRequest(A)(int messageId, A req){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
             logInfo("mars - (b) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
             throw e;
        }

    }

    /**
     * Reads the next frame and decodes it as `M`.
     *
     * Returns: the decoded message, or a value with `wrongMessageReceived`
     * set when the frame type differs from `M.type` (this includes the
     * `aborting` result of `receiveType`, unless `M.type` is `aborting`).
     */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();
        if( msgType != M.type ) return ReceivedMessage!M(true);
        return binaryAs!M;
    }

    /// Decodes the payload of the last frame read by `receiveType` as `M`.
    ReceivedMessage!M binaryAs(M)(){
        // decode once; the payload used to be unpacked twice, discarding the first result
        auto msg = binary.unpack!(M, true);
        return ReceivedMessage!M(false, messageId, msg);
    }

    /**
     * Reads one binary frame from the socket and splits off its prefix.
     *
     * On success the message id and payload are remembered for `binaryAs`.
     * Exceptions thrown by `socket.receiveBinary` are not caught here.
     *
     * Returns: the frame's message type, or `MsgType.aborting` when the frame
     * is shorter than 8 bytes or its type lies outside `MsgType.min .. MsgType.max`.
     */
    MsgType receiveType(){
        auto data = socket.receiveBinary();
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgType.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        //logInfo("mars - message id %d of type %d", messageId, msgType);
        if( msgType < MsgType.min || msgType > MsgType.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgType.aborting;
        }

        binary = data[8 .. $];
        return cast(MsgType)msgType;
    }
        
    private {
        S* socket;        // underlying socket, not owned
        ubyte[] binary;   // payload of the last frame accepted by receiveType
        int messageId;    // id of the last frame whose prefix was read
        string clientId;  // used in log messages only
    }
}
189 
/**
 * Message framing helper around a socket of type `S`, working with
 * `MsgTypeStoC` message types.
 *
 * Every frame is an 8 byte prefix followed by a msgpack payload: the first
 * 4 bytes are the message id, the next 4 the message type, both copied as the
 * raw in-memory bytes of an `int`-sized value. The proxy keeps the id and
 * payload of the last frame read by `receiveType`, so `binaryAs` can decode it.
 */
struct MarsProxyStoC(S)
{
    import msgpack : pack, unpack;

    /// A decoded message `m`, the id of its frame, and a `status` telling
    /// whether the receive succeeded, got another type, or lost the channel.
    struct ReceivedMessage(M) {
        enum { success, wrongMessageReceived, channelDropped }
        int status;
        int messageId;
        
        M m; alias m this;
    }

    /// Stores a pointer to `s` (which must outlive the proxy) and the client id used in log lines.
    this(ref S s, string ci){ this.socket = &s; this.clientId = ci; }

    /**
     * Sends `rep` framed with the message id of `req`.
     *
     * Exceptions raised by `socket.send` are logged and rethrown.
     */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4];
        immutable(ubyte)[] packed = rep.pack!true().idup;
        logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length);
        try { socket.send(prefix ~ packed); }
        catch(Exception e){
             logInfo("mars - (c) catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
             throw e;
        }

    }

    /**
     * Sends `req` framed with the given `messageId`.
     *
     * Returns: true when the frame was sent, false when `socket.send` threw an
     * exception whose message is one of the known "connection closed" texts
     * listed below. Any other exception is logged and rethrown.
     */
    bool sendRequest(A)(int messageId, A req){
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length);
        try { socket.send( (prefix ~ packed).dup ); }
        catch(Exception e){
            // XXX libasync is raising a standard exception...
            // ... so a dropped connection can only be recognised by its message text
            logInfo("mars - catched during socket.send! the exception message is '%s'! trying to handle it", e.msg);
            if( e.msg == "The remote peer has closed the connection." || 
                e.msg == "WebSocket connection already actively closed." ||
                e.msg == "Remote hung up while writing to TCPConnection." || // vibe 0.7.31 vanilla
                e.msg == "Connection error while writing to TCPConnection.")
            {
                return false;
            }
            logInfo("mars - catched during socket.send! the exception message is '%s'! now rethrowing!", e.msg);
            throw e;
        }
        return true;
    }

    /**
     * Reads the next frame and decodes it as `M`.
     *
     * Returns: a message whose `status` is `channelDropped` when `receiveType`
     * reported `aborting`, `wrongMessageReceived` when the frame type differs
     * from `M.type`, and `success` (with id and decoded body) otherwise.
     */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();

        ReceivedMessage!M msg;
        if(msgType == MsgTypeStoC.aborting) msg.status = msg.channelDropped;
        else if( msgType != M.type ) msg.status = msg.wrongMessageReceived;
        else msg = binaryAs!M;   // carries status success, the frame id and the body
        return msg;
    }

    /// Decodes the payload of the last frame read by `receiveType` as `M`.
    ReceivedMessage!M binaryAs(M)(){
        static if( M.sizeof == 1 ){
            // presumably an empty message struct: nothing to unpack — TODO confirm
            return ReceivedMessage!M(ReceivedMessage!M.success, messageId, M());
        }
        else {
            // decode once; the payload used to be unpacked twice, discarding the first result
            auto msg = binary.unpack!(M, true);
            return ReceivedMessage!M(ReceivedMessage!M.success, messageId, msg);
        }
    }

    /**
     * Reads one binary frame from the socket and splits off its prefix.
     *
     * On success the message id and payload are remembered for `binaryAs`.
     *
     * Returns: the frame's message type, or `MsgTypeStoC.aborting` when
     * `receiveBinary` throws a `WebSocketException`, the frame is shorter than
     * 8 bytes, or its type lies outside `MsgTypeStoC.min .. MsgTypeStoC.max`.
     */
    MsgTypeStoC receiveType(){
        import vibe.http.websockets : WebSocketException;

        ubyte[] data;
        try {  
            data = socket.receiveBinary(); 
        }
        catch(WebSocketException e){
            logInfo("mars - S<--%s - connection closed while reading message", clientId);
            return MsgTypeStoC.aborting; // XXX need a better message?
        }
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgTypeStoC.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType);
        // validate against the enum the value is cast to below (this used to check MsgType's range)
        if( msgType < MsgTypeStoC.min || msgType > MsgTypeStoC.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgTypeStoC.aborting;
        }

        binary = data[8 .. $];
        return cast(MsgTypeStoC)msgType;
    }
    
    private {
        S* socket;        // underlying socket, not owned
        ubyte[] binary;   // payload of the last frame accepted by receiveType
        int messageId;    // id of the last frame whose prefix was read
        string clientId;  // used in log messages only
    }
}