1 /**
 * The `sync` module operates on a generic database, to support mocking. The module is fed with a concrete db
 * such as `pgsql`.
4  */
5 
6 module mars.server;
7 
8 import std.algorithm.iteration, std.array, std.conv, std.format;
9       
10        
11 
12 import std.experimental.logger;
13 
14 import mars.client;
15 import mars.defs;
16 
17 import mars.sync;
18 import mars.pgsql;
19 import mars.msg;
20 version(unittest) import mars.starwars;
21 
22 import vibe.core.log;
23 import vibe.data.json;
24 import vibe.core.task;
25 
/**
 * Instantiate one server side table for every table in `tables`, registering each of them in the server
 * and loading the matching fixtures.
 *
 * The pairing is positional: `fixtures[i]` belongs to `tables[i]`, so the two lists must have the same
 * length. A table without fixtures still needs its (empty) fixtures array.
 *
 * Params:
 *      tables   = compile time list of the tables to instantiate.
 *      m        = the server that will own the instantiated tables.
 *      fixtures = one fixtures array per table, in the same order as `tables`.
 */
void InstantiateTables(alias tables, F...)(MarsServer m, F fixtures) {
    static assert( tables.length > 0 && fixtures.length > 0, "every table must have at least an empty fixtures array");
    // ... without this check, surplus fixtures arrays would be silently ignored once the tables are exhausted ...
    static assert( tables.length == fixtures.length, "the number of fixtures arrays must match the number of tables");

    // ... handle the head of both lists, then recurse on what is left ...
    InstantiateServerSideTable!(tables[0], typeof(fixtures[0]))(m, fixtures[0]);
    static if( tables.length > 1 ){
        InstantiateTables!(tables[1 .. $])(m, fixtures[1 .. $]);
    }
}
unittest {
    // ... instantiate the first two tables of the starwars schema, both without fixtures ...
    enum firstTwoTables = starwarsSchema.tables[0 .. 2];
    auto server = new MarsServer(MarsServerConfiguration());
    InstantiateTables!(firstTwoTables)(server, [], []);
}
38 
/**
 * Create the server side table for `table`, append it to the tables of the server and load its fixtures.
 *
 * Params:
 *      m        = the server that receives the new table.
 *      fixtures = the rows to load; each one is expanded into the `ColumnsStruct` of the table. When the
 *                 type is `void[]` (an empty array of void) nothing is loaded.
 */
private void InstantiateServerSideTable(immutable(Table) table, Fixtures)(MarsServer m, Fixtures fixtures){
    auto created = new ServerSideTable!(MarsClient*, table)();
    m.tables ~= created;

    // ... no fixtures means an empty array of void: there is nothing to iterate on in that case ...
    enum hasFixtures = ! is(Fixtures == void[]);
    static if( hasFixtures ){
        foreach(row; fixtures){
            created.loadFixture(created.ColumnsStruct(row.expand));
        }
    }
}
49 
50 
/**
 * Find, among the tables of the server, the one whose definition has the same name as `table`.
 *
 * Returns: the matching table, cast to its concrete `ServerSideTable` type.
 * Asserts when no table of the server has that name.
 */
auto serverSideTable(immutable(Table) table)(MarsServer m){
    alias Concrete = ServerSideTable!(MarsClient*, table);
    foreach( candidate; m.tables ){
        if( candidate.definition.name != table.name ) continue;
        return cast(Concrete)(candidate);
    }
    assert(false);
}
60 
/**
 * The mars server: it keeps the registry of the engaged clients and the server side tables, and runs the
 * task that periodically sends the pending database operations to the connected clients.
 */
class MarsServer
{
    /**
     * Register the client among the connected clients.
     *
     * An unknown `clientId` gets a new `MarsClient` in the registry; a known one is handled as a
     * reconnection, and its client side tables are wiped.
     *
     * Returns: a pointer to the client stored in the registry, after `connected()` has been called on it.
     */
    MarsClient* engageClient(string clientId)
    {
        auto client = clientId in marsClients;
        if( client is null ){
            logInfo("mars S - %s - this is a new client.", clientId);
            marsClients[clientId] = MarsClient(clientId, configuration.databaseService);
            client = clientId in marsClients;
        }
        else {
            // ... the client was already engaged, for safety, wipe the client side tables ...
            logInfo("mars S - %s - this is a reconnection, wiping out the client side tables.", clientId);
            wipeClientSideTablesFor(client);
        }
        assert( client !is null );
        client.connected();
        return client;
    }

    /// Flag the client as disconnected; the client stays in the registry.
    void disposeClient(MarsClient* client) in {
        assert(client !is null); assert(client.id in marsClients);
    } body {
        client.disconnected();
    }

    /// Create the client side tables for the client, one for every server side table.
    /// Called by: protoauth.
    void createClientSideTablesFor(MarsClient* client){
        // ... the tables that are exposed in the schema ...
        foreach(ref table; tables){
            table.createClientSideTable(client.id);
        }
        //logInfo("mars - created %d client side tables", client.tables.length);
        // XXX this needs to be rethought with a generic mechanism
        client.serverSideMethods = this.serverSideMethods;
    }

    /// Wipe the client side table of the client in every server side table.
    void wipeClientSideTablesFor(MarsClient* client) {
        foreach(ref table; tables){
            table.wipeClientSideTable(client.id);
        }
    }

    /**
     * The mars protocol has completed the handshake and setup, requests can be sent and received.
     * Called by the 'protomars' module.
     *
     * When an autologin user is configured, the client is authorised with those credentials, the autologin
     * request is sent to it and its client side tables are created. The database handler is started in
     * every case.
     *
     * Throws: Exception when autologin is configured but the authorisation is refused.
     */
    void onMarsProtocolReady(MarsClient* client){

        //... connect to the DB if autologin is enabled server side
        if( configuration.pgsqlUser != "" ){
            auto dbAuthorised = client.authoriseUser(configuration.pgsqlUser, configuration.pgsqlPassword);
            if( dbAuthorised != AuthoriseError.authorised )
                throw new Exception("Server autologin enabled, but can't authorise with postgreSQL");
            auto request = AutologinReq(); with(request){
                username = configuration.pgsqlUser;
                sqlCreateDatabase = configuration.alasqlCreateDatabase;
                sqlStatements = configuration.alasqlStatements;
                jsStatements = configuration.jsStatements;
            }
            logInfo("mars - S --> C | autologin request, autologin for %s", configuration.pgsqlUser);
            client.sendRequest(request);
            createClientSideTablesFor(client);
        }
        startDatabaseHandler();
    }


    /// Send `message` as a broadcast to every client that is currently connected.
    void broadcast(M)(M message)
    {
        // ... by reference: the call must reach the registered client, not a copy of it ...
        foreach(ref marsClient; marsClients)
        {
            if( marsClient.isConnected() ) marsClient.sendBroadcast(message);
        }
    }

    /**
    Returns: a pointer to the mars client with that id, or null if it does not exist. */
    MarsClient* getClient(string clientId){ return clientId in marsClients; }

    /// Delegate handed over to every client by `createClientSideTablesFor`.
    string delegate(MarsClient, string, Json) serverSideMethods;

    // ================
    /// Build the server and publish it as the global `marsServer`; only one instance is allowed.
    this(immutable(MarsServerConfiguration) c){
        assert(marsServer is null);
        marsServer = this;
        configuration = c;
    }


    /**
     * Build the configuration that exposes `schema` to the clients.
     *
     * The sql statements are, for every table and in this order: insert, update, delete, update
     * decorations, select. The js statements start with `jsIndexStatementFor`, followed by the list of the
     * `decorateRows` values and the list of the `cacheRows` values of the tables, and then by the two
     * primary key statements of every table.
     */
    static MarsServerConfiguration ExposeSchema(immutable(Schema) schema)
    {
        import mars.alasql : createDatabase, selectFrom, insertIntoParameter, updateParameter, deleteFromParameter,
               updateDecorationsParameter, pkValuesJs, pkValuesWhereJs;

        immutable(string)[] statements;
        immutable(string)[] jsStatements;

        jsStatements ~= jsIndexStatementFor;
        jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.decorateRows.to!string).join(", ").array);
        jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.cacheRows.to!string).join(", ").array);
        foreach(table; schema.tables){
            statements ~= table.insertIntoParameter;
            statements ~= table.updateParameter;
            statements ~= table.deleteFromParameter;
            statements ~= table.updateDecorationsParameter;
            statements ~= table.selectFrom;
            // update the 'indexStatementFor' below in this module...
            jsStatements ~= table.pkValuesJs;
            jsStatements ~= table.pkValuesWhereJs;
        }
        return MarsServerConfiguration( schema, createDatabase(schema), statements, jsStatements );
    }

    /// The configuration received by the constructor.
    MarsServerConfiguration configuration;

    /// Start the task running `handleDatabase`, unless a task was already stored in `databaseHandler`.
    private void startDatabaseHandler(){

        import vibe.core.core : runTask;
        import vibe.core.log : logInfo;

        logInfo("mars - database handler starting.");

        //foreach(t; tables){ logInfo("mars - exposing table %s to clients", t); }
        if( databaseHandler == Task.init ) databaseHandler = runTask(&handleDatabase);
    }
    /// The task running `handleDatabase`; `Task.init` until the handler is started.
    Task databaseHandler;


    /**
     * Handle what happens at the database level, pushing it to the mars clients.
     *
     * Never returns. Every two seconds, for each client that is connected, authorised and has a db, the
     * pending operations of its client side tables are executed, enclosed between a 'starting' and a
     * 'completed' sync request; the requests are sent only when there is at least one operation. A client
     * found disconnected along the way is abandoned for this round. Every round ends with `unsafeReset()`
     * on all the tables.
     */
    void handleDatabase()
    {
        import std.datetime : seconds;
        import vibe.core.core : sleep;
        import vibe.core.log : logInfo;

        while(true) {
            sleep(2.seconds);
            logInfo("mars - database handler starting to check for sync...%s", Task.getThis());

            // NOTE(review): the registry is iterated while requests are sent; if sending can yield to a task
            // that engages a new client, the registry would change during the iteration -- to be confirmed.
            clientLoop: foreach(ref client; marsClients ){
                if( ! (client.isConnected && client.authorised && client.db !is null) ) continue;

                bool syncStarted = false;
                logInfo("mars - database operations for client %s", client.id);
                auto req = SyncOperationReq(SyncOperationReq.SyncOperation.starting);
                foreach( table; tables ){
                    // ... a client without a client side table for this table has nothing to sync here:
                    // indexing a missing key would raise a range error and stop the handler ...
                    auto entry = client.id in table.clientSideTables;
                    if( entry is null ) continue;
                    auto clientTable = *entry;
                    //logInfo("mars - database operations for client %s table %s", client.id, table.definition.name);
                    foreach(op; clientTable.ops){
                        if( ! syncStarted ){
                            // ... the 'starting' request is sent lazily, just before the first operation ...
                            syncStarted = true;
                            client.sendRequest(req);
                            logInfo("mars - database operations for client %s sync started", client.id);
                            if( ! client.isConnected ){
                                logInfo("mars - the client %s seems disconnected, continuing with another client", client.id);
                                continue clientLoop;
                            }
                        }
                        logInfo("mars - executing database operation for client %s", client.id);
                        op.execute(client.db, &client, clientTable, table);
                        if( ! client.isConnected ){
                            logInfo("mars - the client %s seems disconnected after some operation, continuing with another client", client.id);
                            continue clientLoop;
                        }
                    }
                    clientTable.ops = []; // XXX handle the individual failures...
                }
                if( syncStarted ){
                    req.operation = SyncOperationReq.SyncOperation.completed;
                    client.sendRequest(req);
                    logInfo("mars - database operations for client %s sync completed", client.id);
                }
            }
            foreach( table; tables ){ table.unsafeReset(); }
        }
    }

    /// The engaged clients, by client id.
    private MarsClient[string] marsClients;

    /// The server side tables, filled by `InstantiateTables`.
    public BaseServerSideTable!(MarsClient*)[] tables;

}
/// The global server instance, shared among threads: it is set by the constructor of `MarsServer`, which
/// asserts that it was still null.
__gshared MarsServer marsServer;
257 
// adjust the same function in mars.ts server
/**
 * Index of the statement that performs `op` on the table number `table`, in a list where every table
 * contributes five consecutive statements: insert, update, delete, updateDecorations and select.
 *
 * Asserts on any other value of `op`.
 */
ulong indexStatementFor(ulong table, string op){
    enum statementsPerTable = 5; // XXX
    ulong offset;
    switch(op){
        case "insert":            offset = 0; break;
        case "update":            offset = 1; break;
        case "delete":            offset = 2; break;
        case "updateDecorations": offset = 3; break;
        case "select":            offset = 4; break;
        default: assert(false, "unknown ops!");
    }
    return table * statementsPerTable + offset;
}
/// JavaScript source of the counterpart of `indexStatementFor`: same operations and same offsets.
/// `MarsServer.ExposeSchema` puts it as the first of the js statements.
enum jsIndexStatementFor = `(
function a(table, op)
{
    const ops = 5;
    if      (op == "insert"){ return table * ops + 0; }
    else  if(op == "update"){ return table * ops + 1; }
    else  if(op == "delete"){ return table * ops + 2; }
    else  if(op == "updateDecorations"){ return table * ops + 3; }
    else  if(op == "select"){ return table * ops + 4; }
    alert("unknown ops!");
})
`;
280 
/**
 * Configuration of a `MarsServer`.
 *
 * The functions of this module build it positionally, so the order of the fields matters.
 */
struct MarsServerConfiguration
{
    /// The schema given to `MarsServer.ExposeSchema`.
    immutable(Schema) schemaExposed;
    /// Result of `createDatabase(schema)`; sent to the client as `sqlCreateDatabase` in the autologin request.
    string alasqlCreateDatabase;
    /// The per-table statements; sent to the client as `sqlStatements` in the autologin request.
    immutable(string)[] alasqlStatements;
    /// The js statements; sent to the client as `jsStatements` in the autologin request.
    immutable(string)[] jsStatements;
    /// The methods set by `ExposeServerMethods`.
    immutable string[] serverMethods;

    
    /// The database service set by `PostgreSQL`; handed over to every new `MarsClient`.
    immutable DatabaseService databaseService;

    /// Credentials for autologin; an empty `pgsqlUser` means that autologin is not performed.
    string pgsqlUser, pgsqlPassword; // for autologin
}
294 
/**
 * Returns: a copy of the configuration `c` whose `serverMethods` is an immutable duplicate of `methods`;
 * all the other fields are taken from `c`.
 */
static MarsServerConfiguration ExposeServerMethods(MarsServerConfiguration c, const string[] methods){
    return MarsServerConfiguration(
        c.schemaExposed,
        c.alasqlCreateDatabase,
        c.alasqlStatements,
        c.jsStatements,
        methods.idup,           // ... the only field that is replaced ...
        c.databaseService,
        c.pgsqlUser,
        c.pgsqlPassword
    );
}
299 
/**
 * Returns: a copy of the configuration `c` whose `databaseService` is built from `host`, `port` and `db`;
 * all the other fields are taken from `c`.
 */
MarsServerConfiguration PostgreSQL(MarsServerConfiguration c, const string host, const ushort port, const string db){
    return MarsServerConfiguration(
        c.schemaExposed,
        c.alasqlCreateDatabase,
        c.alasqlStatements,
        c.jsStatements,
        c.serverMethods,
        DatabaseService(host, port, db),    // ... the only field that is replaced ...
        c.pgsqlUser,
        c.pgsqlPassword
    );
}
304 
/**
 * Returns: a copy of the configuration `c` with `login` and `password` as the autologin credentials;
 * all the other fields are taken from `c`.
 */
MarsServerConfiguration Autologin(MarsServerConfiguration c, const string login, const string password){
    return MarsServerConfiguration(
        c.schemaExposed,
        c.alasqlCreateDatabase,
        c.alasqlStatements,
        c.jsStatements,
        c.serverMethods,
        c.databaseService,
        login,                  // ... pgsqlUser ...
        password                // ... pgsqlPassword ...
    );
}