/**
 * The `sync` module operates on a generic database, to support mocking. The module is fed with a
 * concrete db like the `pgsql`.
 */

module mars.server;

import std.experimental.logger;

import mars.client;
import mars.defs;

import mars.sync;
import mars.pgsql;
import mars.msg;

import vibe.core.log;
import vibe.data.json;

/**
 * Instantiates one server side table for every entry of `tables`, pairing the i-th table with the
 * i-th fixtures argument.
 *
 * Params:
 *  tables   = compile-time sequence of table definitions.
 *  m        = the server that will own the instantiated tables.
 *  fixtures = one fixtures argument per table (pass an empty array when a table has no fixtures).
 */
void InstantiateTables(alias tables, F...)(MarsServer m, F fixtures) {
    static assert( tables.length > 0 && fixtures.length > 0, "every table must have at least an empty fixtures array");
    InstantiateServerSideTable!(tables[0], typeof(fixtures[0]))(m, fixtures[0]);
    // ... recurse on the remaining (table, fixtures) pairs, if any ...
    static if( tables.length > 1 ){
        InstantiateTables!(tables[1 .. $])(m, fixtures[1 .. $]);
    }
}

/**
 * Creates the server side table for `table`, appends it to `m.tables` and loads the given fixtures
 * into it.
 *
 * Params:
 *  table    = compile-time definition of the table to instantiate.
 *  m        = the server that will own the table.
 *  fixtures = rows to load; a `void[]` (the type of an empty `[]` literal) means "no fixtures".
 */
void InstantiateServerSideTable(immutable(Table) table, Fixtures)(MarsServer m, Fixtures fixtures){
    auto serverSideTable = new ServerSideTable!(MarsClient*, table)();
    m.tables ~= serverSideTable;

    static if( ! is(Fixtures == void[]) ){  // ... no fixtures, empty array of void ...
        foreach(fixture; fixtures){
            // ... every fixture is expanded into the constructor of the table's row struct ...
            serverSideTable.loadFixture(serverSideTable.ColumnsStruct(fixture.expand));
        }
    }
}


/**
 * Looks up, by name, the server side table for `table` among the tables registered in `m`.
 *
 * Returns: the table found, downcast to its concrete `ServerSideTable` type.
 * Asserts (also in release builds, via `assert(false)`) when no registered table has that name.
 */
auto serverSideTable(immutable(Table) table)(MarsServer m){
    foreach( t; m.tables ){
        if( t.definition.name == table.name ){
            auto c = cast(ServerSideTable!(MarsClient*, table))(t);
            return c;
        }
    }
    assert(false);
}

/**
 * The mars server: keeps track of the known clients and of the server side tables, and runs the
 * task that pushes the pending database operations to the connected clients.
 */
class MarsServer
{
    /// Register the client among the connected clients, creating it the first time that id is seen.
    /// Returns: a pointer to the client stored in the server, already notified via `connected()`.
    MarsClient* engageClient(string clientId)
    {
        auto client = clientId in marsClients;
        if( client is null ){
            marsClients[clientId] = MarsClient(clientId, configuration.databaseService);
            client = clientId in marsClients;
        }
        assert( client !is null );
        client.connected();
        return client;
    }

    /// Notifies the client that it has been disconnected; the client is kept in the server registry.
    void disposeClient(MarsClient* client) in {
        assert(client !is null); assert(client.id in marsClients);
    } body {
        client.disconnected();
    }

    /// Create the server side tables for the client, preparing the initial sync op.
    /// Called by: protoauth (the original note says "Callee: protoauth").
    void createClientSideTablesFor(MarsClient* client){
        // ... the tables that are exposed in the schema ...
        foreach(ref table; tables){
            /*client.tables[table.definition.name] =*/ table.createClientSideTable(client.id);
        }
        //logInfo("mars - created %d client side tables", client.tables.length);
        // XXX this should be rethought with a generic mechanism
        client.serverSideMethods = this.serverSideMethods;
    }

    /// The mars protocol has completed the handshake and setup, request can be sent and received.
    /// Called by 'protomars' module.
    ///
    /// When a server side autologin user is configured the client is authorised with it, an
    /// `AutologinReq` is sent and the client side tables are created.
    /// Throws: `Exception` when autologin is enabled but the authorisation is refused.
    void onMarsProtocolReady(MarsClient* client){

        //... connect to the DB if autologin is enabled server side
        if( configuration.pgsqlUser != "" ){
            auto dbAuthorised = client.authoriseUser(configuration.pgsqlUser, configuration.pgsqlPassword);
            if( dbAuthorised != AuthoriseError.authorised )
                throw new Exception("Server autologin enabled, but can't authorise with postgreSQL");
            auto request = AutologinReq(); with(request){
                username = configuration.pgsqlUser;
                sqlCreateDatabase = configuration.alasqlCreateDatabase;
                sqlStatements = configuration.alasqlStatements;
            }
            logInfo("mars - S --> C | autologin request, autologin for %s", configuration.pgsqlUser);
            client.sendRequest(request);
            createClientSideTablesFor(client);
        }
        // ... this runs for every client: the handler itself guards against being started twice ...
        startDatabaseHandler();
    }


    /// Sends `message` as a broadcast to every client that is currently connected.
    void broadcast(M)(M message)
    {
        import std.experimental.logger : trace;

        //trace("tracing...");
        // ... iterate by reference, like `handleDatabase` does, so that the calls are made on the
        //     stored client and not on a temporary copy of the struct ...
        foreach(clientId, ref marsClient; marsClients)
        {

            //trace("tracing...");
            if( marsClient.isConnected() ) marsClient.sendBroadcast(message);
            //trace("tracing...");
        }
    }

    /**
    Returns: a pointer to the mars client with that id, or null if it does not exist. */
    MarsClient* getClient(string clientId){ return clientId in marsClients; }

    /// Delegate copied into every client by `createClientSideTablesFor`.
    string delegate(MarsClient, string, Json) serverSideMethods;

    // ================
    /// Builds the server and registers it as the global `marsServer`; only one instance is allowed.
    this(immutable(MarsServerConfiguration) c){
        assert(marsServer is null);
        marsServer = this;
        configuration = c;
    }


    /**
     * Builds a configuration exposing `schema`: the create-database statement plus, for every table
     * and in this order, the insert, update and delete statements (see `indexStatementFor`).
     */
    static MarsServerConfiguration ExposeSchema(immutable(Schema) schema)
    {
        import mars.alasql : createDatabase, insertIntoParameter, updateParameter, deleteFromParameter;

        immutable(string)[] statements;
        foreach(table; schema.tables){
            statements ~= table.insertIntoParameter;
            statements ~= table.updateParameter;
            statements ~= table.deleteFromParameter;
        }
        return MarsServerConfiguration( schema, createDatabase(schema), statements );
    }

    MarsServerConfiguration configuration;

    /// Starts the task running `handleDatabase`, unless one has already been started.
    private void startDatabaseHandler(){

        import vibe.core.core : runTask;
        import vibe.core.log : logInfo;

        // ... `onMarsProtocolReady` calls this once per client: without this guard every client
        //     would spawn one more endless polling task working on the very same operations ...
        if( databaseHandlerStarted ) return;
        databaseHandlerStarted = true;

        logInfo("mars - database handler starting.");

        //foreach(t; tables){ logInfo("mars - exposing table %s to clients", t); }
        runTask(&handleDatabase);
    }


    /**
     * Handles what happens at the db level, pushing it to the mars clients.
     *
     * Loops forever: every two seconds, for each connected and authorised client with a database,
     * it executes the pending operations of every client side table. A `starting` sync request is
     * sent before the first operation and a `completed` one after the last; clients without
     * pending operations receive nothing. After all the clients have been served, every server
     * side table is reset with `unsafeReset`.
     */
    void handleDatabase()
    {
        import std.algorithm : sort;
        import std.datetime : seconds;
        import vibe.core.core : sleep;
        import vibe.core.log : logInfo;

        // ... if the loop is ever left (e.g. an exception), allow a later client to restart it ...
        scope(exit) databaseHandlerStarted = false;

        while(true) {
            sleep(2.seconds);

            clientLoop: foreach(ref client; marsClients ){
                if( client.isConnected && client.authorised && client.db !is null ){
                    bool syncStarted = false;
                    //logInfo("mars - database operations for client %s", client.id);
                    auto req = SyncOperationReq(SyncOperationReq.SyncOperation.starting);
                    foreach( table; tables ){
                        // NOTE(review): unguarded lookup; presumably the entry is created by
                        // `createClientSideTable` before the client is authorised -- confirm.
                        auto clientTable = table.clientSideTables[client.id];
                        //logInfo("mars - database operations for client %s table %s", client.id, table.definition.name);
                        foreach(op; clientTable.ops){
                            if( ! syncStarted ){
                                // ... announce the sync only when there is really something to do ...
                                syncStarted = true;
                                client.sendRequest(req);
                                if( ! client.isConnected ){
                                    logInfo("mars - the client %s seems disconnected, continuing with another client", client.id);
                                    continue clientLoop;
                                }
                            }
                            logInfo("mars - executing database operation for client %s", client.id);
                            op.execute(client.db, &client, clientTable, table);
                            if( ! client.isConnected ){
                                logInfo("mars - the client %s seems disconnected after some operation, continuing with another client", client.id);
                                continue clientLoop;
                            }
                        }
                        clientTable.ops = []; // XXX handle the individual failures...
                    }
                    if( syncStarted ){
                        req.operation = SyncOperationReq.SyncOperation.completed;
                        client.sendRequest(req);
                    }
                }
            }
            foreach( table; tables ){ table.unsafeReset(); }
        }
    }

    private {
        MarsClient[string] marsClients;
        public BaseServerSideTable!(MarsClient*)[] tables;
        /// True while a task running `handleDatabase` has been started and has not exited.
        bool databaseHandlerStarted;
    }

}
/// The single server instance, set by the `MarsServer` constructor.
__gshared MarsServer marsServer;

/**
 * Index, inside the list of statements built by `MarsServer.ExposeSchema`, of the statement
 * performing `op` on the table with index `table`: every table owns three consecutive slots,
 * in the order insert, update, delete.
 *
 * Params:
 *  table = zero based index of the table.
 *  op    = one of "insert", "update", "delete"; any other value asserts.
 */
ulong indexStatementFor(ulong table, string op){
    enum ops = 3;
    if     (op == "insert"){ return table * ops + 0; }
    else if(op == "update"){ return table * ops + 1; }
    else if(op == "delete"){ return table * ops + 2; }
    assert(false, "unknown ops!");
}

/// Configuration of a `MarsServer`; built step by step with `ExposeSchema`, `ExposeServerMethods`,
/// `PostgreSQL` and `Autologin`.
struct MarsServerConfiguration
{
    immutable(Schema) schemaExposed;
    string alasqlCreateDatabase;
    immutable(string)[] alasqlStatements;
    immutable string[] serverMethods;


    immutable DatabaseService databaseService;

    string pgsqlUser, pgsqlPassword; // for autologin
}

/// Returns: a copy of `c` whose `serverMethods` are replaced by `methods`.
static MarsServerConfiguration ExposeServerMethods(MarsServerConfiguration c, const string[] methods){
    return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, methods.idup,
            c.databaseService, c.pgsqlUser, c.pgsqlPassword);
}

/// Returns: a copy of `c` whose database service is built from `host`, `port` and `db`.
MarsServerConfiguration PostgreSQL(MarsServerConfiguration c, const string host, const ushort port, const string db){
    return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.serverMethods,
            DatabaseService(host, port, db), c.pgsqlUser, c.pgsqlPassword);
}

/// Returns: a copy of `c` with server side autologin enabled for the given credentials.
MarsServerConfiguration Autologin(MarsServerConfiguration c, const string login, const string password){
    return MarsServerConfiguration(c.schemaExposed, c.alasqlCreateDatabase, c.alasqlStatements, c.serverMethods, c.databaseService,
            login, password);
}