/**
 * The `sync` module operates on a generic database, to support mocking. The module is fed with a concrete db,
 * like `pgsql`.
 */

module mars.server;

import std.algorithm.iteration, std.array, std.conv, std.format;

import std.experimental.logger;

import mars.client;
import mars.defs;

import mars.sync;
import mars.pgsql;
import mars.msg;
version(unittest) import mars.starwars;

import vibe.core.log;
import vibe.data.json;
import vibe.core.task;

/**
 * Instantiate one server side table for every table definition in `tables`, loading the matching fixtures.
 *
 * The first table is paired with the first fixtures argument, then the function recurses at compile time
 * on the remaining tables and fixtures.
 *
 * Params:
 *   tables   = compile-time list of table definitions (indexed and sliced at compile time)
 *   m        = the server that receives the instantiated tables
 *   fixtures = one fixtures array per table, in the same order; pass `[]` for a table without fixtures
 */
void InstantiateTables(alias tables, F...)(MarsServer m, F fixtures) {
    static assert( tables.length > 0 && fixtures.length > 0, "every table must have at least an empty fixtures array");
    InstantiateServerSideTable!(tables[0], typeof(fixtures[0]))(m, fixtures[0]);
    static if( tables.length > 1 ){
        InstantiateTables!(tables[1 .. $])(m, fixtures[1 .. $]);
    }
}
unittest {
    MarsServer marsServer_ = new MarsServer(MarsServerConfiguration());
    enum ctTables = starwarsSchema.tables[0 .. 2];
    InstantiateTables!(ctTables)(marsServer_, [], []);
}

/**
 * Create the server side table for `table`, append it to `m.tables` and load its fixtures.
 *
 * When `Fixtures` is `void[]` (an empty `[]` literal) nothing is loaded; otherwise every fixture is
 * expanded into the table `ColumnsStruct` and passed to `loadFixture`.
 */
private void InstantiateServerSideTable(immutable(Table) table, Fixtures)(MarsServer m, Fixtures fixtures){
    auto serverSideTable = new ServerSideTable!(MarsClient*, table)();
    m.tables ~= serverSideTable;

    static if( ! is(Fixtures == void[]) ){ // ... no fixtures, empty array of void ...
        foreach(fixture; fixtures){
            serverSideTable.loadFixture(serverSideTable.ColumnsStruct(fixture.expand));
        }
    }
}


/**
 * Find, among the tables registered in `m`, the one whose definition name equals `table.name`, and return
 * it cast to its concrete `ServerSideTable` type.
 *
 * Asserts (`assert(false)`) when no registered table has that name.
 */
auto serverSideTable(immutable(Table) table)(MarsServer m){
    foreach( t; m.tables ){
        if( t.definition.name == table.name ){
            auto c = cast(ServerSideTable!(MarsClient*, table))(t);
            return c;
        }
    }
    assert(false);
}

/**
 * The mars server: keeps the clients keyed by their id, owns the server side tables, and runs the task
 * that periodically pushes the pending database operations to the connected clients.
 */
class MarsServer
{
    /**
     * Register the client among the connected clients.
     *
     * An unknown `clientId` gets a new `MarsClient`; for an already known id (a reconnection) the client
     * side tables are wiped on every server side table. In both cases the client is then flagged as connected.
     *
     * Returns: a pointer to the client stored in the server.
     */
    MarsClient* engageClient(string clientId)
    {
        auto client = clientId in marsClients;
        if( client is null ){
            logInfo("mars S - %s - this is a new client.", clientId);
            marsClients[clientId] = MarsClient(clientId, configuration.databaseService);
            client = clientId in marsClients;
        }
        else {
            // ... the client was already engaged, for safety, wipe the client side tables ...
            logInfo("mars S - %s - this is a reconnection, wiping out the client side tables.", clientId);
            foreach(ref table; tables){
                table.wipeClientSideTable(client.id);
            }

        }
        assert( client !is null );
        client.connected();
        return client;
    }

    /// Flag the client as disconnected. The client stays registered in the server.
    void disposeClient(MarsClient* client) in {
        assert(client !is null); assert(client.id in marsClients);
    } body {
        client.disconnected();
    }

    /// Create the server side tables for the client, preparing the initial sync op.
    /// Callee: protoauth.
    void createClientSideTablesFor(MarsClient* client){
        // ... the tables that are exposed in the schema ...
        foreach(ref table; tables){
            table.createClientSideTable(client.id);
        }
        //logInfo("mars - created %d client side tables", client.tables.length);
        // XXX this needs to be rethought, with a generic mechanism
        client.serverSideMethods = this.serverSideMethods;
    }

    /**
     * The mars protocol has completed the handshake and setup, requests can be sent and received.
     * Called by 'protomars' module.
     *
     * When a server side autologin user is configured, the client is authorised with those credentials,
     * an `AutologinReq` is sent and the client side tables are created. The database handler task is
     * started in any case.
     *
     * Throws: Exception when autologin is configured but the authorisation is refused.
     */
    void onMarsProtocolReady(MarsClient* client){

        //... connect to the DB if autologin is enabled server side
        if( configuration.pgsqlUser != "" ){
            auto dbAuthorised = client.authoriseUser(configuration.pgsqlUser, configuration.pgsqlPassword);
            if( dbAuthorised != AuthoriseError.authorised )
                throw new Exception("Server autologin enabled, but can't authorise with postgreSQL");
            auto request = AutologinReq(); with(request){
                username = configuration.pgsqlUser;
                sqlCreateDatabase = configuration.alasqlCreateDatabase;
                sqlStatements = configuration.alasqlStatements;
                jsStatements = configuration.jsStatements;
            }
            logInfo("mars - S --> C | autologin request, autologin for %s", configuration.pgsqlUser);
            client.sendRequest(request);
            createClientSideTablesFor(client);
        }
        startDatabaseHandler();
    }


    /**
     * Send `message` to every client that is currently connected.
     */
    void broadcast(M)(M message)
    {
        // Iterate by reference: the clients are stored by value in the associative array, so a by-value
        // loop would call `sendBroadcast` on a temporary copy instead of the registered client.
        foreach(ref marsClient; marsClients)
        {
            if( marsClient.isConnected() ) marsClient.sendBroadcast(message);
        }
    }

    /**
    Returns: a pointer to the mars client with that id, or null if it does not exist.
    */
    MarsClient* getClient(string clientId){ return clientId in marsClients; }

    /// Delegate handed to every client by `createClientSideTablesFor`.
    string delegate(MarsClient, string, Json) serverSideMethods;

    // ================
    /// Build the server and publish it as the module level `marsServer`; asserts that no server was published yet.
    this(immutable(MarsServerConfiguration) c){
        assert(marsServer is null);
        marsServer = this;
        configuration = c;
    }


    /**
     * Build a configuration that exposes `schema`.
     *
     * For every table five statements are appended, in the order insert, update, delete, updateDecorations,
     * select: this order must stay aligned with `indexStatementFor`. The js statements start with
     * `jsIndexStatementFor` and the decorate / cache flags of the tables, followed by two pk-values
     * statements per table.
     */
    static MarsServerConfiguration ExposeSchema(immutable(Schema) schema)
    {
        import mars.alasql : createDatabase, selectFrom, insertIntoParameter, updateParameter, deleteFromParameter,
               updateDecorationsParameter, pkValuesJs, pkValuesWhereJs;

        immutable(string)[] statements;
        immutable(string)[] jsStatements;

        jsStatements ~= jsIndexStatementFor;
        jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.decorateRows.to!string).join(", ").array);
        jsStatements ~= `[%s]`.format(schema.tables.map!((t) => t.cacheRows.to!string).join(", ").array);
        foreach(table; schema.tables){
            statements ~= table.insertIntoParameter;
            statements ~= table.updateParameter;
            statements ~= table.deleteFromParameter;
            statements ~= table.updateDecorationsParameter;
            statements ~= table.selectFrom;
            // update the 'indexStatementFor' below in this module...
            jsStatements ~= table.pkValuesJs;
            jsStatements ~= table.pkValuesWhereJs;
        }
        return MarsServerConfiguration( schema, createDatabase(schema), statements, jsStatements );
    }

    /// The configuration received by the constructor.
    MarsServerConfiguration configuration;

    /// Start the `handleDatabase` task, unless a task handle was already stored by a previous call.
    private void startDatabaseHandler(){

        import vibe.core.core : runTask;
        import vibe.core.log : logInfo;

        logInfo("mars - database handler starting.");

        //foreach(t; tables){ logInfo("mars - exposing table %s to clients", t); }
        if( databaseHandler == Task.init ) databaseHandler = runTask(&handleDatabase);
    }
    /// Handle of the task running `handleDatabase`; `Task.init` until the handler is started.
    Task databaseHandler;


    /**
     * Handle what happens at database level, pushing it to the mars clients.
     *
     * Loops forever: every 2 seconds, for each connected and authorised client with a database, the
     * pending operations of every table are executed. A 'starting' sync request is sent before the first
     * operation and a 'completed' one after the last; a client with no pending operation receives nothing.
     * When the client turns out to be disconnected, the loop moves on to the next client.
     * After all the clients are served, every table is reset with `unsafeReset`.
     */
    void handleDatabase()
    {
        import std.datetime : seconds;
        import vibe.core.core : sleep;
        import vibe.core.log : logInfo;

        while(true) {
            sleep(2.seconds);
            logInfo("mars - database handler starting to check for sync...%s", Task.getThis());

            clientLoop: foreach(ref client; marsClients ){
                if( client.isConnected && client.authorised && client.db !is null ){
                    bool syncStarted = false;
                    logInfo("mars - database operations for client %s", client.id);
                    auto req = SyncOperationReq(SyncOperationReq.SyncOperation.starting);
                    foreach( table; tables ){
                        auto clientTable = table.clientSideTables[client.id];
                        //logInfo("mars - database operations for client %s table %s", client.id, table.definition.name);
                        foreach(op; clientTable.ops){
                            // the 'starting' request is sent lazily, only when there is at least one operation
                            if( ! syncStarted ){
                                syncStarted = true;
                                client.sendRequest(req);
                                logInfo("mars - database operations for client %s sync started", client.id);
                                if( ! client.isConnected ){
                                    logInfo("mars - the client %s seems disconnected, continuing with another client", client.id);
                                    continue clientLoop;
                                }
                            }
                            logInfo("mars - executing database operation for client %s", client.id);
                            op.execute(client.db, &client, clientTable, table);
                            if( ! client.isConnected ){
                                logInfo("mars - the client %s seems disconnected after some operation, continuing with another client", client.id);
                                continue clientLoop;
                            }
                        }
                        clientTable.ops = []; // XXX handle the single failures...
                    }
                    if( syncStarted ){
                        req.operation = SyncOperationReq.SyncOperation.completed;
                        client.sendRequest(req);
                        logInfo("mars - database operations for client %s sync completed", client.id);
                    }
                }
            }
            foreach( table; tables ){ table.unsafeReset(); }
        }
    }

    private {
        /// The known clients, keyed by client id.
        MarsClient[string] marsClients;
        /// The server side tables, in instantiation order.
        public BaseServerSideTable!(MarsClient*)[] tables;
    }

}
/// The server instance, set by the `MarsServer` constructor.
__gshared MarsServer marsServer;

// adjust the same function in mars.ts server
/**
 * Index, inside the statements built by `MarsServer.ExposeSchema`, of the statement performing `op` on
 * the table with index `table`.
 *
 * `op` is one of "insert", "update", "delete", "updateDecorations", "select"; anything else asserts.
 */
ulong indexStatementFor(ulong table, string op){
    enum ops = 5; // XXX
    if     (op == "insert"){ return table * ops + 0; }
    else if(op == "update"){ return table * ops + 1; }
    else if(op == "delete"){ return table * ops + 2; }
    else if(op == "updateDecorations"){ return table * ops + 3; }
    else if(op == "select"){ return table * ops + 4; }
    assert(false, "unknown ops!");
}
/// Javascript twin of `indexStatementFor`, shipped to the client as the first js statement.
enum jsIndexStatementFor = `(
function a(table, op)
{
    const ops = 5;
    if     (op == "insert"){ return table * ops + 0; }
    else if(op == "update"){ return table * ops + 1; }
    else if(op == "delete"){ return table * ops + 2; }
    else if(op == "updateDecorations"){ return table * ops + 3; }
    else if(op == "select"){ return table * ops + 4; }
    alert("unknown ops!");
})
`;

/**
 * The configuration of a mars server. It is built by `MarsServer.ExposeSchema` and refined with
 * `ExposeServerMethods`, `PostgreSQL` and `Autologin`, each returning an updated copy.
 */
struct MarsServerConfiguration
{
    immutable(Schema) schemaExposed;
    string alasqlCreateDatabase;
    immutable(string)[] alasqlStatements;
    immutable(string)[] jsStatements;
    immutable string[] serverMethods;


    immutable DatabaseService databaseService;

    string pgsqlUser, pgsqlPassword; // for autologin
}

/// Returns: a copy of `c` with `serverMethods` replaced by `methods`.
static MarsServerConfiguration ExposeServerMethods(MarsServerConfiguration c, const string[] methods){
    return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
            methods.idup, c.databaseService, c.pgsqlUser, c.pgsqlPassword);
}

/// Returns: a copy of `c` whose database service is built from `host`, `port` and `db`.
MarsServerConfiguration PostgreSQL(MarsServerConfiguration c, const string host, const ushort port, const string db){
    return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
            c.serverMethods, DatabaseService(host, port, db), c.pgsqlUser, c.pgsqlPassword);
}

/// Returns: a copy of `c` with the autologin credentials set to `login` and `password`.
MarsServerConfiguration Autologin(MarsServerConfiguration c, const string login, const string password){
    return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.jsStatements,
            c.serverMethods, c.databaseService, login, password);
}