1 
2 
3 module mars.protomars;
4 
5 import vibe.core.core;
6 import vibe.core.log;
7 
8 import mars.client;
9 import mars.server;
10 import mars.msg;
11 
12 import mars.protoauth;
13 import mars.protocallmethod;
14 import mars.protoinsertvaluerequest;
15 import mars.protodeleterecordrequest;
16 import mars.protoupd;
17 import mars.protosub;
18 
/**
Server-side message loop of the mars protocol for one client connection.

Wraps `socket_` in a `MarsProxy`, waits until the client reports its socket as
wired, tells `marsServer` that the protocol is ready, and then receives
messages one at a time, dispatching each one to its handler. Reply-type
messages are only logged. The loop ends when `receiveType` returns
`MsgType.aborting`.

Params:
    client  = the client served by this connection; `client.id` tags every log line.
    socket_ = the underlying socket, handed over to `MarsProxy`.
*/
void protoMars(S)(MarsClient* client, S socket_)
{
    // ... we are switching to binary msgpack ...
    //socket_.switchToBinaryMode();
    auto socket = MarsProxy!S(socket_, client.id);
    
    // ... client must be wired to this socket, to be able to 'broadcast' or 'push' message to the browser ...
    while( ! client.socketWired ) vibe.core.core.yield; 

    // ... now the protocol between client and server is fully operative, inform the server
    assert(marsServer !is null);
    marsServer.onMarsProtocolReady(client);

    while(true)
    {
        auto msgType = socket.receiveType();
        if( msgType == MsgType.aborting) break;

        switch(msgType) with(MsgType)
        {
            case authenticationRequest:
                logInfo("mars - S<--%s - received an authenticationRequest", client.id);
                protoAuth(client, socket);
                break;

            case discardAuthenticationRequest:
                logInfo("mars - S<--%s - received a discardAuthenticationRequest", client.id);
                protoDeauth(client, socket);
                break;

            //case syncOperationReply:
            //    logInfo("mars - S<--%s - received a syncOperationReply", client.id);
            //    break;

            case importValuesReply:
                logInfo("mars - S<--%s - received an importValuesReply", client.id);
                break;

            case insertValuesReply:
                logInfo("mars - S<--%s - received an insertValuesReply", client.id);
                break;

            case updateValuesReply:
                logInfo("mars - S<--%s - received an updateValuesReply", client.id);
                break;

            case callServerMethodRequest:
                logInfo("mars - S<--%s - received a callServerMethodRequest", client.id);
                protoCallServerMathod(client, socket);
                break;

            case insertValuesRequest:
                logInfo("mars - S<--%s - received an insertValueRequest", client.id);
                protoInsertValueRequest(client, socket);
                break;

            case deleteRecordRequest:
                logInfo("mars - S<--%s - received a deleteRecordRequest", client.id);
                protoDeleteRecordRequest(client, socket);
                break;

            case deleteRecordReply:
                logInfo("mars - S<--%s - received an deleteRecordReply", client.id);
                break;

            case optUpdateReq:
                logInfo("mars - S<--%s - received an update originating from an optimistic client update", client.id);
                protoOptUpdate(client, socket);
                break;

            case subscribeReq:
                // Two placeholders, two arguments (a stray third argument used to be passed here).
                logInfo("mars - S<--%s - id:%s - received a request for subscription", client.id, socket.messageId);
                protoSubscribe(client, socket);
                break;

            default:
                // A message type inside the valid MsgType range but without a handler.
                // The type comes from the remote peer, so it must not be able to stop
                // the process: `assert(false)` is never compiled out in D and would halt
                // the whole server. Each message is read with a single receiveBinary()
                // call, so dropping this one does not desynchronise the following ones.
                logError("mars - S<--%s - received a message of type %s, skipping!", client.id, msgType);
                break;
        }
    }

    // ... cleanup the client
    //client.wireSocket( typeof(socket).init );
}
103 
104 
/**
Thin framing layer over a socket of type `S`.

Every message on the wire is an 8 byte header followed by a msgpack payload:
bytes 0..4 hold the message id and bytes 4..8 the message type, both copied
as raw memory (so in host byte order). Payloads are written with `pack!true`
and read with `unpack!(M, true)`.

NOTE(review): the header code copies 4 bytes from `type`, so it assumes the
message `type` member is 4 bytes wide - confirm against `MsgType`.
*/
struct MarsProxy(S)
{
    import msgpack : pack, unpack;

    /// A decoded message of type `M` together with its transport metadata.
    struct ReceivedMessage(M) {
        /// True when the message found on the wire was not of type `M`.
        bool wrongMessageReceived = false;
        /// Id taken from the message header.
        int messageId;
        
        /// The decoded payload; members of `M` are reachable directly through `alias this`.
        M m; alias m this;
    }

    /**
    Params:
        s  = socket used for sending and receiving.
        ci = client id, used only to tag log lines.
    */
    this(S s, string ci){ this.socket = s; this.clientId = ci; }

    /**
    Sends `rep` as the answer to `req`, reusing the message id of the request.
    */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        // Header: raw bytes of the request id followed by raw bytes of the reply type.
        ubyte[8] prefix = (cast(ubyte*)(&(req.messageId)))[0 .. 4] 
                                   ~ (cast(ubyte*)(&(rep.type)))[0 .. 4];
        ubyte[] packed = rep.pack!true();
        logInfo("mars - %s<--S - sending reply %s of type %s with a payload of %s bytes", clientId, req.messageId, rep.type, packed.length);
        socket.send(prefix ~ packed);
    }

    /**
    Sends `req` to the peer as a new request identified by `messageId`.
    */
    void sendRequest(A)(int messageId, A req){
        // Header: raw bytes of the message id followed by raw bytes of the request type.
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending request %s of type %s with a payload of %s bytes", clientId, messageId, req.type, packed.length);
        socket.send(prefix ~ packed);
    }

    /**
    Receives the next message and decodes it as `M`.

    Returns: the decoded message, or a `ReceivedMessage!M` with
    `wrongMessageReceived` set when the received type is not `M.type`
    (this includes `MsgType.aborting` coming from `receiveType`).
    */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();
        if( msgType != M.type ) return ReceivedMessage!M(true);
        // binaryAs already fills in the message id and the decoded payload.
        return binaryAs!M();
    }

    /**
    Decodes the payload stored by the last successful `receiveType` call as `M`.

    Returns: the decoded payload together with the current message id.
    */
    ReceivedMessage!M binaryAs(M)(){
        // Decode exactly once; the payload used to be unpacked twice with the
        // first result thrown away.
        return ReceivedMessage!M(false, messageId, binary.unpack!(M, true));
    }

    /**
    Reads one binary message from the socket, records its id and payload, and
    returns its type.

    Returns: the message type, or `MsgType.aborting` when the message is
    shorter than the 8 byte header or the type is outside the `MsgType` range.

    NOTE(review): unlike `MarsProxyStoC.receiveType`, exceptions thrown by
    `socket.receiveBinary()` are not caught here and reach the caller.
    */
    MsgType receiveType(){
        auto data = socket.receiveBinary();
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgType.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        //logInfo("mars - message id %d of type %d", messageId, msgType);
        if( msgType < MsgType.min || msgType > MsgType.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgType.aborting;
        }

        // Keep the payload (everything after the header) for a later binaryAs call.
        binary = data[8 .. $];
        return cast(MsgType)msgType;
    }
        
    private {
        S socket;           // underlying transport
        ubyte[] binary;     // payload of the last message accepted by receiveType
        int messageId;      // id of the last message whose header was read by receiveType
        string clientId;    // used to tag log lines
    }
}
172 
/**
Framing layer over a socket of type `S` that reports a dropped channel
through return values: `sendRequest` returns false and `receiveMsg` sets
`channelDropped` instead of letting the corresponding exception escape.

Every message on the wire is an 8 byte header followed by a msgpack payload:
bytes 0..4 hold the message id and bytes 4..8 the message type, both copied
as raw memory (so in host byte order). Payloads are written with `pack!true`
and read with `unpack!(M, true)`.

NOTE(review): the header code copies 4 bytes from `type`, so it assumes the
message `type` member is 4 bytes wide - confirm against `MsgType`.
*/
struct MarsProxyStoC(S)
{
    import msgpack : pack, unpack;

    /// A decoded message of type `M` together with its transport metadata.
    struct ReceivedMessage(M) {
        /// Possible values of `status`.
        enum { success, wrongMessageReceived, channelDropped }
        /// One of `success`, `wrongMessageReceived`, `channelDropped`.
        int status;
        /// Id taken from the message header (only set on success).
        int messageId;
        
        /// The decoded payload; members of `M` are reachable directly through `alias this`.
        M m; alias m this;
    }

    /**
    Params:
        s  = socket used for sending and receiving.
        ci = client id, used only to tag log lines.
    */
    this(S s, string ci){ this.socket = s; this.clientId = ci; }

    /**
    Sends `rep` as the answer to `req`, reusing the message id of the request.
    */
    void sendReply(Q, A)(ReceivedMessage!Q req, A rep){
        // Header: raw bytes of the request id followed by raw bytes of the reply type.
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&(req.messageId)))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(rep.type)))[0 .. 4];
        immutable(ubyte)[] packed = rep.pack!true().idup;
        logInfo("mars - S-->%s - sending message %d of type %s with a payload of %d bytes", clientId, req.messageId, rep.type, packed.length);
        socket.send(prefix ~ packed);
    }

    /**
    Sends `req` to the peer as a new request identified by `messageId`.

    Returns: true when the message was handed to the socket, false when the
    send failed because the connection is already closed. Any other exception
    is rethrown. */
    bool sendRequest(A)(int messageId, A req){
        // Header: raw bytes of the message id followed by raw bytes of the request type.
        immutable(ubyte)[8] prefix = (cast(immutable(ubyte)*)(&messageId))[0 .. 4] 
                                   ~ (cast(immutable(ubyte)*)(&(req.type)))[0 .. 4];
        immutable(ubyte)[] packed = req.pack!true().idup;
        logInfo("mars - S-->%s - sending message request %d of type %s with a payload of %d bytes", clientId, messageId, req.type, packed.length);
        try { socket.send( (prefix ~ packed).dup ); }
        catch(Exception e){
            // XXX libasync is raising a standard exception...
            // ... so a closed connection can only be recognised by its message text.
            if( e.msg == "The remote peer has closed the connection." || 
                e.msg == "WebSocket connection already actively closed.")
            {
                return false;
            }
            throw e;
        }
        return true;
    }

    /**
    Receives the next message and decodes it as `M`.

    Returns: a message whose `status` is `success` (id and payload filled in),
    `wrongMessageReceived` (a message of another type arrived), or
    `channelDropped` (`receiveType` returned `MsgType.aborting`).
    */
    ReceivedMessage!M receiveMsg(M)(){
        auto msgType = receiveType();

        ReceivedMessage!M msg;
        if(msgType == MsgType.aborting) msg.status = msg.channelDropped;
        else if( msgType != M.type ) msg.status = msg.wrongMessageReceived;
        else {
            msg.status = msg.success;
            msg.messageId = messageId;
            msg.m =  binaryAs!M;
        }
        return msg;
    }

    /**
    Decodes the payload stored by the last successful `receiveType` call as `M`.

    Returns: the decoded payload with status `success` and the current message id.
    */
    ReceivedMessage!M binaryAs(M)(){
        alias Received = ReceivedMessage!M;
        static if( M.sizeof == 1 ){
            // M.sizeof == 1: nothing is unpacked, a default M is returned.
            return Received(Received.success, messageId, M());
        }
        else {
            // Decode exactly once; the payload used to be unpacked twice with the
            // first result thrown away.
            return Received(Received.success, messageId, binary.unpack!(M, true));
        }
    }

    /**
    Reads one binary message from the socket, records its id and payload, and
    returns its type.

    Returns: the message type, or `MsgType.aborting` when the socket raised a
    `WebSocketException`, the message is shorter than the 8 byte header, or the
    type is outside the `MsgType` range.
    */
    MsgType receiveType(){
        import vibe.http.websockets : WebSocketException;

        ubyte[] data;
        try {  
            data = socket.receiveBinary(); 
        }
        catch(WebSocketException e){
            logInfo("mars - S<--%s - connection closed while reading message", clientId);
            return MsgType.aborting; // XXX need a better message?
        }
        if( data.length < 8 ){
            logError("mars - S<--%s - received message as binary data from websocket, length:%d, expected at least 8; closing connection", clientId, data.length);
            return MsgType.aborting;
        }
        //logInfo("mars - S<--%s - message data:%s", clientId, data);
        messageId = * cast(int*)(data[0 .. 4].ptr);
        int msgType = * cast(int*)(data[4 .. 8].ptr);
        logInfo("mars - S<--%s - message id %d of type %d", clientId, messageId, msgType);
        if( msgType < MsgType.min || msgType > MsgType.max ){
            logError("mars - S<--%s - received message of type %d, unknown; closing connection.", clientId, msgType);
            return MsgType.aborting;
        }

        // Keep the payload (everything after the header) for a later binaryAs call.
        binary = data[8 .. $];
        return cast(MsgType)msgType;
    }
    
    private {
        S socket;           // underlying transport
        ubyte[] binary;     // payload of the last message accepted by receiveType
        int messageId;      // id of the last message whose header was read by receiveType
        string clientId;    // used to tag log lines
    }
}