1 2 3 /** 4 gestiamo la parte di sincronia tra server e le varie tavole associate ai client. 5 */ 6 7 module mars.sync; 8 9 import std.algorithm; 10 import std.meta; 11 import std.typecons; 12 import std.experimental.logger; 13 import std.format; 14 import std.conv; 15 16 import mars.defs; 17 import mars.pgsql; 18 import mars.msg; 19 import mars.server; 20 21 class BaseServerSideTable(ClientT) 22 { 23 alias ClientType = ClientT; 24 25 this(immutable(Table) definition){ 26 this.definition = definition; 27 } 28 29 auto createClientSideTable(string clientid){ 30 auto cst = new ClientSideTable!ClientT(); 31 final switch(cst.strategy) with(Strategy) { 32 case easilySyncAll: 33 cst.ops ~= new ClientImportValues!ClientT(); 34 } 35 // new client, new client side table 36 assert( (clientid in clientSideTables) is null ); 37 clientSideTables[clientid] = cst; 38 39 return cst; 40 } 41 42 abstract immutable(ubyte)[] packRows(size_t offset = 0, size_t limit = long.max); 43 abstract immutable(ubyte)[] packRows(Database db, size_t offset = 0, size_t limit = long.max); 44 abstract size_t count() const; 45 abstract size_t count(Database) const; 46 abstract size_t countRowsToInsert() const; 47 abstract size_t countRowsToUpdate() const; 48 abstract size_t countRowsToDelete() const; 49 abstract size_t index() const; 50 abstract immutable(ubyte)[] packRowsToInsert(); 51 abstract immutable(ubyte)[] packRowsToUpdate(); 52 abstract immutable(ubyte)[] packRowsToDelete(); 53 54 abstract immutable(ubyte)[][2] insertRecord(Database, immutable(ubyte)[], ref InsertError); 55 abstract immutable(ubyte)[] deleteRecord(Database, immutable(ubyte)[], ref DeleteError); 56 57 abstract void unsafeReset(); 58 59 immutable Table definition; 60 private { 61 62 /// Every server table has a collection of the linked client side tables. The key element is the identifier of 63 /// the client, so that the collection can be kept clean when a client connect/disconnects. 64 public ClientSideTable!(ClientT)*[string] clientSideTables; 65 66 //public SynOp!ClientT[] ops; 67 68 } 69 } 70 71 class ServerSideTable(ClientT, immutable(Table) table) : BaseServerSideTable!ClientT 72 { 73 enum Definition = table; 74 enum Columns = table.columns; 75 76 alias ColumnsType = asD!Columns; /// an AliasSeq of the D types for the table columns... 77 alias ColumnsStruct = asStruct!table; 78 alias KeysStruct = asPkStruct!table; 79 80 this() { super(table); } 81 82 // interface needed to handle the records in a generic way ... 83 84 /// returns the total number of records we are 'talking on' (filters? query?) 85 deprecated override size_t count() const { return fixtures.length; } 86 override size_t count(Database db) const { 87 static if( table.durable ){ 88 return db.executeScalarUnsafe!size_t("select count(*) from %s".format(table.name)); 89 } 90 else { 91 return fixtures.length; 92 } 93 } 94 //static if( ! table.durable ){ // XXX 95 override size_t countRowsToInsert() const { return toInsert.length; } 96 override size_t countRowsToUpdate() const { return toUpdate.length; } 97 override size_t countRowsToDelete() const { return toDelete.length; } 98 //} 99 100 /// return the unique index identifier for this table, that's coming from the table definition in the app.d 101 override size_t index() const { return Definition.index; } 102 103 /// returns 'limit' rows starting from 'offset'. 104 deprecated auto selectRows(size_t offset = 0, size_t limit = long.max) const { 105 size_t till = (limit + offset) > count ? count : (limit + offset); 106 return fixtures.values()[offset .. till]; 107 } 108 /// returns 'limit' rows starting from 'offset'. 109 auto selectRows(Database db, size_t offset = 0, size_t limit = long.max) const { 110 static if(table.durable){ 111 auto resultSet = db.executeQueryUnsafe!(asStruct!table)("select * from %s limit %d offset %d".format( 112 table.name, limit, offset) 113 ); 114 asStruct!table[] rows; 115 foreach(v; resultSet){ 116 rows ~= v; 117 //import std.stdio; writeln("selectRows:", v); 118 // XXX 119 } 120 resultSet.close(); 121 return rows; 122 } 123 else { 124 size_t till = (limit + offset) > count(db) ? count(db) : (limit + offset); 125 return fixtures.values()[offset .. till]; 126 } 127 } 128 129 /// insert a new row in the server table, turning clients table out of sync 130 deprecated void insertRow(ColumnsStruct fixture){ 131 KeysStruct keys = pkValues!(table)(fixture); 132 fixtures[keys] = fixture; 133 toInsert[keys] = fixture; 134 foreach(ref cst; clientSideTables.values){ 135 cst.ops ~= new ClientInsertValues!ClientT(); 136 } 137 } 138 139 /// insert a new row in the server table, turning clients table out of sync 140 ColumnsStruct insertRecord(Database db, ColumnsStruct record, ref InsertError err){ 141 static if(table.durable){ 142 auto inserted = db.executeInsert!(table, ColumnsStruct)(record, err); 143 KeysStruct keys = pkValues!table(record); 144 toInsert[keys] = record; 145 } 146 else { 147 auto inserted = record; 148 KeysStruct keys = pkValues!table(record); 149 fixtures[keys] = record; 150 toInsert[keys] = record; 151 } 152 foreach(ref cst; clientSideTables.values){ 153 cst.ops ~= new ClientInsertValues!ClientT(); 154 } 155 return inserted; 156 } 157 158 override immutable(ubyte)[][2] insertRecord(Database db, immutable(ubyte)[] data, ref InsertError err){ 159 import msgpack : pack, unpack, MessagePackException; 160 ColumnsStruct record; 161 try { 162 record = unpack!(ColumnsStruct, true)(data); 163 } 164 catch(MessagePackException exc){ 165 errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name); 166 errorf(exc.toString); 167 err = InsertError.unknownError; 168 return [[], []]; 169 } 170 ColumnsStruct inserted = insertRecord(db, record, err); 171 return [ 172 inserted.pack!(true).idup, 173 record.pkParamValues!table().pack!(true).idup // clientKeys 174 ]; 175 } 176 177 override immutable(ubyte)[] deleteRecord(Database db, immutable(ubyte)[] data, ref DeleteError err){ 178 import msgpack : pack, unpack, MessagePackException; 179 asStruct!table record; 180 try { 181 record = unpack!(ColumnsStruct, true)(data); 182 } 183 catch(MessagePackException exc){ 184 errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name); 185 errorf(exc.toString); 186 err = DeleteError.unknownError; 187 return data; 188 } 189 KeysStruct keys = record.pkValues!table(); 190 deleteRecord(db, keys, err); 191 if( err != DeleteError.deleted ) return data; 192 return []; 193 } 194 195 KeysStruct deleteRecord(Database db, KeysStruct keys, ref DeleteError err){ 196 static if(table.durable){ 197 db.executeDelete!(table, KeysStruct)(keys, err); 198 toDelete[keys] = 0; 199 } 200 else { 201 fixtures.remove(keys); 202 toDelete[keys] = 0; 203 } 204 foreach(ref cst; clientSideTables.values){ 205 cst.ops ~= new ClientDeleteValues!ClientT(); 206 } 207 return keys; 208 } 209 210 /// update row in the server table, turning the client tables out of sync 211 deprecated void updateRow(KeysStruct keys, ColumnsStruct record){ 212 //KeysStruct keys = pkValues!table(record); 213 auto v = keys in toInsert; 214 if( v !is null ){ 215 *v = record; 216 assert( (keys in toUpdate) is null ); 217 } 218 else { 219 v = keys in toUpdate; 220 if( v !is null ){ 221 *v = record; 222 } 223 else { 224 toUpdate[keys] = record; 225 } 226 } 227 fixtures[keys] = record; 228 foreach(ref cst; clientSideTables.values){ 229 cst.ops ~= new ClientUpdateValues!ClientT(); 230 } 231 } 232 233 /// update row in the server table, turning the client tables out of sync 234 void updateRow(Database db, KeysStruct keys, ColumnsStruct record){ 235 static if( table.durable ){ 236 import msgpack : pack; 237 238 db.executeUpdate!(table, KeysStruct, ColumnsStruct)(keys, record); 239 auto v = keys in toInsert; 240 if( v !is null ){ 241 *v = record; 242 assert( (keys in toUpdate) is null ); 243 } 244 else { 245 v = keys in toUpdate; 246 if( v !is null ){ 247 *v = record; 248 } 249 else { 250 toUpdate[keys] = record; 251 } 252 } 253 } 254 else { 255 //KeysStruct keys = pkValues!table(record); 256 auto v = keys in toInsert; 257 if( v !is null ){ 258 *v = record; 259 assert( (keys in toUpdate) is null ); 260 } 261 else { 262 v = keys in toUpdate; 263 if( v !is null ){ 264 *v = record; 265 } 266 else { 267 toUpdate[keys] = record; 268 } 269 } 270 fixtures[keys] = record; 271 } 272 foreach(ref cst; clientSideTables.values){ 273 cst.ops ~= new ClientUpdateValues!ClientT(); 274 } 275 } 276 277 /// returns the packet selected rows 278 override immutable(ubyte)[] packRows(size_t offset = 0, size_t limit = long.max) const { 279 import msgpack : pack; 280 return pack!(true)(selectRows(null, offset, limit)).idup; 281 } 282 /// returns the packet selected rows 283 override immutable(ubyte)[] packRows(Database db, size_t offset = 0, size_t limit = long.max) const { 284 import msgpack : pack; 285 return pack!(true)(selectRows(db, offset, limit)).idup; 286 } 287 288 /// return the packet rows to insert in the client 289 override immutable(ubyte)[] packRowsToInsert() { 290 import msgpack : pack; 291 auto packed = pack!(true)(toInsert.values()).idup; 292 //toInsert = null; can't reset... this is called for every client 293 return packed; 294 } 295 296 /// return the packet rows to delete in the client 297 override immutable(ubyte)[] packRowsToDelete() { 298 import msgpack : pack; 299 auto packed = pack!(true)(toDelete.keys()).idup; 300 //toInsert = null; can't reset... this is called for every client 301 return packed; 302 } 303 304 /// return the packet rows to update in the client 305 override immutable(ubyte)[] packRowsToUpdate() { 306 static struct UpdateRecord { 307 KeysStruct keys; 308 asStruct!table record; 309 } 310 UpdateRecord[] records; 311 foreach(r; toUpdate.keys){ 312 records ~= UpdateRecord(r, toUpdate[r]); 313 } 314 315 import msgpack : pack; 316 auto packed = pack!(true)(records).idup; 317 //toUpdate = null; can't reset... this is called for every client 318 return packed; 319 } 320 321 void loadFixture(ColumnsStruct fixture){ 322 KeysStruct keys = pkValues!table(fixture); 323 fixtures[keys] = fixture; 324 } 325 326 override void unsafeReset() { 327 //fixtures = null; 328 toInsert = null; 329 toUpdate = null; 330 toDelete = null; 331 } 332 333 //static if( ! table.durable ){ 334 asStruct!(table)[asPkStruct!(table)] fixtures; 335 asStruct!(table)[asPkStruct!(table)] toInsert; 336 asStruct!(table)[asPkStruct!(table)] toUpdate; 337 int[asPkStruct!(table)] toDelete; 338 339 // ... record inserted client side, already patched and inserted for this client. 340 //asStruct!(table)[string] notToInsert; 341 //} 342 } 343 344 345 struct ClientSideTable(ClientT) 346 { 347 private { 348 Strategy strategy = Strategy.easilySyncAll; 349 public SynOp!ClientT[] ops; 350 } 351 } 352 353 private 354 { 355 enum Strategy { easilySyncAll } 356 357 class SynOp(MarsClientT) { 358 abstract void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst); 359 abstract void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst); 360 } 361 362 /// take all the rows in the server table and send them on the client table. 363 class ClientImportValues(MarsClientT) : SynOp!MarsClientT { 364 365 override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst) 366 { 367 assert(db !is null); 368 369 // ... if the table is empty, simply do nothing ... 370 if( sst.count(db) > 0 ){ 371 auto payload = sst.packRows(db); 372 373 auto req = ImportRecordsReq(); with(req){ 374 tableIndex = sst.index; 375 statementIndex = indexStatementFor(sst.index, "insert"); 376 encodedRecords = payload; 377 } 378 marsClient.sendRequest(req); 379 auto rep = marsClient.receiveReply!ImportRecordsRep(); 380 } 381 } 382 override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst) 383 { 384 import mars.msg : ImportValuesRequest; 385 import std.conv : to; 386 387 // ... if the table is empty, simply do nothing ... 388 if( sst.count > 0 ){ 389 auto payload = sst.packRows(); 390 391 auto req = ImportRecordsReq(); with(req){ 392 tableIndex =sst.index; 393 statementIndex = indexStatementFor(sst.index, "insert"); 394 encodedRecords = payload; 395 } 396 marsClient.sendRequest(req); 397 auto rep = marsClient.receiveReply!ImportRecordsRep(); 398 } 399 } 400 } 401 402 class ClientInsertValues(MarsClientT) : SynOp!MarsClientT { 403 404 override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst) 405 { 406 if( sst.countRowsToInsert > 0 ){ 407 auto payload = sst.packRowsToInsert(); 408 auto req = InsertRecordsReq(); with(req){ 409 tableIndex = sst.index; 410 statementIndex = indexStatementFor(sst.index, "insert"); 411 encodedRecords = payload; 412 } 413 marsClient.sendRequest(req); 414 auto rep = marsClient.receiveReply!InsertRecordsRep(); 415 } 416 } 417 override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst) 418 { 419 if( sst.countRowsToInsert > 0 ){ 420 auto payload = sst.packRowsToInsert(); 421 auto req = InsertRecordsReq(); with(req){ 422 tableIndex = sst.index; 423 statementIndex = indexStatementFor(sst.index, "insert"); 424 encodedRecords = payload; 425 } 426 marsClient.sendRequest(req); 427 auto rep = marsClient.receiveReply!InsertRecordsRep(); 428 } 429 } 430 } 431 432 class ClientDeleteValues(MarsClientT) : SynOp!MarsClientT { 433 434 override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst) 435 { 436 if( sst.countRowsToDelete > 0 ){ 437 auto payload = sst.packRowsToDelete(); 438 auto req = DeleteRecordsReq(); with(req){ 439 tableIndex = sst.index; 440 statementIndex = indexStatementFor(sst.index, "delete").to!int; 441 encodedRecords = payload; 442 } 443 marsClient.sendRequest(req); 444 auto rep = marsClient.receiveReply!DeleteRecordsRep(); 445 } 446 } 447 override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst){ 448 if( sst.countRowsToDelete > 0 ){ 449 auto payload = sst.packRowsToDelete(); 450 auto req = DeleteRecordsReq(); with(req){ 451 tableIndex = sst.index; 452 statementIndex = indexStatementFor(sst.index, "delete").to!int; 453 encodedRecords = payload; 454 } 455 marsClient.sendRequest(req); 456 auto rep = marsClient.receiveReply!DeleteRecordsRep(); 457 } 458 } 459 } 460 461 class ClientUpdateValues(MarsClientT) : SynOp!MarsClientT { 462 463 override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst) 464 { 465 import mars.msg : UpdateValuesRequest; 466 import std.conv :to; 467 468 if( sst.countRowsToUpdate > 0 ){ 469 auto payload = sst.packRowsToUpdate(); 470 auto req = UpdateValuesRequest(); 471 req.statementIndex = indexStatementFor(sst.index, "update").to!int; 472 req.bytes = payload; 473 marsClient.sendRequest(req); 474 } 475 } 476 override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst){} 477 } 478 479 class ServerUpdateValues(MarsClientT) : SynOp!MarsClientT { 480 override void execute(Database db, MarsClientT marsClient, ClientSideTable* cst, BaseServerSideTable!MarsClientT sst){ 481 482 } 483 } 484 } 485 486 version(unittest) 487 { 488 struct MarsClientMock { void sendRequest(R)(R r){} } 489 } 490 unittest 491 { 492 /+ 493 import std.range : zip; 494 495 496 auto t1 = immutable(Table)("t1", [Col("c1", Type.integer, false), Col("c2", Type.text, false)], [0], []); 497 auto sst = new ServerSideTable!(MarsClientMock, t1); 498 zip([1, 2, 3], ["a", "b", "c"]).each!( f => sst.loadFixture(sst.ColumnsStruct(f.expand)) ); 499 500 auto cst = sst.createClientSideTable(); 501 // ... la strategia più semplice è syncronizzare subito TUTTO il contenuto nella client side ... 502 assert( cst.strategy == Strategy.easilySyncAll ); 503 // ... e a questo punto, come minimo deve partire un comando di import di tutti i dati.... 504 assert( cast(ClientImportValues!MarsClientMock)(sst.ops[$-1]) !is null ); 505 // ... che eseguito si occupa di gestire il socket, e aggiornare client e server side instances. 506 auto op = sst.ops[$-1]; 507 op.execute(MarsClientMock(), cst, sst); 508 509 // ...posso aggiornare uno dei valori con update, in questo caso la primary key è la colonna c1 510 sst.updateRow(sst.KeysStruct(2), sst.ColumnsStruct(2, "z")); 511 assert( sst.fixtures[sst.KeysStruct(2)] == sst.ColumnsStruct(2, "z") ); 512 +/ 513 } 514 /+ 515 unittest 516 { 517 version(starwars){ 518 import mars.starwars; 519 enum schema = starwarsSchema(); 520 521 auto people = new ServerSideTable!(MarsClientMock, schema.tables[0]); 522 auto scores = new ServerSideTable!(MarsClientMock, schema.tables[3]); 523 auto databaseService = DatabaseService("127.0.0.1", 5432, "starwars"); 524 AuthoriseError err; 525 auto db = databaseService.connect("jedi", "force", err); 526 db.executeUnsafe("begin transaction"); 527 528 auto rows = people.selectRows(db); 529 assert( rows[0] == luke, rows[0].to!string ); 530 531 auto paolo = Person("Paolo", "male", [0x00, 0x01, 0x02, 0x03, 0x04], 1.80); 532 InsertError ierr; 533 auto inserted = people.insertRecord(db, paolo, ierr); 534 assert(inserted == paolo); 535 536 537 //import std.stdio; 538 //foreach(row; rows) writeln("---->>>>>", row); 539 //assert(false); 540 } 541 } 542 +/ 543