1 /** 2 * the `sync` module operate on a generic database, to support mock. The module is feeded with a concrete db 3 * like the `pgsql`. 4 */ 5 6 module mars.server; 7 8 import std.algorithm.iteration, std.array, std.conv, std.format; 9 10 11 12 import std.experimental.logger; 13 14 import mars.client; 15 import mars.defs; 16 17 import mars.sync; 18 import mars.pgsql; 19 import mars.msg; 20 version(unittest) import mars.starwars; 21 22 import vibe.core.log; 23 import vibe.data.json; 24 import vibe.core.task; 25 26 void InstantiateTables(alias tables, F...)(MarsServer m, F fixtures) { 27 static assert( tables.length > 0 && fixtures.length > 0, "every table must have at least an empty fixtures array"); 28 InstantiateServerSideTable!(tables[0], typeof(fixtures[0]))(m, fixtures[0]); 29 static if( tables.length > 1 ){ 30 InstantiateTables!(tables[1 .. $])(m, fixtures[1 .. $]); 31 } 32 } 33 unittest { 34 MarsServer marsServer_ = new MarsServer(MarsServerConfiguration()); 35 enum ctTables = starwarsSchema.tables[0 .. 2]; 36 InstantiateTables!(ctTables)(marsServer_, [], []); 37 } 38 39 private void InstantiateServerSideTable(immutable(Table) table, Fixtures)(MarsServer m, Fixtures fixtures){ 40 auto serverSideTable = new ServerSideTable!(MarsClient*, table)(); 41 m.tables ~= serverSideTable; 42 43 static if( ! is(Fixtures == void[]) ){ // ... no fixtures, empty array of void ... 44 foreach(fixture; fixtures){ 45 serverSideTable.loadFixture(serverSideTable.ColumnsStruct(fixture.expand)); 46 } 47 } 48 } 49 50 51 auto serverSideTable(immutable(Table) table)(MarsServer m){ 52 foreach( t; m.tables ){ 53 if( t.definition.name == table.name ){ 54 auto c = cast(ServerSideTable!(MarsClient*, table))(t); 55 return c; 56 } 57 } 58 assert(false); 59 } 60 61 class MarsServer 62 { 63 /// Register the client between the connected clients 64 MarsClient* engageClient(string clientId) 65 { 66 auto client = clientId in marsClients; 67 if( client is null ){ 68 logInfo("mars S - %s - this is a new client.", clientId); 69 marsClients[clientId] = MarsClient(clientId, configuration.databaseService); 70 client = clientId in marsClients; 71 } 72 else { 73 // ... the client was already engaged, for safety, wipe the client side tables ... 74 logInfo("mars S - %s - this is a reconnection, wiping out the client side tables.", clientId); 75 foreach(ref table; tables){ 76 table.wipeClientSideTable(client.id); 77 } 78 79 } 80 assert( client !is null ); 81 client.connected(); 82 return client; 83 } 84 85 void disposeClient(MarsClient* client) in { 86 assert(client !is null); assert(client.id in marsClients); 87 } body { 88 client.disconnected(); 89 } 90 91 /// Create the server side tables for the client, preparing the initial sync op. 92 /// Callee: protoauth. 93 void createClientSideTablesFor(MarsClient* client){ 94 // ... the tables that are exposed in the schema ... 95 foreach(ref table; tables){ 96 table.createClientSideTable(client.id); 97 } 98 //logInfo("mars - created %d client side tables", client.tables.length); 99 // XXX questo è da ripensare con un meccanismo generico 100 client.serverSideMethods = this.serverSideMethods; 101 } 102 103 void wipeClientSideTablesFor(MarsClient* client) { 104 foreach(ref table; tables){ 105 table.wipeClientSideTable(client.id); 106 } 107 } 108 109 // The mars protocol has completed the handshake and setup, request can be sent and received. 110 /// Called by 'protomars' module. 111 void onMarsProtocolReady(MarsClient* client){ 112 113 //... connect to the DB if autologin is enabled server side 114 if( configuration.pgsqlUser != "" ){ 115 auto dbAuthorised = client.authoriseUser(configuration.pgsqlUser, configuration.pgsqlPassword); 116 if( dbAuthorised != AuthoriseError.authorised ) 117 throw new Exception("Server autologin enabled, but can't authorise with postgreSQL"); 118 auto request = AutologinReq(); with(request){ 119 username = configuration.pgsqlUser; 120 sqlCreateDatabase = configuration.alasqlCreateDatabase; 121 sqlStatements = configuration.alasqlStatements; 122 jsStatements = configuration.jsStatements; 123 } 124 logInfo("mars - S --> C | autologin request, autologin for %s", configuration.pgsqlUser); 125 client.sendRequest(request); 126 createClientSideTablesFor(client); 127 } 128 startDatabaseHandler(); 129 } 130 131 132 void broadcast(M)(M message) 133 { 134 import std.experimental.logger : trace; 135 136 //trace("tracing..."); 137 foreach(clientId, marsClient; marsClients) 138 { 139 140 //trace("tracing..."); 141 if( marsClient.isConnected() ) marsClient.sendBroadcast(message); 142 //trace("tracing..."); 143 } 144 } 145 146 /** 147 Returns: a pointer to the mars client with that id, or null if it does not exists. */ 148 MarsClient* getClient(string clientId){ return clientId in marsClients; } 149 150 string delegate(MarsClient, string, Json) serverSideMethods; 151 152 // ================ 153 this(immutable(MarsServerConfiguration) c){ 154 assert(marsServer is null); 155 marsServer = this; 156 configuration = c; 157 } 158 159 160 static MarsServerConfiguration ExposeSchema(immutable(Schema) schema) 161 { 162 import mars.alasql : createDatabase, selectFrom, selectFromWhere, insertIntoParameter, updateParameter, 163 deleteFromParameter, updateDecorationsParameter, updateDecoratedRecord, 164 pkValuesJs, pkValuesWhereJs, referenceJs; 165 166 immutable(string)[] statements; 167 immutable(string)[] jsStatements; 168 169 jsStatements ~= jsIndexStatementFor; // returns index for operation O on table T 170 jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.decorateRows.to!string).join(", ").array); 171 jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.cacheRows.to!string).join(", ").array); 172 jsStatements ~= `{ %s }`.format(schema.tables.map!((t) => `"%s": { "index": %d }`.format(t.name, t.index) ).join(", ").array); 173 foreach(table; schema.tables){ 174 statements ~= table.insertIntoParameter; // 'insert' 175 statements ~= table.updateParameter; // 'update' 176 statements ~= table.deleteFromParameter; // 'delete' 177 statements ~= table.updateDecorationsParameter; // 'updateDecorations' 178 statements ~= table.selectFrom; 179 statements ~= table.selectFromWhere; // 'selectFromWhere' 180 statements ~= table.updateDecoratedRecord; // 'updateDecoratedRecord' 181 // update the 'indexStatementFor' below in this module... 182 183 // ... update in server.ts under the 'prepareJsStatements' method 184 jsStatements ~= table.pkValuesJs; 185 jsStatements ~= table.pkValuesWhereJs; 186 jsStatements ~= referenceJs(table, schema); 187 } 188 return MarsServerConfiguration( schema, createDatabase(schema), statements, jsStatements ); 189 } 190 191 MarsServerConfiguration configuration; 192 193 private void startDatabaseHandler(){ 194 195 import vibe.core.core : runTask; 196 import vibe.core.log : logInfo; 197 198 logInfo("mars - database handler starting."); 199 200 //foreach(t; tables){ logInfo("mars - exposing table %s to clients", t); } 201 if( databaseHandler == Task.init ) databaseHandler = runTask(&handleDatabase); 202 } 203 Task databaseHandler; 204 205 206 /** 207 gestisci le cose se succedono a livello db, come push per i client mars */ 208 void handleDatabase() 209 { 210 import std.algorithm : sort; 211 import std.datetime : seconds; 212 import vibe.core.core : sleep, exitEventLoop; 213 import vibe.core.log : logInfo; 214 215 int loopCount = 0; 216 scope(failure) exitEventLoop(); 217 while(true) { 218 loopCount ++; 219 sleep(2.seconds); 220 //logInfo("mars - database handler starting to check for sync...%s", Task.getThis()); 221 222 clientLoop: foreach(ref client; marsClients ){ 223 if( client.isConnected && client.authorised ){ 224 if( loopCount % 5 == 0 ){ 225 bool sent = client.pingWebClient(); 226 if( ! sent ){ 227 logInfo("mars - the client %s seems disconnected after a ping request, continuing with another client", client.id); 228 continue clientLoop; 229 } 230 } 231 bool syncStarted = false; 232 logInfo("mars - database operations for client %s", client.id); 233 auto req = SyncOperationReq(SyncOperationReq.SyncOperation.starting); 234 foreach( table; tables ){ 235 if(client.id in table.clientSideTables) 236 { 237 auto clientTable = table.clientSideTables[client.id]; 238 //logInfo("mars - database operations for client %s table %s", client.id, table.definition.name); 239 foreach(op; clientTable.ops){ 240 if( ! syncStarted ){ 241 syncStarted = true; 242 client.sendRequest(req); 243 logInfo("mars - database operations for client %s sync started", client.id); 244 if( ! client.isConnected ){ 245 logInfo("mars - the client %s seems disconnected, continuing with another client", client.id); 246 continue clientLoop; 247 } 248 } 249 logInfo("mars - executing database operation for client %s", client.id); 250 op.execute(client.db, &client, clientTable, table); 251 if( ! client.isConnected ){ 252 logInfo("mars - the client %s seems disconnected after some operation, continuing with another client", client.id); 253 continue clientLoop; 254 } 255 } 256 clientTable.ops = []; // XXX gestisci le singole failure... 257 } 258 } 259 if( syncStarted ){ 260 req.operation = SyncOperationReq.SyncOperation.completed; 261 client.sendRequest(req); 262 logInfo("mars - database operations for client %s sync completed", client.id); 263 } 264 } 265 } 266 foreach( table; tables ){ table.unsafeReset(); } 267 } 268 } 269 270 private { 271 MarsClient[string] marsClients; 272 public BaseServerSideTable!(MarsClient*)[] tables; 273 } 274 275 } 276 __gshared MarsServer marsServer; 277 278 // adjust the same function in mars.ts server 279 ulong indexStatementFor(ulong table, string op){ 280 enum ops = 7; // XXX 281 if (op == "insert"){ return table * ops + 0; } 282 else if(op == "update"){ return table * ops + 1; } 283 else if(op == "delete"){ return table * ops + 2; } 284 else if(op == "updateDecorations"){ return table * ops + 3; } 285 else if(op == "select"){ return table * ops + 4; } 286 else if(op == "selectFromWhere"){ return table * ops + 5; } 287 else if(op == "updateDecoratedRecord"){ return table * ops + 6; } 288 assert(false, "unknown ops!"); 289 } 290 enum jsIndexStatementFor = `( 291 function a(table, op) 292 { 293 const ops = 7; 294 if (op == "insert"){ return table * ops + 0; } 295 else if(op == "update"){ return table * ops + 1; } 296 else if(op == "delete"){ return table * ops + 2; } 297 else if(op == "updateDecorations"){ return table * ops + 3; } 298 else if(op == "select"){ return table * ops + 4; } 299 else if(op == "selectFromWhere"){ return table * ops + 5; } 300 else if(op == "updateDecoratedRecord"){ return table * ops + 6; } 301 alert("unknown ops!"); 302 }) 303 `; 304 305 struct MarsServerConfiguration 306 { 307 immutable(Schema) schemaExposed; 308 string alasqlCreateDatabase; 309 immutable(string)[] alasqlStatements; 310 immutable(string)[] jsStatements; 311 immutable string[] serverMethods; 312 313 314 immutable DatabaseService databaseService; 315 316 string pgsqlUser, pgsqlPassword; // for autologin 317 } 318 319 static MarsServerConfiguration ExposeServerMethods(MarsServerConfiguration c, const string[] methods){ 320 return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements, 321 methods.idup, c.databaseService, c.pgsqlUser, c.pgsqlPassword); 322 } 323 324 MarsServerConfiguration PostgreSQL(MarsServerConfiguration c, const string host, const ushort port, const string db){ 325 return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements, 326 c.serverMethods, DatabaseService(host, port, db), c.pgsqlUser, c.pgsqlPassword); 327 } 328 329 MarsServerConfiguration Autologin(MarsServerConfiguration c, const string login, const string password){ 330 return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements, 331 c.serverMethods, c.databaseService, login, password); 332 }