1 /**
2  * the `sync` module operate on a generic database, to support mock. The module is feeded with a concrete db
3  * like the `pgsql`.
4  */
5 
6 module mars.server;
7 
8 import std.algorithm.iteration, std.array, std.conv, std.format;
9       
10        
11 
12 import std.experimental.logger;
13 
14 import mars.client;
15 import mars.defs;
16 
17 import mars.sync;
18 import mars.pgsql;
19 import mars.msg;
20 version(unittest) import mars.starwars;
21 
22 import vibe.core.log;
23 import vibe.data.json;
24 import vibe.core.task;
25 
26 void InstantiateTables(alias tables, F...)(MarsServer m, F fixtures) {
27     static assert( tables.length > 0 && fixtures.length > 0, "every table must have at least an empty fixtures array"); 
28     InstantiateServerSideTable!(tables[0], typeof(fixtures[0]))(m, fixtures[0]);
29     static if( tables.length > 1 ){
30         InstantiateTables!(tables[1 .. $])(m, fixtures[1 .. $]);
31     }
32 }
33 unittest {
34     MarsServer marsServer_ = new MarsServer(MarsServerConfiguration());
35     enum ctTables = starwarsSchema.tables[0 .. 2];
36     InstantiateTables!(ctTables)(marsServer_, [], []);
37 }
38 
39 private void InstantiateServerSideTable(immutable(Table) table, Fixtures)(MarsServer m, Fixtures fixtures){
40     auto serverSideTable = new ServerSideTable!(MarsClient*, table)();
41     m.tables ~= serverSideTable;
42     
43     static if( ! is(Fixtures == void[]) ){  // ... no fixtures, empty array of void ...
44         foreach(fixture; fixtures){
45             serverSideTable.loadFixture(serverSideTable.ColumnsStruct(fixture.expand));
46         }
47     }
48 }
49 
50 
51 auto serverSideTable(immutable(Table) table)(MarsServer m){
52     foreach( t; m.tables ){
53         if( t.definition.name == table.name ){
54             auto c = cast(ServerSideTable!(MarsClient*, table))(t);
55             return c;
56         }
57     }
58     assert(false);
59 }
60 
61 class MarsServer
62 {
63     /// Register the client between the connected clients
64     MarsClient* engageClient(string clientId)
65     {
66         auto client = clientId in marsClients;
67         if( client is null ){
68             logInfo("mars S - %s - this is a new client.", clientId);
69             marsClients[clientId] = MarsClient(clientId, configuration.databaseService);
70             client = clientId in marsClients;
71         }
72         else {
73             // ... the client was already engaged, for safety, wipe the client side tables ...
74             logInfo("mars S - %s - this is a reconnection, wiping out the client side tables.", clientId);
75             foreach(ref table; tables){
76                 table.wipeClientSideTable(client.id);
77             }
78 
79         }
80         assert( client !is null );
81         client.connected();
82         return client;
83     }
84 
85     void disposeClient(MarsClient* client) in { 
86         assert(client !is null); assert(client.id in marsClients); 
87     } body {
88         client.disconnected();
89     }
90 
91     /// Create the server side tables for the client, preparing the initial sync op.
92     /// Callee: protoauth.
93     void createClientSideTablesFor(MarsClient* client){
94         // ... the tables that are exposed in the schema ...
95         foreach(ref table; tables){
96             table.createClientSideTable(client.id);
97         }
98         //logInfo("mars - created %d client side tables", client.tables.length);
99         // XXX questo è da ripensare con un meccanismo generico
100         client.serverSideMethods = this.serverSideMethods;
101     }
102 
103     void wipeClientSideTablesFor(MarsClient* client) {
104         foreach(ref table; tables){
105             table.wipeClientSideTable(client.id);
106         }
107     }
108 
109     // The mars protocol has completed the handshake and setup, request can be sent and received.
110     /// Called by 'protomars'  module.
111     void onMarsProtocolReady(MarsClient* client){
112 
113         //... connect to the DB if autologin is enabled server side
114         if( configuration.pgsqlUser != "" ){
115             auto dbAuthorised = client.authoriseUser(configuration.pgsqlUser, configuration.pgsqlPassword);
116             if( dbAuthorised != AuthoriseError.authorised ) 
117                 throw new Exception("Server autologin enabled, but can't authorise with postgreSQL");
118             auto request = AutologinReq(); with(request){
119                 username = configuration.pgsqlUser;
120                 sqlCreateDatabase = configuration.alasqlCreateDatabase;
121                 sqlStatements = configuration.alasqlStatements;
122                 jsStatements = configuration.jsStatements;
123             }
124             logInfo("mars - S --> C | autologin request, autologin for %s", configuration.pgsqlUser);
125             client.sendRequest(request);
126             createClientSideTablesFor(client);
127         }
128         startDatabaseHandler();
129     }
130 
131 
132     void broadcast(M)(M message)
133     {
134         import std.experimental.logger : trace;
135 
136         //trace("tracing...");
137         foreach(clientId, marsClient; marsClients)
138         {
139         
140         //trace("tracing...");
141         if( marsClient.isConnected() ) marsClient.sendBroadcast(message);
142         //trace("tracing...");
143         }
144     }
145 
146     /**
147     Returns: a pointer to the mars client with that id, or null if it does not exists. */
148     MarsClient* getClient(string clientId){ return clientId in marsClients; }
149 
150     string delegate(MarsClient, string, Json) serverSideMethods;
151 
152     // ================
153     this(immutable(MarsServerConfiguration) c){
154         assert(marsServer is null);
155         marsServer = this;
156         configuration = c;
157     }
158 
159 
160     static MarsServerConfiguration ExposeSchema(immutable(Schema) schema)
161     {
162         import mars.alasql : createDatabase, selectFrom, selectFromWhere, insertIntoParameter, updateParameter,
163                deleteFromParameter, updateDecorationsParameter, updateDecoratedRecord, 
164                pkValuesJs, pkValuesWhereJs, referenceJs;
165         
166         immutable(string)[] statements;
167         immutable(string)[] jsStatements;
168   
169         jsStatements ~= jsIndexStatementFor; // returns index for operation O on table T
170         jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.decorateRows.to!string).join(", ").array);
171         jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.cacheRows.to!string).join(", ").array);
172         jsStatements ~= `{ %s }`.format(schema.tables.map!((t) => `"%s": { "index": %d }`.format(t.name, t.index) ).join(", ").array);
173         foreach(table; schema.tables){
174             statements ~= table.insertIntoParameter;           // 'insert'
175             statements ~= table.updateParameter;               // 'update'
176             statements ~= table.deleteFromParameter;           // 'delete'
177             statements ~= table.updateDecorationsParameter;    // 'updateDecorations'
178             statements ~= table.selectFrom;
179             statements ~= table.selectFromWhere;               // 'selectFromWhere'
180             statements ~= table.updateDecoratedRecord;         // 'updateDecoratedRecord'
181             // update the 'indexStatementFor' below in this module...
182 
183             // ... update in server.ts under the 'prepareJsStatements' method
184             jsStatements ~= table.pkValuesJs;
185             jsStatements ~= table.pkValuesWhereJs;
186             jsStatements ~= referenceJs(table, schema);
187         }
188         return MarsServerConfiguration( schema, createDatabase(schema), statements, jsStatements );
189     }
190 
191     MarsServerConfiguration configuration;
192 
193     private void startDatabaseHandler(){
194 
195         import vibe.core.core : runTask;
196         import vibe.core.log : logInfo;
197         
198         logInfo("mars - database handler starting.");
199         
200         //foreach(t; tables){ logInfo("mars - exposing table %s to clients", t); }
201         if( databaseHandler == Task.init ) databaseHandler = runTask(&handleDatabase);
202     }
203     Task databaseHandler;
204 
205 
206     /**
207     gestisci le cose se succedono a livello db, come push per i client mars */
208     void handleDatabase()
209     {
210         import std.algorithm : sort;
211         import std.datetime : seconds;
212         import vibe.core.core : sleep, exitEventLoop;
213         import vibe.core.log : logInfo;
214 
215         int loopCount = 0;
216         scope(failure) exitEventLoop();
217         while(true) {
218             loopCount ++;
219             sleep(2.seconds);
220             //logInfo("mars - database handler starting to check for sync...%s", Task.getThis());
221 
222             clientLoop: foreach(ref client; marsClients ){
223                if( client.isConnected && client.authorised ){
224                    if( loopCount % 5 == 0 ){
225                        bool sent = client.pingWebClient();
226                        if( ! sent ){
227                            logInfo("mars - the client %s seems disconnected after a ping request, continuing with another client", client.id);
228                            continue clientLoop;
229                        }
230                    }
231                    bool syncStarted = false;
232                    logInfo("mars - database operations for client %s", client.id);
233                    auto req = SyncOperationReq(SyncOperationReq.SyncOperation.starting);
234                    foreach( table; tables ){
235                         if(client.id in table.clientSideTables)
236                         {
237                             auto clientTable = table.clientSideTables[client.id];
238                             //logInfo("mars - database operations for client %s table %s", client.id, table.definition.name);
239                             foreach(op; clientTable.ops){
240                                 if( ! syncStarted ){
241                                     syncStarted = true; 
242                                     client.sendRequest(req);
243                                     logInfo("mars - database operations for client %s sync started", client.id);
244                                     if( ! client.isConnected ){
245                                         logInfo("mars - the client %s seems disconnected, continuing with another client", client.id);
246                                         continue clientLoop;
247                                     }
248                                 }
249                                 logInfo("mars - executing database operation for client %s", client.id);
250                                 op.execute(client.db, &client, clientTable, table);
251                                 if( ! client.isConnected ){
252                                     logInfo("mars - the client %s seems disconnected after some operation, continuing with another client", client.id);
253                                         continue clientLoop;
254                                 }
255                             }
256                             clientTable.ops = []; // XXX gestisci le singole failure...
257                         }
258                    }
259                    if( syncStarted ){
260                        req.operation = SyncOperationReq.SyncOperation.completed;
261                        client.sendRequest(req);
262                        logInfo("mars - database operations for client %s sync completed", client.id);
263                    }
264                } 
265             }
266             foreach( table; tables ){ table.unsafeReset(); }
267         }
268     }
269 
270     private {
271         MarsClient[string] marsClients;
272     public    BaseServerSideTable!(MarsClient*)[] tables;
273     }
274 
275 }
276 __gshared MarsServer marsServer;
277 
278 // adjust the same function in mars.ts server
279 ulong indexStatementFor(ulong table, string op){
280     enum ops = 7; // XXX
281     if      (op == "insert"){ return table * ops + 0; }
282     else  if(op == "update"){ return table * ops + 1; }
283     else  if(op == "delete"){ return table * ops + 2; }
284     else  if(op == "updateDecorations"){ return table * ops + 3; }
285     else  if(op == "select"){ return table * ops + 4; }
286     else  if(op == "selectFromWhere"){ return table * ops + 5; }
287     else  if(op == "updateDecoratedRecord"){ return table * ops + 6; }
288     assert(false, "unknown ops!");
289 }
290 enum jsIndexStatementFor = `(
291 function a(table, op)
292 {
293     const ops = 7;
294     if      (op == "insert"){ return table * ops + 0; }
295     else  if(op == "update"){ return table * ops + 1; }
296     else  if(op == "delete"){ return table * ops + 2; }
297     else  if(op == "updateDecorations"){ return table * ops + 3; }
298     else  if(op == "select"){ return table * ops + 4; }
299     else  if(op == "selectFromWhere"){ return table * ops + 5; }
300     else  if(op == "updateDecoratedRecord"){ return table * ops + 6; }
301     alert("unknown ops!");
302 })
303 `;
304 
305 struct MarsServerConfiguration
306 {
307     immutable(Schema) schemaExposed;
308     string alasqlCreateDatabase;
309     immutable(string)[] alasqlStatements;
310     immutable(string)[] jsStatements;
311     immutable string[] serverMethods;
312 
313     
314     immutable DatabaseService databaseService;
315 
316     string pgsqlUser, pgsqlPassword; // for autologin
317 }
318 
319 static MarsServerConfiguration ExposeServerMethods(MarsServerConfiguration c, const string[] methods){
320     return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
321             methods.idup, c.databaseService, c.pgsqlUser, c.pgsqlPassword);
322 }
323 
324 MarsServerConfiguration PostgreSQL(MarsServerConfiguration c, const string host, const ushort port, const string db){
325     return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
326             c.serverMethods, DatabaseService(host, port, db), c.pgsqlUser, c.pgsqlPassword);
327 }
328 
329 MarsServerConfiguration Autologin(MarsServerConfiguration c, const string login, const string password){
330     return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
331             c.serverMethods, c.databaseService, login, password);
332 }