1 
2 
3 module mars.protomars;
4 
5 import vibe.core.core;
6 import vibe.core.log;
7 
8 import mars.client;
9 import mars.server;
10 import mars.msg;
11 
12 import mars.protoauth;
13 import mars.protocallmethod;
14 import mars.protoinsertvaluerequest;
15 import mars.protodeleterecordrequest;
16 
17 void protoMars(S)(MarsClient* client, S socket_)
18 {
19     // ... we are switching to binary msgpack ...
20     //socket_.switchToBinaryMode();
21     auto socket = MarsProxy!S(socket_, client.id);
22     
23     // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ...
24     while( ! client.socketWired ) vibe.core.core.yield; 
25 
26     // ... now the procol between client and server is fully operative, inform the server
27     assert(marsServer !is null);
28     marsServer.onMarsProtocolReady(client);
29 
30     while(true)
31     {
32         auto msgType = socket.receiveType();
33         if( msgType == MsgType.aborting) break;
34 
35         switch(msgType) with(MsgType)
36         {
37             case authenticationRequest:
38                 logInfo("mars - S<--%s - received an authenticationRequest", client.id);
39                 protoAuth(client, socket);
40                 break;
41 
42             case discardAuthenticationRequest:
43                 logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id);
44                 protoDeauth(client, socket);
45                 break;
46 
47             //case syncOperationReply:
48             //    logInfo("mars - S<--%s - received a syncOperationReply", client.id);
49             //    break;
50 
51             case importValuesReply:
52                 logInfo("mars - S<--%s - received an importValuesReply", client.id);
53                 break;
54 
55             case insertValuesReply:
56                 logInfo("mars - S<--%s - received an insertValuesReply", client.id);
57                 break;
58 
59             case updateValuesReply:
60                 logInfo("mars - S<--%s - received an updateValuesReply", client.id);
61                 break;
62 
63             case callServerMethodRequest:
64                 logInfo("mars - S<--%s - received a callServerMethodRequest", client.id);
65                 protoCallServerMathod(client, socket);
66                 break;
67 
68             case insertValuesRequest:
69                 logInfo("mars - S<--%s - received an insertValueRequest", client.id);
70                 protoInsertValueRequest(client, socket);
71                 break;
72 
73             case deleteRecordRequest:
74                 logInfo("mars - S<--%s - received a deleteRecordRequest", client.id);
75                 protoDeleteRecordRequest(client, socket);
76                 break;
77 
78             case deleteRecordReply:
79                 logInfo("mars - S<--%s - received an deleteRecordReply", client.id);
80                 break;
81 
82             default:
83                 logInfo("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType);
84                 assert(false);
85         }
86     }
87 
88     // ... cleanup the client
89     //client.wireSocket( typeof(socket).init );
90 }
91 
92 
93 struct MarsProxy(S)
94 {
95     import msgpack : pack, unpack;
96 
97     struct ReceivedMessage(M) {
98         bool wrongMessageReceived = false;
99         int messageId;
100         
101         M m; alias m this;
102     }
103 
104     this(S s, string ci){ this.socket = s; this.clientId = ci; }
105 
106     void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
107         ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 
108                                    ~ (cast(ubyte*)(&(rep.type)))[0 .. 4];
109         ubyte[] packed = rep.pack!true();
110         logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length);
111         socket.send(prefix ~ packed);
112     }
113 
114     void sendRequest(A)(int messageId, A req){
115         immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
116                                    ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
117         immutable(ubyte)[] packed = req.pack!true().idup;
118         logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length);
119         socket.send(prefix ~ packed);
120     }
121 
122     ReceivedMessage!M receiveMsg(M)(){
123         auto msgType = receiveType();
124         if( msgType != M.type ) return ReceivedMessage!M(true);
125         auto rm = ReceivedMessage!M(false, messageId, binaryAs!M);
126         return rm;
127     }
128 
129     ReceivedMessage!M binaryAs(M)(){
130         auto msg = binary.unpack!(M, true);
131         return ReceivedMessage!M(false, messageId, binary.unpack!(M, true));
132     }
133 
134     MsgType receiveType(){
135         auto data = socket.receiveBinary();
136         if( data.length < 8 ){
137             logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
138             return MsgType.aborting;
139         }
140         //logInfo("mars - S<--%s - message data:%s", clientId, data);
141         messageId = * cast(int*)(data[0 .. 4].ptr);
142         int msgType = * cast(int*)(data[4 .. 8].ptr);
143         //logInfo("mars - message id %d of type %d", messageId, msgType);
144         if( msgType < MsgType.min || msgType > MsgType.max ){
145             logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
146             return MsgType.aborting;
147         }
148 
149         binary = data[8 .. $];
150         return cast(MsgType)msgType;
151     }
152         
153     private {
154         S socket;
155         ubyte[] binary;
156         int messageId;
157         string clientId;
158     }
159 }
160 
161 struct MarsProxyStoC(S)
162 {
163     import msgpack : pack, unpack;
164 
165     struct ReceivedMessage(M) {
166         enum { success, wrongMessageReceived, channelDropped }
167         int status;
168         int messageId;
169         
170         M m; alias m this;
171     }
172 
173     this(S s, string ci){ this.socket = s; this.clientId = ci; }
174 
175     void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
176         immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 
177                                    ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4];
178         immutable(ubyte)[] packed = rep.pack!true().idup;
179         logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length);
180         socket.send(prefix ~ packed);
181     }
182 
183     /**
184     Returns: true/false on success. */
185     bool sendRequest(A)(int messageId, A req){
186         immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
187                                    ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
188         immutable(ubyte)[] packed = req.pack!true().idup;
189         logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length);
190         try { socket.send( (prefix ~ packed).dup ); }
191         catch(Exception e){
192             // XXX libasync is raising a standard exception...
193             if( e.msg == "The remote peer has closed the connection." || 
194                 e.msg == "WebSocket connection already actively closed.")
195             {
196                 return false;
197             }
198             throw e;
199         }
200         return true;
201     }
202 
203     ReceivedMessage!M receiveMsg(M)(){
204         auto msgType = receiveType();
205 
206         ReceivedMessage!M msg;
207         if(msgType == MsgType.aborting) msg.status = msg.channelDropped;
208         else if( msgType != M.type ) msg.status = msg.wrongMessageReceived;
209         else {
210             msg.status = msg.success;
211             msg.messageId = messageId;
212             msg.m =  binaryAs!M;
213         }
214         return msg;
215     }
216 
217     ReceivedMessage!M binaryAs(M)(){
218         import std.experimental.logger;
219         static if( M.sizeof == 1 ){
220             return ReceivedMessage!M(false, messageId, M());
221         }
222         else {
223             auto msg = binary.unpack!(M, true);
224             return ReceivedMessage!M(false, messageId, binary.unpack!(M, true));
225         }
226     }
227 
228     MsgType receiveType(){
229         import vibe.http.websockets : WebSocketException;
230 
231         ubyte[] data;
232         try {  
233             data = socket.receiveBinary(); 
234         }
235         catch(WebSocketException e){
236             logInfo("mars - S<--%s - connection closed while reading message", clientId);
237             return MsgType.aborting; // XXX need a better message?
238         }
239         if( data.length < 8 ){
240             logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
241             return MsgType.aborting;
242         }
243         //logInfo("mars - S<--%s - message data:%s", clientId, data);
244         messageId = * cast(int*)(data[0 .. 4].ptr);
245         int msgType = * cast(int*)(data[4 .. 8].ptr);
246         logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType);
247         if( msgType < MsgType.min || msgType > MsgType.max ){
248             logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
249             return MsgType.aborting;
250         }
251 
252         binary = data[8 .. $];
253         return cast(MsgType)msgType;
254     }
255     
256     private {
257         S socket;
258         ubyte[] binary;
259         int messageId;
260         string clientId;
261     }
262 }