1 
2 module mars.client;
3 
4 import std.datetime,
5        std.format,
6        std.variant,
7        std.experimental.logger;
8 
9 import vibe.core.core;
10 import vibe.core.log;
11 import vibe.data.json;
12 import vibe.http.websockets;
13 
14 import mars.msg;
15 import mars.server;
16 import mars.websocket;
17 import mars.protomars : MarsProxyStoC; // XXX per instanziare la variabile per il socket... dovrei fare un interfaccia?
18                                    // nel momento in cui instanzio il client, non ho ancora il MarsProxy, che invece
19                                    // instanzio nel protoMars. Dovrei instanziare il client nel protoMars? Maybe yes
20                                    // visto che è li che, dopo aver stabilito che è un client mars, instanzio il socket.
21 
22 import mars.sync;
23 import mars.pgsql;
24 
/**
 * Server-side state kept for one mars web client.
 *
 * Holds the history of connection events, the authorisation state, the database
 * handle opened on behalf of the user, and a pointer to the 'server to client'
 * proxy that is used to push messages to the client.
 */
struct MarsClient
{
    /**
     * Record that the client has just connected.
     *
     * Precondition: the client never connected before, or its last recorded event is a disconnection. */
    void connected() in {
        assert( connectionEvents.length ==0 || connectionEvents[$-1].type == ConnessionEvent.disconnected );
    } body {
        connectionEvents ~= ConnessionEvent(ConnessionEvent.connected, Clock.currTime);
    }

    /**
     * Record that the client has just disconnected.
     *
     * Precondition: the last recorded event is a connection. */
    void disconnected() in {
        // ... check the length first: indexing an empty history raises a RangeError and hides the diagnostic below.
        assert( connectionEvents.length >0 && connectionEvents[$-1].type == ConnessionEvent.connected, "clientId:%s, events:%s".format(id, connectionEvents) );
    } body {
        connectionEvents ~= ConnessionEvent(ConnessionEvent.disconnected, Clock.currTime);
    }

    /**
     * Returns: true if at least one event was recorded and the most recent one is a connection. */
    bool isConnected(){ return connectionEvents.length >0 && connectionEvents[$-1].type == ConnessionEvent.connected; }
    
    /**
     * Returns: the timestamps of every recorded 'connected' event, in recording order
     * (the very first connection is included). */
    SysTime[] reconnections() {
        import std.algorithm : filter, map;
        import std.array : array;

        return connectionEvents
            .filter!( e => e.type == ConnessionEvent.connected )
            .map!( e => e.when )
            .array;
    }
    
    private {
        /// One entry of the connection history: what happened, and when.
        struct ConnessionEvent {
            enum { connected, disconnected }
            int type;     // one of the anonymous enum values above
            SysTime when; // timestamp taken with Clock.currTime when the event was recorded
        }
        ConnessionEvent[] connectionEvents; // append-only, see connected() and disconnected()
    }

    /**
     * Params:
     *   id = identifier of this client, exposed through `id()`.
     *   databaseService = service used by `authoriseUser` to open the database connection. */
    this(string id, const DatabaseService databaseService){ 
        this.id_ = id; 
        this.databaseService = databaseService;
    }

    /**
     * Push a forward-only message to the client, a reply is not expected.
     *
     * The message is sent with request id 0 and the outcome reported by the socket is not checked.
     * Precondition: the client is connected. */
    void sendBroadcast(M)(M msg) in { assert(isConnected); } body 
    {
        socket.sendRequest(0, msg); // ... this is really a proxy to the websocket
    }

    /**
     * Push a new message, from the server to the client. Used by the server to inform clients about events.
     *
     * Every call consumes a fresh request id. If the socket reports that the message was not sent,
     * the client is marked as disconnected.
     * Precondition: the client is connected. */
    void sendRequest(M)(M msg) in { assert(isConnected); } body 
    {
        immutable delivered = socket.sendRequest(nextId++, msg);
        if( ! delivered ) disconnected();
    }
    private int nextId = 1; // id for the next sendRequest; 0 is used by broadcasts and pings

    /**
     * Receive a message of type M from the client.
     *
     * If the received message reports a dropped channel, the client is marked as disconnected;
     * the message is returned to the caller in any case.
     * Precondition: the client is connected. */
    auto receiveReply(M)() in { assert(isConnected); } body
    {
        auto msg = socket.receiveMsg!M();
        if( msg.status == msg.channelDropped ) disconnected();
        return msg;
    }

    /**
     * Wire the active 'server to client' socket, and the task serving it, to this client.
     *
     * The address of `socket` is stored, so the referenced proxy must stay alive for as long as
     * this client uses it.
     * NOTE(review): nothing in this struct resets the pointer on disconnection; confirm who clears it. */
    void wireSocket(ref MarsProxyStoC!ResilientWebSocket socket, Task task)
    {
        this.socket = &socket;
        this.stocTask = task;
    }

    /**
     * Returns true if the 'server to client' socket was opened and wired to us. */
    bool socketWired() { return socket !is null; }

    /**
     * Called by the authentication protocol.
     *
     * When no database host is configured the server operates in offline mode and the user is
     * accepted without contacting the database. Otherwise a connection is attempted with the given
     * credentials and stored in `db`; on failure the username is cleared, so `authorised()` is false.
     * 
     * Returns: `AuthoriseError.authorised` on success or in offline mode, otherwise the error
     * reported by `databaseService.connect`. */
    AuthoriseError authoriseUser(string username, string pgpassword) in {
        assert(username && pgpassword);
    } body {
        this.username = username;

        AuthoriseError err;
        if( databaseService.host == "" ){
            logWarn("S --- C | the database host is not specified, we are operating in offline mode");
            err = AuthoriseError.authorised;
        }
        else {
            db = databaseService.connect(username, pgpassword, err );
            if(err != AuthoriseError.authorised) this.username = "";
        }
        return err;
    }

    /**
     * Forget the current user: after this call `authorised()` returns false. */
    void discardAuthorisation() { 
        logWarn("S --- C | discarding previous authorisation");
        this.username = ""; 
    }

    /// Returns: true if a username is currently associated with this client.
    bool authorised() { return this.username != ""; }

    /// Returns: the identifier given at construction.
    string id() { return id_; }

    /**
     * Send a ping request (request id 0) to the web client.
     *
     * Returns: the outcome reported by the socket. */
    bool pingWebClient(){
        return socket.sendRequest(0, PingReq()); 
    }

    /**
     * Dispatch a method call coming from the client to the registered server side handler.
     *
     * Returns: whatever the handler returns. Without a registered handler the assertion below fails. */
    string callServerMethod(string method, Json parameters){
        if( serverSideMethods !is null ){
            return serverSideMethods(this, method, parameters);
        }
        assert(false); // catch on server side;
    }

    /**
     * Insert `record` in the server table at `statementIndex`, on behalf of the current user.
     *
     * Returns: the pair of buffers produced by the table's `insertRecord`; `err` is filled by it. */
    immutable(ubyte)[][2] vueInsertRecord(int statementIndex, immutable(ubyte)[] record, ref InsertError err){
        return marsServer.tables[statementIndex].insertRecord(db, record, err, username, id);
    }

    /**
     * Delete `record` from the server table at `tableIndex`, on behalf of the current user.
     *
     * Returns: the buffer produced by the table's `deleteRecord`; `err` is filled by it. */
    immutable(ubyte)[] vueDeleteRecord(int tableIndex, immutable(ubyte)[] record, ref DeleteError err){
        return marsServer.tables[tableIndex].deleteRecord(db, record, err, username, id);
    }

    /**
     * Update, in the server table at `tableIndex`, the record identified by `keys` with `record`.
     * The outcome is reported through `state`. */
    void vueUpdateRecord(ulong tableIndex, immutable(ubyte)[] keys, immutable(ubyte)[] record, ref RequestState state){
        marsServer.tables[tableIndex].updateRecord(db, keys, record, state, id);
    }

    /**
     * Run `select` with `parameters` against the client database and return the rows as Json.
     *
     * A client that is not authorised is rejected: `state` is set to
     * `RequestState.rejectedAsNotAuthorised` and an empty Json object is returned. */
    auto vueSubscribe(string select, Variant[string] parameters, ref RequestState state){
        import std.typecons : WhiteHole;
        import mars.defs : Table;
        // ... sanity check: the client has requested a subscription, but has not completed the login.. (for bugs, for example)
        if( ! authorised ){
            logWarn("mars - rejecting a non authorised subscribe from client %s, client bug?",id);
            state = RequestState.rejectedAsNotAuthorised;
            return Json.emptyObject;
        }
        // ... a throw-away table over an empty definition, only used to reach selectAsJson
        auto table = new WhiteHole!(BaseServerSideTable!MarsClient)(Table());
        return table.selectAsJson(db, select, parameters, state);
    }

    private {
        string id_;

        string username = ""; // empty means 'not authorised'
        //string password;
        string seed;

        MarsProxyStoC!(ResilientWebSocket)* socket; // null until wireSocket is called
        public Task stocTask;
        
        public typeof(MarsServer.serverSideMethods) serverSideMethods;
        DatabaseService databaseService;
        public Database db;
    }
}