1 /**
2  * the `sync` module operate on a generic database, to support mock. The module is feeded with a concrete db
3  * like the `pgsql`.
4  */
5 
6 module mars.server;
7 
8 import std.algorithm.iteration, std.array, std.conv, std.format;
9       
10        
11 
12 import std.experimental.logger;
13 
14 import mars.client;
15 import mars.defs;
16 
17 import mars.sync;
18 import mars.pgsql;
19 import mars.msg;
20 version(unittest) import mars.starwars;
21 
22 import vibe.core.log;
23 import vibe.data.json;
24 import vibe.core.task;
25 
26 void InstantiateTables(alias tables, F...)(MarsServer m, F fixtures) {
27     static assert( tables.length > 0 && fixtures.length > 0, "every table must have at least an empty fixtures array"); 
28     InstantiateServerSideTable!(tables[0], typeof(fixtures[0]))(m, fixtures[0]);
29     static if( tables.length > 1 ){
30         InstantiateTables!(tables[1 .. $])(m, fixtures[1 .. $]);
31     }
32 }
33 unittest {
34     MarsServer marsServer_ = new MarsServer(MarsServerConfiguration());
35     enum ctTables = starwarsSchema.tables[0 .. 2];
36     InstantiateTables!(ctTables)(marsServer_, [], []);
37 }
38 
39 private void InstantiateServerSideTable(immutable(Table) table, Fixtures)(MarsServer m, Fixtures fixtures){
40     auto serverSideTable = new ServerSideTable!(MarsClient*, table)();
41     m.tables ~= serverSideTable;
42   
43     static if( ! is(Fixtures == void[]) ){  // ... no fixtures, empty array of void ...
44         foreach(fixture; fixtures){
45             serverSideTable.loadFixture(serverSideTable.ColumnsStruct(fixture.expand));
46         }
47     }
48 }
49 
50 
51 auto serverSideTable(immutable(Table) table)(MarsServer m){
52     foreach( t; m.tables ){
53         if( t.definition.name == table.name ){
54             auto c = cast(ServerSideTable!(MarsClient*, table))(t);
55             return c;
56         }
57     }
58     assert(false);
59 }
60 
61 class MarsServer
62 {
63     /// Register the client between the connected clients
64     MarsClient* engageClient(string clientId)
65     {
66         auto client = clientId in marsClients;
67         if( client is null ){
68             logInfo("mars S - %s - this is a new client.", clientId);
69             marsClients[clientId] = MarsClient(clientId, configuration.databaseService);
70             client = clientId in marsClients;
71         }
72         else {
73             // ... the client was already engaged, for safety, wipe the client side tables ...
74             logInfo("mars S - %s - this is a reconnection, wiping out the client side tables.", clientId);
75             foreach(ref table; tables){
76                 table.wipeClientSideTable(client.id);
77             }
78 
79         }
80         assert( client !is null );
81         client.connected();
82         return client;
83     }
84 
85     void disposeClient(MarsClient* client) in { 
86         assert(client !is null); assert(client.id in marsClients); 
87     } body {
88         client.disconnected();
89     }
90 
91     /// Create the server side tables for the client, preparing the initial sync op.
92     /// Callee: protoauth.
93     void createClientSideTablesFor(MarsClient* client){
94         // ... the tables that are exposed in the schema ...
95         foreach(ref table; tables){
96             table.createClientSideTable(client.id);
97         }
98         //logInfo("mars - created %d client side tables", client.tables.length);
99         // XXX questo è da ripensare con un meccanismo generico
100         client.serverSideMethods = this.serverSideMethods;
101     }
102 
103     // The mars protocol has completed the handshake and setup, request can be sent and received.
104     /// Called by 'protomars'  module.
105     void onMarsProtocolReady(MarsClient* client){
106 
107         //... connect to the DB if autologin is enabled server side
108         if( configuration.pgsqlUser != "" ){
109             auto dbAuthorised = client.authoriseUser(configuration.pgsqlUser, configuration.pgsqlPassword);
110             if( dbAuthorised != AuthoriseError.authorised ) 
111                 throw new Exception("Server autologin enabled, but can't authorise with postgreSQL");
112             auto request = AutologinReq(); with(request){
113                 username = configuration.pgsqlUser;
114                 sqlCreateDatabase = configuration.alasqlCreateDatabase;
115                 sqlStatements = configuration.alasqlStatements;
116                 jsStatements = configuration.jsStatements;
117             }
118             logInfo("mars - S --> C | autologin request, autologin for %s", configuration.pgsqlUser);
119             client.sendRequest(request);
120             createClientSideTablesFor(client);
121         }
122         startDatabaseHandler();
123     }
124 
125 
126     void broadcast(M)(M message)
127     {
128         import std.experimental.logger : trace;
129 
130         //trace("tracing...");
131         foreach(clientId, marsClient; marsClients)
132         {
133         
134         //trace("tracing...");
135         if( marsClient.isConnected() ) marsClient.sendBroadcast(message);
136         //trace("tracing...");
137         }
138     }
139 
140     /**
141     Returns: a pointer to the mars client with that id, or null if it does not exists. */
142     MarsClient* getClient(string clientId){ return clientId in marsClients; }
143 
144     string delegate(MarsClient, string, Json) serverSideMethods;
145 
146     // ================
147     this(immutable(MarsServerConfiguration) c){
148         assert(marsServer is null);
149         marsServer = this;
150         configuration = c;
151     }
152 
153 
154     static MarsServerConfiguration ExposeSchema(immutable(Schema) schema)
155     {
156         import mars.alasql : createDatabase, selectFrom, insertIntoParameter, updateParameter, deleteFromParameter,
157                updateDecorationsParameter, pkValuesJs, pkValuesWhereJs;
158         
159         immutable(string)[] statements;
160         immutable(string)[] jsStatements;
161   
162         jsStatements ~= jsIndexStatementFor;
163         jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.decorateRows.to!string).join(", ").array);
164         jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.cacheRows.to!string).join(", ").array);
165         foreach(table; schema.tables){
166             statements ~= table.insertIntoParameter;
167             statements ~= table.updateParameter;
168             statements ~= table.deleteFromParameter;
169             statements ~= table.updateDecorationsParameter;
170             statements ~= table.selectFrom;
171             // update the 'indexStatementFor' below in this module...
172             jsStatements ~= table.pkValuesJs;
173             jsStatements ~= table.pkValuesWhereJs;
174         }
175         return MarsServerConfiguration( schema, createDatabase(schema), statements, jsStatements );
176     }
177 
178     MarsServerConfiguration configuration;
179 
180     private void startDatabaseHandler(){
181 
182         import vibe.core.core : runTask;
183         import vibe.core.log : logInfo;
184         
185         logInfo("mars - database handler starting.");
186         
187         //foreach(t; tables){ logInfo("mars - exposing table %s to clients", t); }
188         if( databaseHandler == Task.init ) databaseHandler = runTask(&handleDatabase);
189     }
190     Task databaseHandler;
191 
192 
193     /**
194     gestisci le cose se succedono a livello db, come push per i client mars */
195     void handleDatabase()
196     {
197         import std.algorithm : sort;
198         import std.datetime : seconds;
199         import vibe.core.core : sleep;
200         import vibe.core.log : logInfo;
201 
202         while(true) {
203             sleep(2.seconds);
204             logInfo("mars - database handler starting to check for sync...%s", Task.getThis());
205 
206             clientLoop: foreach(ref client; marsClients ){
207                if( client.isConnected && client.authorised && client.db !is null ){
208                    bool syncStarted = false;
209                    logInfo("mars - database operations for client %s", client.id);
210                    auto req = SyncOperationReq(SyncOperationReq.SyncOperation.starting);
211                    foreach( table; tables ){
212                        auto clientTable = table.clientSideTables[client.id];
213                        //logInfo("mars - database operations for client %s table %s", client.id, table.definition.name);
214                        foreach(op; clientTable.ops){
215                            if( ! syncStarted ){
216                                syncStarted = true; 
217                                client.sendRequest(req);
218                                logInfo("mars - database operations for client %s sync started", client.id);
219                                if( ! client.isConnected ){
220                                    logInfo("mars - the client %s seems disconnected, continuing with another client", client.id);
221                                    continue clientLoop;
222                                }
223                            }
224                            logInfo("mars - executing database operation for client %s", client.id);
225                            op.execute(client.db, &client, clientTable, table);
226                            if( ! client.isConnected ){
227                                logInfo("mars - the client %s seems disconnected after some operation, continuing with another client", client.id);
228                                 continue clientLoop;
229                            }
230                        }
231                        clientTable.ops = []; // XXX gestisci le singole failure...
232                    }
233                    if( syncStarted ){
234                        req.operation = SyncOperationReq.SyncOperation.completed;
235                        client.sendRequest(req);
236                        logInfo("mars - database operations for client %s sync completed", client.id);
237                    }
238                } 
239             }
240             foreach( table; tables ){ table.unsafeReset(); }
241         }
242     }
243 
244     private {
245         MarsClient[string] marsClients;
246     public    BaseServerSideTable!(MarsClient*)[] tables;
247     }
248 
249 }
250 __gshared MarsServer marsServer;
251 
252 // adjust the same function in mars.ts server
253 ulong indexStatementFor(ulong table, string op){
254     enum ops = 5; // XXX
255     if      (op == "insert"){ return table * ops + 0; }
256     else  if(op == "update"){ return table * ops + 1; }
257     else  if(op == "delete"){ return table * ops + 2; }
258     else  if(op == "updateDecorations"){ return table * ops + 3; }
259     else  if(op == "select"){ return table * ops + 4; }
260     assert(false, "unknown ops!");
261 }
262 enum jsIndexStatementFor = `(
263 function a(table, op)
264 {
265     const ops = 5;
266     if      (op == "insert"){ return table * ops + 0; }
267     else  if(op == "update"){ return table * ops + 1; }
268     else  if(op == "delete"){ return table * ops + 2; }
269     else  if(op == "updateDecorations"){ return table * ops + 3; }
270     else  if(op == "select"){ return table * ops + 4; }
271     alert("unknown ops!");
272 })
273 `;
274 
275 struct MarsServerConfiguration
276 {
277     immutable(Schema) schemaExposed;
278     string alasqlCreateDatabase;
279     immutable(string)[] alasqlStatements;
280     immutable(string)[] jsStatements;
281     immutable string[] serverMethods;
282 
283     
284     immutable DatabaseService databaseService;
285 
286     string pgsqlUser, pgsqlPassword; // for autologin
287 }
288 
289 static MarsServerConfiguration ExposeServerMethods(MarsServerConfiguration c, const string[] methods){
290     return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
291             methods.idup, c.databaseService, c.pgsqlUser, c.pgsqlPassword);
292 }
293 
294 MarsServerConfiguration PostgreSQL(MarsServerConfiguration c, const string host, const ushort port, const string db){
295     return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
296             c.serverMethods, DatabaseService(host, port, db), c.pgsqlUser, c.pgsqlPassword);
297 }
298 
299 MarsServerConfiguration Autologin(MarsServerConfiguration c, const string login, const string password){
300     return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
301             c.serverMethods, c.databaseService, login, password);
302 }