/**
 * The `sync` module operates on a generic database, so that the database can be mocked. The module is fed
 * with a concrete db, such as `pgsql`.
 */
5 
6 module mars.server;
7 
8 import std.experimental.logger;
9 
10 import mars.client;
11 import mars.defs;
12 
13 import mars.sync;
14 import mars.pgsql;
15 import mars.msg;
16 
17 import vibe.core.log;
18 import vibe.data.json;
19 
/**
 * Instantiate one server side table for every entry of `tables`, pairing the i-th table with the i-th
 * argument of `fixtures`.
 *
 * The pairing is resolved at compile time: the first table is instantiated with the first fixtures
 * argument, then the template recurses on what is left of both.
 *
 * Params:
 *   tables   = compile-time sequence of table definitions; must not be empty.
 *   m        = the server that receives the instantiated tables.
 *   fixtures = one fixtures argument per table; an empty array stands for "no fixtures".
 */
void InstantiateTables(alias tables, F...)(MarsServer m, F fixtures) {
    static assert( tables.length > 0 && fixtures.length > 0, "every table must have at least an empty fixtures array");

    // head: the first table together with its own fixtures
    InstantiateServerSideTable!(tables[0], F[0])(m, fixtures[0]);

    // tail: recurse while there are tables left
    enum moreTables = tables.length > 1;
    static if( moreTables ){
        InstantiateTables!(tables[1 .. $])(m, fixtures[1 .. $]);
    }
}
27 
/**
 * Create the server side table for `table`, append it to the tables of the server and load its fixtures.
 *
 * Params:
 *   table    = compile-time definition of the table.
 *   Fixtures = type of `fixtures`; when it is `void[]` no fixture is loaded.
 *   m        = the server whose `tables` array receives the new table.
 *   fixtures = the rows to load; each element is expanded (`.expand`) into the `ColumnsStruct` of the table.
 */
void InstantiateServerSideTable(immutable(Table) table, Fixtures)(MarsServer m, Fixtures fixtures){
    alias ConcreteTable = ServerSideTable!(MarsClient*, table);

    auto created = new ConcreteTable();
    m.tables ~= created;

    // ... no fixtures, empty array of void ...
    enum hasFixtures = ! is(Fixtures == void[]);
    static if( hasFixtures ){
        foreach(row; fixtures){
            created.loadFixture(created.ColumnsStruct(row.expand));
        }
    }
}
38 
39 
/**
 * Look up, among the tables of the server, the one whose definition has the same name as `table`.
 *
 * Params:
 *   table = compile-time definition of the wanted table.
 *   m     = the server to search.
 * Returns: the matching table, cast to `ServerSideTable!(MarsClient*, table)`.
 *
 * The function halts on `assert(false)` when no table with that name is present.
 */
auto serverSideTable(immutable(Table) table)(MarsServer m){
    alias ConcreteTable = ServerSideTable!(MarsClient*, table);

    foreach( candidate; m.tables ){
        if( candidate.definition.name != table.name ) continue;
        return cast(ConcreteTable)(candidate);
    }
    assert(false);
}
49 
/**
 * The mars server: it owns the known clients and the server side tables, and runs the task that pushes
 * the pending database operations to the clients.
 *
 * Constructing an instance stores it in the module-level `marsServer`; the constructor asserts that no
 * instance was stored there before.
 */
class MarsServer
{
    /**
     * Register the client among the connected clients.
     *
     * A client whose id is not known yet is created and stored first; in both cases `connected()` is then
     * called on the stored client.
     *
     * Params:
     *   clientId = the identifier of the client.
     * Returns: a pointer to the client stored inside the server.
     */
    MarsClient* engageClient(string clientId)
    {
        auto client = clientId in marsClients;
        if( client is null ){
            marsClients[clientId] = MarsClient(clientId, configuration.databaseService);
            client = clientId in marsClients;
        }
        assert( client !is null );
        client.connected();
        return client;
    }

    /**
     * Mark the client as disconnected. The client is not removed from the known clients.
     *
     * Params:
     *   client = a non-null pointer to a client known by this server.
     */
    void disposeClient(MarsClient* client) in {
        assert(client !is null); assert(client.id in marsClients);
    } body {
        client.disconnected();
    }

    /// Create the client side tables for the client, preparing the initial sync op.
    /// Called by: protoauth, and by `onMarsProtocolReady` when autologin is enabled.
    void createClientSideTablesFor(MarsClient* client){
        // ... the tables that are exposed in the schema ...
        foreach(ref table; tables){
            /*client.tables[table.definition.name] =*/ table.createClientSideTable(client.id);
        }
        //logInfo("mars - created %d client side tables", client.tables.length);
        // XXX this should be rethought with a generic mechanism
        client.serverSideMethods = this.serverSideMethods;
    }

    /**
     * The mars protocol has completed the handshake and setup, requests can be sent and received.
     * Called by the 'protomars' module.
     *
     * When a PostgreSQL user is configured (autologin), the client is authorised with those credentials, an
     * `AutologinReq` is sent to it and its client side tables are created. In every case the database
     * handler task is then started, unless it is already running.
     *
     * Throws: Exception if autologin is enabled but the authorisation is refused.
     */
    void onMarsProtocolReady(MarsClient* client){

        //... connect to the DB if autologin is enabled server side
        if( configuration.pgsqlUser != "" ){
            auto dbAuthorised = client.authoriseUser(configuration.pgsqlUser, configuration.pgsqlPassword);
            if( dbAuthorised != AuthoriseError.authorised )
                throw new Exception("Server autologin enabled, but can't authorise with postgreSQL");
            auto request = AutologinReq(); with(request){
                username = configuration.pgsqlUser;
                sqlCreateDatabase = configuration.alasqlCreateDatabase;
                sqlStatements = configuration.alasqlStatements;
            }
            logInfo("mars - S --> C | autologin request, autologin for %s", configuration.pgsqlUser);
            client.sendRequest(request);
            createClientSideTablesFor(client);
        }
        startDatabaseHandler();
    }

    /**
     * Send `message` to every client that is currently connected.
     *
     * The clients are visited by reference, so that `sendBroadcast` operates on the client stored in the
     * server and not on a temporary copy of it.
     */
    void broadcast(M)(M message)
    {
        foreach(ref marsClient; marsClients)
        {
            if( marsClient.isConnected() ) marsClient.sendBroadcast(message);
        }
    }

    /**
    Returns: a pointer to the mars client with that id, or null if it does not exist. */
    MarsClient* getClient(string clientId){ return clientId in marsClients; }

    /// Delegate copied into every client by `createClientSideTablesFor`.
    string delegate(MarsClient, string, Json) serverSideMethods;

    // ================
    /**
     * Build the server and store it in the module-level `marsServer`.
     *
     * Params:
     *   c = the configuration of the server.
     */
    this(immutable(MarsServerConfiguration) c){
        assert(marsServer is null);
        marsServer = this;
        configuration = c;
    }

    /**
     * Build a configuration that exposes `schema` to the clients.
     *
     * For every table three statements are appended, in this order: insert, update, delete. This is the
     * layout that `indexStatementFor` relies on.
     *
     * Returns: a configuration holding the schema, the alasql statement that creates the database and the
     *          per-table statements; the remaining fields keep their default value.
     */
    static MarsServerConfiguration ExposeSchema(immutable(Schema) schema)
    {
        import mars.alasql : createDatabase, insertIntoParameter, updateParameter, deleteFromParameter;

        immutable(string)[] statements;
        foreach(table; schema.tables){
            statements ~= table.insertIntoParameter;
            statements ~= table.updateParameter;
            statements ~= table.deleteFromParameter;
        }
        return MarsServerConfiguration( schema, createDatabase(schema), statements );
    }

    /// The configuration received by the constructor.
    MarsServerConfiguration configuration;

    /**
     * Start the task that runs `handleDatabase`, unless one is already running.
     *
     * This method is reached once for every client that completes the protocol setup, but the handler
     * already loops over all the clients: starting one more task per client would only make several
     * tasks work on the same pending operations.
     */
    private void startDatabaseHandler(){

        import vibe.core.core : runTask;
        import vibe.core.log : logInfo;

        if( databaseHandlerRunning ) return;
        databaseHandlerRunning = true;

        logInfo("mars - database handler starting.");

        //foreach(t; tables){ logInfo("mars - exposing table %s to clients", t); }
        runTask({
            // if the handler ever terminates, allow the next client to start it again
            scope(exit) databaseHandlerRunning = false;
            handleDatabase();
        });
    }


    /**
     * Handle what happens at the db level, as pushes to the mars clients.
     *
     * Endless loop: every two seconds, for each client that is connected, authorised and has a db, the
     * pending operations of every table are executed. The first pending operation found for a client is
     * preceded by a `starting` sync request, and a `completed` one is sent after the last table. The
     * operations of a table are cleared once they have all been executed. When the client turns out to be
     * disconnected in the middle of the work, the rest of its work is skipped and the loop moves on to the
     * next client. After all the clients have been visited, `unsafeReset()` is called on every table.
     */
    void handleDatabase()
    {
        import std.datetime : seconds;
        import vibe.core.core : sleep;
        import vibe.core.log : logInfo;

        while(true) {
            sleep(2.seconds);

            clientLoop: foreach(ref client; marsClients ){
                if( client.isConnected && client.authorised && client.db !is null ){
                    bool syncStarted = false;
                    //logInfo("mars - database operations for client %s", client.id);
                    auto req = SyncOperationReq(SyncOperationReq.SyncOperation.starting);
                    foreach( table; tables ){
                        auto clientTable = table.clientSideTables[client.id];
                        //logInfo("mars - database operations for client %s table %s", client.id, table.definition.name);
                        foreach(op; clientTable.ops){
                            // the 'starting' request is sent lazily, only if there is something to do
                            if( ! syncStarted ){
                                syncStarted = true;
                                client.sendRequest(req);
                                if( ! client.isConnected ){
                                    logInfo("mars - the client %s seems disconnected, continuing with another client", client.id);
                                    continue clientLoop;
                                }
                            }
                            logInfo("mars - executing database operation for client %s", client.id);
                            op.execute(client.db, &client, clientTable, table);
                            if( ! client.isConnected ){
                                logInfo("mars - the client %s seems disconnected after some operation, continuing with another client", client.id);
                                continue clientLoop;
                            }
                        }
                        clientTable.ops = []; // XXX handle the individual failures...
                    }
                    if( syncStarted ){
                        req.operation = SyncOperationReq.SyncOperation.completed;
                        client.sendRequest(req);
                    }
                }
            }
            foreach( table; tables ){ table.unsafeReset(); }
        }
    }

    /// The known clients, by client id.
    private MarsClient[string] marsClients;

    /// True while the task started by `startDatabaseHandler` is alive.
    private bool databaseHandlerRunning = false;

    /// The server side tables, appended by `InstantiateServerSideTable`.
    public BaseServerSideTable!(MarsClient*)[] tables;

}

/// The server instance, stored here by the constructor of `MarsServer`.
__gshared MarsServer marsServer;
215 
/**
 * Compute the position of a statement inside the flat list of per-table statements.
 *
 * Every table owns three consecutive slots, in this order: insert, update, delete.
 *
 * Params:
 *   table = the zero-based index of the table.
 *   op    = one of "insert", "update", "delete".
 * Returns: `table * 3` plus 0, 1 or 2 according to `op`.
 *
 * Any other value of `op` halts on `assert(false)`.
 */
ulong indexStatementFor(ulong table, string op){
    enum ops = 3;
    switch( op ){
        case "insert": return table * ops + 0;
        case "update": return table * ops + 1;
        case "delete": return table * ops + 2;
        default: assert(false, "unknown ops!");
    }
}
223 
/**
 * The configuration of a mars server.
 *
 * Values of this type are built positionally by the helper functions of this module, so the order of the
 * fields is significant.
 */
struct MarsServerConfiguration
{
    /// The schema exposed to the clients.
    immutable(Schema) schemaExposed;

    /// The alasql statement that creates the database; sent to the client inside the autologin request.
    string alasqlCreateDatabase;

    /// The alasql statements, three per table; sent to the client inside the autologin request.
    immutable(string)[] alasqlStatements;

    /// The names of the methods exposed by the server.
    immutable string[] serverMethods;

    /// The database service handed to every client created by the server.
    immutable DatabaseService databaseService;

    /// User for the autologin; autologin is disabled when it is empty.
    string pgsqlUser;

    /// Password for the autologin.
    string pgsqlPassword;
}
236 
/**
 * Returns: a copy of `c` whose `serverMethods` is an immutable duplicate of `methods`; every other field
 *          is taken from `c`.
 */
static MarsServerConfiguration ExposeServerMethods(MarsServerConfiguration c, const string[] methods){
    return MarsServerConfiguration(
        c.schemaExposed,
        c.alasqlCreateDatabase,
        c.alasqlStatements,
        methods.idup,           // replaced field
        c.databaseService,
        c.pgsqlUser,
        c.pgsqlPassword);
}
241 
/**
 * Returns: a copy of `c` whose `databaseService` is built from `host`, `port` and `db`; every other field
 *          is taken from `c`.
 */
MarsServerConfiguration PostgreSQL(MarsServerConfiguration c, const string host, const ushort port, const string db){
    return MarsServerConfiguration(
        c.schemaExposed,
        c.alasqlCreateDatabase,
        c.alasqlStatements,
        c.serverMethods,
        DatabaseService(host, port, db),    // replaced field
        c.pgsqlUser,
        c.pgsqlPassword);
}
246 
/**
 * Returns: a copy of `c` whose autologin credentials are `login` and `password`; every other field is
 *          taken from `c`.
 */
MarsServerConfiguration Autologin(MarsServerConfiguration c, const string login, const string password){
    return MarsServerConfiguration(
        c.schemaExposed,
        c.alasqlCreateDatabase,
        c.alasqlStatements,
        c.serverMethods,
        c.databaseService,
        login,                  // replaced field
        password);              // replaced field
}