/**
 * Synchronisation between the server and the tables associated with each client.
 */
module mars.sync;

enum d =
`
BaseServerSideTable
    clientSideTables[clientId] --- SyncOps[]
`;

import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.meta;
import std.typecons;
import std.variant;

import std.experimental.logger;

import msgpack;

import mars.defs;
import mars.pgsql;
import mars.msg;
import mars.server : indexStatementFor;
version(unittest) import mars.starwars;

alias Bytes = immutable(ubyte)[];

/**
A server side table is instantiated only once per marsServer, and they are stored into the 'tables'
structure of the marsServer.

The instantiation usually is inside the application code: `InstantiateTables!(ctTables)(marsServer, [], [], [], [])`
*/
class BaseServerSideTable(ClientT)
{
    alias ClientType = ClientT;

    this(immutable(Table) definition){
        this.definition = definition;
    }

    /// Creates the client side table for `clientid`, registers it in `clientSideTables` and returns it.
    /// When the definition caches rows, an import-everything operation is queued on the new table.
    auto createClientSideTable(string clientid){
        auto cst = new ClientSideTable!ClientT();
        cst.strategy = definition.cacheRows? Strategy.easilySyncAll : Strategy.easilySyncNone;
        final switch(cst.strategy) with(Strategy) {
            case easilySyncAll:
                cst.ops ~= new ClientImportValues!ClientT();
                break;
            case easilySyncNone:
                break;
        }
        // new client, new client side table
        assert( (clientid in clientSideTables) is null, format("cliendid:%s, clientSideTables:%s",
                    clientid, clientSideTables.keys() ) );
        clientSideTables[clientid] = cst;

        return cst;
    }

    /// Drops the client side table registered for `clientid`.
    auto wipeClientSideTable(string clientid){
        assert( (clientid in clientSideTables) !is null, clientid );
        clientSideTables.remove(clientid);
    }

    /// execute a sql select statement, and returns a vibe json array with the records as json
    auto selectAsJson(Database db, string sqlSelect, Variant[string] parameters) in {
        assert(db !is null);
    } body {
        import vibe.data.json;

        auto resultSet = db.executeQueryUnsafe(sqlSelect, parameters);
        scope(exit) resultSet.close();

        Json jsonRecords = Json.emptyArray;
        foreach(variantRow; resultSet){
            Json jsonRow = Json.emptyArray;
            foreach(i, variantField; variantRow){
                if(variantField.type == typeid(int)){ jsonRow ~= variantField.get!int; }
                else if(variantField.type == typeid(short)){ jsonRow ~= variantField.get!short; }
                else if(variantField.type == typeid(float)){ jsonRow ~= variantField.get!float; }
                else if(variantField.type == typeid(long)){ jsonRow ~= variantField.get!long; }
                else if(variantField.type == typeid(string)){ jsonRow ~= variantField.get!string; }
                else if(variantField.type == typeid(ubyte[])){
                    // ... if I apply directly the '~=', the arrays are flattened!
                    jsonRow ~= 0;
                    jsonRow[jsonRow.length-1] = variantField.get!(ubyte[]).serializeToJson;
                }
                else {
                    // report the offending type through the logger before halting
                    errorf("mars - selectAsJson: unsupported column type %s", variantField.type);
                    assert(false);
                }
            }
            // same trick as above: append a placeholder, then overwrite it, so the row stays nested
            jsonRecords ~= 0; jsonRecords[jsonRecords.length-1] = jsonRow;
        }
        return jsonRecords;
    }
    version(unittest_starwars){
        unittest {
            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            auto json = table.selectAsJson(db, "select * from people", null);
            //import std.stdio; writeln(json.toPrettyString());
            assert(json[0][0] == "Luke");
        }
        unittest {
            import std.math : isNaN;
            import vibe.data.json : Json;

            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            auto json = table.selectAsJson(db, "select * from starships", null);
            assert(json[0][0] == "NanShip");
            // here we have a BIG problem: NaN is not encodable in Json! The value is 'null'!
            // assert(json[0][1].get!float().isNaN);
        }
    }

    abstract Bytes packRows(size_t offset = 0, size_t limit = long.max);
    abstract Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max);
    //abstract size_t count() const;
    abstract size_t count(Database) const;
    abstract size_t countRowsToInsert() const;
    abstract size_t countRowsToUpdate() const;
    abstract size_t countRowsToDelete() const;
    abstract size_t index() const;
    abstract Bytes packRowsToInsert();
    abstract Bytes packRowsToUpdate();
    abstract Bytes packRowsToDelete();

    abstract Bytes[2] insertRecord(Database, immutable(ubyte)[], ref InsertError, string, string);
    abstract void updateRecord(Database, Bytes, immutable(ubyte)[], ref RequestState, string);
    abstract Bytes deleteRecord(Database, immutable(ubyte)[], ref DeleteError, string, string);

    abstract void unsafeReset();

    immutable Table definition;
    private {

        /// Every server table has a collection of the linked client side tables. The key element is the identifier of
        /// the client, so that the collection can be kept clean when a client connect/disconnects.
        public ClientSideTable!(ClientT)*[string] clientSideTables;

        //public SynOp!ClientT[] ops;

    }
}

class ServerSideTable(ClientT, immutable(Table) table) : BaseServerSideTable!ClientT
{
    enum Definition = table;
    enum Columns = table.columns;

    alias ColumnsType = asD!Columns; /// an AliasSeq of the D types for the table columns...
    alias ColumnsStruct = asStruct!table;
    alias KeysStruct = asPkStruct!table;

    this() { super(table); }

    // interface needed to handle the records in a generic way ...

    /// returns the total number of records we are 'talking on' (filters? query?)
    //deprecated override size_t count() const { return fixtures.length; }
    override size_t count(Database db) const {
        import ddb.postgres : ServerErrorException;

        static if( table.durable ){
            try {
                return db.executeScalarUnsafe!size_t("select count(*) from %s".format(table.name));
            }
            catch(ServerErrorException e){
                switch(e.code){
                    case "42501": // permission denied for relation "relation"
                        infof("Permission denied:%s", e.toString());
                        return 0;
                    default:
                        warningf("mars - x ... x - Unhandled PostgreSQL exception during 'count'");
                        info(e.toString());
                        throw e;
                }
            }
        }
        else {
            return fixtures.length;
        }
    }
    //static if( ! table.durable ){ // XXX
    override size_t countRowsToInsert() const { return toInsert.length; }
    override size_t countRowsToUpdate() const { return toUpdate.length; }
    override size_t countRowsToDelete() const { return toDelete.length; }
    //}

    /// return the unique index identifier for this table, that's coming from the table definition in the app.d
    override size_t index() const { return Definition.index; }

    /// Returns at most `limit` in-memory fixture rows starting from `offset`.
    /// Both bounds are clamped to the number of fixtures, so an `offset` past the end yields an
    /// empty slice instead of a range error, and `offset + limit` cannot overflow.
    private auto fixtureRows(size_t offset, size_t limit) const {
        auto rows = fixtures.values();
        immutable first = min(offset, rows.length);
        immutable till = (limit > rows.length - first) ? rows.length : first + limit;
        return rows[first .. till];
    }

    /// Queues a new `Op` on the client side table of every client except `originClientId`.
    private void enqueueForOtherClients(Op)(string originClientId){
        foreach(kv; clientSideTables.byKeyValue){
            if( kv.key != originClientId ) kv.value.ops ~= new Op();
        }
    }

    /// Records an updated row in the pending collections: a row still waiting to be inserted on the
    /// clients is replaced in `toInsert`, otherwise the row goes into `toUpdate`.
    /// For decorated tables only the `toInsert` path is implemented; the other one halts.
    private void trackUpdatedRow(KeysStruct keys, ColumnsStruct record){
        auto pendingInsert = keys in toInsert;
        if( pendingInsert !is null ){
            static if(table.decorateRows){
                asSyncStruct!table rec;
                assignCommonFields(rec, record);
                with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
            }
            else {
                auto rec = record;
            }
            *pendingInsert = rec;
            assert( (keys in toUpdate) is null );
        }
        else {
            static if(table.decorateRows){ assert(false); }
            else { toUpdate[keys] = record; }
        }
    }

    /// returns 'limit' rows starting from 'offset', always read from the in-memory fixtures.
    deprecated auto selectRows(size_t offset = 0, size_t limit = long.max) const {
        return fixtureRows(offset, limit);
    }

    /// returns 'limit' rows starting from 'offset'.
    /// Durable tables are read from `db`; when the definition asks for decorated rows each one is
    /// stamped as imported by "automation@server". Non durable tables are read from the fixtures.
    auto selectRows(Database db, size_t offset = 0, size_t limit = long.max) const {
        static if(table.durable){
            auto resultSet = db.executeQueryUnsafe!(asStruct!table)("select * from %s limit %d offset %d".format(
                table.name, limit, offset)
            );
            // release the result set even when the iteration below throws
            scope(exit) resultSet.close();

            static if( Definition.decorateRows ){
                asSyncStruct!table[] rows;
                foreach(vr; resultSet){
                    asStruct!table v = vr;
                    asSyncStruct!table r;
                    assignCommonFields!(typeof(r), typeof(v))(r, v);
                    r.mars_who = "automation@server";
                    r.mars_what = "imported";
                    r.mars_when = Clock.currTime.toString();
                    rows ~= r;
                }
            }
            else {
                asStruct!table[] rows;
                foreach(v; resultSet){
                    rows ~= v;
                }
            }
            return rows;
        }
        else {
            return fixtureRows(offset, limit);
        }
    }

    /// insert a new row in the server table, turning clients table out of sync
    deprecated void insertRow(ColumnsStruct fixture){
        KeysStruct keys = pkValues!(table)(fixture);
        fixtures[keys] = fixture;
        static if(table.decorateRows){
            asSyncStruct!table rec;
            assignCommonFields(rec, fixture);
            with(rec){ mars_who = "automation@server"; mars_what = "inserted"; mars_when = Clock.currTime.toString(); }
        }
        else auto rec = fixture;
        toInsert[keys] = rec;
        foreach(ref cst; clientSideTables.values){
            cst.ops ~= new ClientInsertValues!ClientT();
        }
    }

    /// insert a new row in the server table, turning clients table out of sync
    /// Returns the record as inserted; `err` reports the outcome.
    ColumnsStruct insertRecord(Database db, ColumnsStruct record, ref InsertError err, string username, string clientId){
        static if(table.durable){
            auto inserted = db.executeInsert!(table, ColumnsStruct)(record, err);
        } else {
            fixtures[pkValues!table(record)] = record;
            auto inserted = record;
            err = InsertError.inserted;
        }
        if( err == InsertError.inserted ){
            static if(table.decorateRows){
                asSyncStruct!table rec;
                assignCommonFields(rec, inserted);
                with(rec){
                    mars_who = username ~ "@" ~ clientId; mars_what = "inserted";
                    mars_when = Clock.currTime.toString();
                }
            }
            else {
                auto rec = inserted;
            }
            toInsert[pkValues!table(inserted)] = rec;
            // ... don't propagate if not cached, or we are triggering a lot of refresh
            // stick with manual refresh done on client.
            if(table.cacheRows){
                enqueueForOtherClients!(ClientInsertValues!ClientT)(clientId);
            }
        }
        return inserted;
    }

    /// Msgpack flavour of `insertRecord`: unpacks `data`, inserts it and returns the packed inserted
    /// record together with the packed keys as sent by the client. A payload that cannot be decoded
    /// sets `err` to `InsertError.unknownError` and returns two empty arrays.
    override Bytes[2]
    insertRecord(Database db, Bytes data, ref InsertError err, string username, string clientId){
        import msgpack : pack, unpack, MessagePackException;
        ColumnsStruct record;
        try {
            record = unpack!(ColumnsStruct, true)(data);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name);
            errorf(exc.toString);
            err = InsertError.unknownError;
            return [[], []];
        }
        ColumnsStruct inserted = insertRecord(db, record, err, username, clientId);
        return [
            inserted.pack!(true).idup,
            record.pkParamValues!table().pack!(true).idup // clientKeys
        ];
    }

    /// Msgpack flavour of `updateRecord`: a payload that cannot be decoded sets `state` to
    /// `RequestState.rejectedAsDecodingFailed` and leaves the table untouched.
    override void updateRecord(Database db, Bytes encodedKeys, Bytes encodedRecord, ref RequestState state, string clientId){
        asPkStruct!table keys;
        ColumnsStruct record;
        try {
            keys = unpack!(asPkStruct!table, true)(encodedKeys);
            record = unpack!(ColumnsStruct, true)(encodedRecord);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack keys for record to update '%s': maybe a wrong type in js", table.name);
            errorf(exc.toString);
            state = RequestState.rejectedAsDecodingFailed;
            return;
        }
        updateRecord(db, keys, record, state, clientId);
    }

    /// Updates the record identified by `keys`, turning the other clients' tables out of sync.
    /// Durable tables go through `db`; non durable ones are updated in the fixtures, mirroring what
    /// `insertRecord` and `deleteRecord` do for them.
    void updateRecord(Database db, asPkStruct!table keys, ColumnsStruct record, ref RequestState state, string clientId){
        static if(table.durable){
            db.executeUpdate!(table, asPkStruct!table, ColumnsStruct)(keys, record, state);
        }
        else {
            fixtures[keys] = record;
            state = RequestState.executed;
        }
        if( state == RequestState.executed ){
            static if(table.decorateRows){
                asSyncStruct!table rec;
                assignCommonFields(rec, record);
                with(rec){
                    // NOTE(review): placeholder identity, the user name is not available in this signature
                    mars_who = /+username+/"qualeutente" ~ "@" ~ /+clientid+/ "qualeclient";
                    mars_what = "updated";
                    mars_when = Clock.currTime.toString();
                }
            }
            else {
                auto rec = record;
            }
            toUpdate[keys] = rec;
            if(table.cacheRows){
                enqueueForOtherClients!(UpdateClientRecords!ClientT)(clientId);
            }
        }
    }

    /// Msgpack flavour of `deleteRecord`: returns an empty array when the record was deleted,
    /// otherwise hands back `data` with `err` describing the failure.
    override Bytes deleteRecord(Database db, Bytes data, ref DeleteError err, string username, string clientid){
        import msgpack : pack, unpack, MessagePackException;
        asPkParamStruct!table keys;
        try {
            keys = unpack!(asPkParamStruct!table, true)(data);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack keys for record to delete '%s': maybe a wrong type in js", table.name);
            errorf(exc.toString);
            err = DeleteError.unknownError;
            return data;
        }
        deleteRecord(db, keys, err, username, clientid);
        if( err != DeleteError.deleted ) return data;
        return [];
    }

    /// Deletes the record identified by `keys`, turning the other clients' tables out of sync.
    /// Returns `keys` unchanged; `err` reports the outcome.
    asPkParamStruct!table
    deleteRecord(Database db, asPkParamStruct!table keys, ref DeleteError err, string username, string clientid){
        KeysStruct k;
        assignFields(k, keys);
        static if(table.durable){
            db.executeDelete!(table, asPkParamStruct!table)(keys, err);
        }
        else {
            fixtures.remove(k);
            err = DeleteError.deleted;
        }
        if( err == DeleteError.deleted ){
            static if(table.decorateRows){
                toDelete[k] = Sync(username ~ "@" ~ clientid, "deleted", Clock.currTime.toString());
            }
            else {
                toDelete[k] = 0;
            }
            // NOTE(review): unlike insert and update, this is propagated even when rows are not cached - confirm
            enqueueForOtherClients!(ClientDeleteValues!ClientT)(clientid);
        }
        return keys;
    }

    /// update row in the server table, turning the client tables out of sync
    deprecated void updateRow(KeysStruct keys, ColumnsStruct record){
        //KeysStruct keys = pkValues!table(record);
        trackUpdatedRow(keys, record);
        fixtures[keys] = record;
        foreach(ref cst; clientSideTables.values){
            cst.ops ~= new ClientUpdateValues!ClientT();
        }
    }

    /// update row in the server table, turning the client tables out of sync
    void updateRow(Database db, KeysStruct keys, ColumnsStruct record){
        static if( table.durable ){
            RequestState state;
            db.executeUpdate!(table, KeysStruct, ColumnsStruct)(keys, record, state);
            // NOTE(review): 'state' is not inspected, the row is tracked whatever the outcome - confirm
            trackUpdatedRow(keys, record);
        }
        else {
            //KeysStruct keys = pkValues!table(record);
            trackUpdatedRow(keys, record);
            fixtures[keys] = record;
        }
        foreach(ref cst; clientSideTables.values){
            cst.ops ~= new ClientUpdateValues!ClientT();
        }
    }

    /// returns the packet selected rows
    override Bytes packRows(size_t offset = 0, size_t limit = long.max) const {
        import msgpack : pack;
        return pack!(true)(selectRows(null, offset, limit)).idup;
    }
    /// returns the packet selected rows
    override Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max) const {
        import msgpack : pack;
        return pack!(true)(selectRows(db, offset, limit)).idup;
    }

    /// return the packet rows to insert in the client
    override Bytes packRowsToInsert() {
        import msgpack : pack;
        auto packed = pack!(true)(toInsert.values()).idup;
        //toInsert = null; can't reset... this is called for every client
        return packed;
    }

    /// return the packet rows to delete in the client
    override Bytes packRowsToDelete() {
        import msgpack : pack;
        asSyncPkParamStruct!(table)[] whereKeys;
        foreach(key; toDelete.keys()){
            asSyncPkParamStruct!table whereKey;
            assignFields(whereKey, key);
            static if(table.decorateRows) assignCommonFields(whereKey, toDelete[key]);
            whereKeys ~= whereKey;
            // diagnostics go through the logger instead of being printed on stdout
            tracef("mars - packing key to delete for table '%s': %s", table.name, whereKey);
        }
        auto packed = pack!(true)(whereKeys).idup;
        //toDelete = null; can't reset... this is called for every client
        return packed;
    }

    /// return the packet rows to update in the client
    override Bytes packRowsToUpdate() {
        // each packed element carries the primary keys followed by the full record
        static struct UpdateRecord {
            KeysStruct keys;
            static if(table.decorateRows){
                asSyncStruct!table record;
            }
            else {
                asStruct!table record;
            }
        }
        UpdateRecord[] records;
        foreach(r; toUpdate.keys){
            records ~= UpdateRecord(r, toUpdate[r]);
        }

        import msgpack : pack;
        auto packed = pack!(true)(records).idup;
        return packed;
    }

    /// Stores `fixture` in the in-memory fixtures, keyed by its primary key values.
    void loadFixture(ColumnsStruct fixture){
        KeysStruct keys = pkValues!table(fixture);
        fixtures[keys] = fixture;
    }

    /// Forgets every pending insert, update and delete; the fixtures are left untouched.
    override void unsafeReset() {
        //fixtures = null;
        toInsert = null;
        toUpdate = null;
        toDelete = null;
    }

    //static if( ! table.durable ){
    asStruct!(table)[asPkStruct!(table)] fixtures;
    static if(table.decorateRows){
        asSyncStruct!(table)[asPkStruct!(table)] toInsert;
        Sync[asPkStruct!(table)] toDelete;
        asSyncStruct!(table)[asPkStruct!(table)] toUpdate;
    }
    else {
        asStruct!(table)[asPkStruct!(table)] toInsert;
        int[asPkStruct!(table)] toDelete;
        asStruct!(table)[asPkStruct!(table)] toUpdate;
    }

    // ... record inserted client side, already patched and inserted for this client.
    //asStruct!(table)[string] notToInsert;
    //}
}


/// Per-client view of a server side table: the sync strategy in use and the operations still
/// queued for that client.
struct ClientSideTable(ClientT)
{
    private {
        Strategy strategy = Strategy.easilySyncNone;
        public SynOp!ClientT[] ops;
    }
}

private
{
    enum Strategy { easilySyncAll, easilySyncNone }

    /// A synchronisation operation queued on a client side table.
    class SynOp(MarsClientT) {
        /// deprecated way of proceeding: use the method below with a null database
        void execute(MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT){ assert(false); }
        abstract void execute(Database, MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT);
    }

    /// take all the rows in the server table and send them on the client table.
    class ClientImportValues(MarsClientT) : SynOp!MarsClientT {

        override void execute(
            Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
        ){
            assert(db !is null);

            // ... if the table is empty, simply do nothing ...
            if( sst.count(db) > 0 ){
                sendImport(marsClient, sst, sst.packRows(db));
            }
        }

        override
        void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            // ... if the table is empty, simply do nothing ...
            if( sst.count(null) > 0 ){
                sendImport(marsClient, sst, sst.packRows());
            }
        }

        /// Sends `payload` as an import request for `sst` and, when still connected, waits for the reply.
        private void sendImport(MarsClientT marsClient, BaseServerSideTable!MarsClientT sst, Bytes payload){
            auto req = ImportRecordsReq(); with(req){
                tableIndex = sst.index;
                statementIndex = indexStatementFor(sst.index, "insert");
                encodedRecords = payload;
            }
            marsClient.sendRequest(req);
            if(marsClient.isConnected) auto rep = marsClient.receiveReply!ImportRecordsRep();
        }
    }

    /// send to the client the rows pending insertion in the server table.
    class ClientInsertValues(MarsClientT) : SynOp!MarsClientT {

        /// deprecated flavour: the database is not used by this operation, so simply forward.
        override
        void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            execute(null, marsClient, cst, sst);
        }
        override
        void execute(
            Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
        ){
            if( sst.countRowsToInsert > 0 ){
                auto payload = sst.packRowsToInsert();
                auto req = InsertRecordsReq(); with(req){
                    tableIndex = sst.index;
                    statementIndex = indexStatementFor(sst.index, "insert");
                    encodedRecords = payload;
                }
                marsClient.sendRequest(req);
                if(marsClient.isConnected) auto rep = marsClient.receiveReply!InsertRecordsRep();
            }
        }
    }

    /// send to the client the keys of the rows pending deletion in the server table.
    class ClientDeleteValues(MarsClientT) : SynOp!MarsClientT {

        /// deprecated flavour: the database is not used by this operation, so simply forward.
        override
        void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            execute(null, marsClient, cst, sst);
        }
        override
        void execute(
            Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
        ){
            if( sst.countRowsToDelete > 0 ){
                auto payload = sst.packRowsToDelete();
                auto req = DeleteRecordsReq(); with(req){
                    tableIndex = sst.index;
                    statementIndex = indexStatementFor(sst.index, "delete").to!int;
                    encodedRecords = payload;
                }
                marsClient.sendRequest(req);
                if(marsClient.isConnected) auto rep = marsClient.receiveReply!DeleteRecordsRep();
            }
        }
    }

    /// send to the client the rows pending update in the server table, no reply is awaited.
    class ClientUpdateValues(MarsClientT) : SynOp!MarsClientT {

        /// deprecated flavour: the database is not used by this operation, so simply forward.
        override
        void execute(MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            execute(null, marsclient, cst, sst);
        }

        override void execute(Database db, MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst,
                BaseServerSideTable!MarsClientT sst)
        {
            import mars.msg : UpdateValuesRequest;
            import std.conv :to;

            if( sst.countRowsToUpdate > 0 ){
                auto payload = sst.packRowsToUpdate();
                auto req = UpdateValuesRequest();
                req.statementIndex = indexStatementFor(sst.index, "update").to!int;
                req.bytes = payload;
                marsclient.sendRequest(req);
            }

        }
    }

    /// placeholder operation, it does nothing when executed.
    class ServerUpdateValues(MarsClientT) : SynOp!MarsClientT {
        override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst,
                BaseServerSideTable!MarsClientT sst){}
    }

    /// send to the client the records pending update in the server table, no reply is awaited.
    class UpdateClientRecords(MarsClientT) : SynOp!MarsClientT {
        override
        void execute(Database db, MarsClientT marsClient, ClientSideTable!MarsClientT* cst, BaseServerSideTable!MarsClientT sst)
        {
            import mars.msg : UpdateRecordsReq;

            if( sst.countRowsToUpdate > 0 ){
                auto req = UpdateRecordsReq();
                req.tableIndex = sst.index;
                req.encodedRecords = sst.packRowsToUpdate();
                marsClient.sendRequest(req);
            }
        }
    }
}

version(unittest)
{
    struct MarsClientMock { void sendRequest(R)(R r){} }
}
unittest
{
    /+
    import std.range : zip;


    auto t1 = immutable(Table)("t1", [Col("c1", Type.integer, false), Col("c2", Type.text, false)], [0], []);
    auto sst = new ServerSideTable!(MarsClientMock, t1);
    zip([1, 2, 3], ["a", "b", "c"]).each!( f => sst.loadFixture(sst.ColumnsStruct(f.expand)) );

    auto cst = sst.createClientSideTable();
    // ... the simplest strategy is to synchronise ALL the content to the client side right away ...
    assert( cst.strategy == Strategy.easilySyncAll );
    // ... and at this point, at the very least, a command importing all the data must be issued ...
    assert( cast(ClientImportValues!MarsClientMock)(sst.ops[$-1]) !is null );
    // ... which, once executed, takes care of handling the socket and updating the client and server side instances.
    auto op = sst.ops[$-1];
    op.execute(MarsClientMock(), cst, sst);

    // ... one of the values can be changed with an update; in this case the primary key is column c1
    sst.updateRow(sst.KeysStruct(2), sst.ColumnsStruct(2, "z"));
    assert( sst.fixtures[sst.KeysStruct(2)] == sst.ColumnsStruct(2, "z") );
    +/
}
/+
unittest
{
    version(unittest_starwars){
        import mars.starwars;
        enum schema = starwarsSchema();

        auto people = new ServerSideTable!(MarsClientMock, schema.tables[0]);
        auto scores = new ServerSideTable!(MarsClientMock, schema.tables[3]);
        auto databaseService = DatabaseService("127.0.0.1", 5432, "starwars");
        AuthoriseError err;
        auto db = databaseService.connect("jedi", "force", err);
        db.executeUnsafe("begin transaction");

        auto rows = people.selectRows(db);
        assert( rows[0] == luke, rows[0].to!string );

        auto paolo = Person("Paolo", "male", [0x00, 0x01, 0x02, 0x03, 0x04], 1.80);
        InsertError ierr;
        auto inserted = people.insertRecord(db, paolo, ierr);
        assert(inserted == paolo);


        //import std.stdio;
        //foreach(row; rows) writeln("---->>>>>", row);
        //assert(false);
    }
}
+/