/**
 * Synchronisation between the server and the tables associated with each connected client.
 *
 * Each server side table keeps one client side table per connected client; every client side table
 * accumulates the list of pending synchronisation operations (`SynOp`) for that client.
 */
module mars.sync;

enum d =
`
BaseServerSideTable
    clientSideTables[clientId] --- SyncOps[]
`;

import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.meta;
import std.typecons;
import std.variant;
import std.base64;

import std.experimental.logger;

import msgpack;

import mars.defs;
import mars.pgsql;
import mars.msg;
import mars.server : indexStatementFor;
version(unittest) import mars.starwars;

alias Bytes = immutable(ubyte)[];

/**
A server side table is instantiated only once per marsServer, and they are stored into the 'tables'
structure of the marsServer.

The instantiation usually is inside the application code: `InstantiateTables!(ctTables)(marsServer, [], [], [], [])`
*/
class BaseServerSideTable(ClientT)
{
    alias ClientType = ClientT;

    /// Stores the (immutable) table definition this server side table is built on.
    this(immutable(Table) definition){
        this.definition = definition;
    }

    /**
     * Creates and registers the client side table for `clientid`.
     *
     * When the definition asks for cached rows the strategy is `easilySyncAll` and an initial
     * `ClientImportValues` operation is queued; otherwise the strategy is `easilySyncNone` and
     * nothing is queued.
     *
     * Returns: a pointer to the newly registered client side table.
     */
    auto createClientSideTable(string clientid){
        auto cst = new ClientSideTable!ClientT();
        cst.strategy = definition.cacheRows? Strategy.easilySyncAll : Strategy.easilySyncNone;
        final switch(cst.strategy) with(Strategy) {
            case easilySyncAll:
                cst.ops ~= new ClientImportValues!ClientT();
                break;
            case easilySyncNone:
                break;
        }
        // new client, new client side table
        assert( (clientid in clientSideTables) is null, format("cliendid:%s, clientSideTables:%s",
                    clientid, clientSideTables.keys() ) );
        clientSideTables[clientid] = cst;

        return cst;
    }

    /// Removes the client side table registered for `clientid` (which must exist).
    auto wipeClientSideTable(string clientid){
        assert( (clientid in clientSideTables) !is null, clientid );
        clientSideTables.remove(clientid);
    }

    /**
     * Executes a sql select statement and returns a vibe json array with the records as json arrays.
     *
     * Supported column types are int, short, long, float, double, string, bool, ubyte[] (sent as a
     * Base64 string) and Date (sent as ISO extended string). A null value or any other type trips
     * an assertion.
     *
     * Params:
     *   db = the database connection, must not be null
     *   sqlSelect = the select statement to execute
     *   parameters = the named parameters of the statement
     *   state = set to `RequestState.rejectedAsPGSqlError` when the query throws an Exception;
     *           left untouched otherwise
     *
     * Returns: the records collected so far (possibly empty or partial when the query failed).
     */
    auto selectAsJson(Database db, string sqlSelect, Variant[string] parameters, ref RequestState state) in {
        assert(db !is null);
    } body {
        import std.datetime : Date;
        import vibe.data.json;

        Json jsonRecords = Json.emptyArray;
        try {
            auto resultSet = db.executeQueryUnsafe(sqlSelect, parameters);
            scope(exit) resultSet.close();

            foreach(variantRow; resultSet){
                Json jsonRow = Json.emptyArray;
                foreach(i, variantField; variantRow){
                    if(variantField.type == typeid(int)){ jsonRow ~= variantField.get!int; }
                    else if(variantField.type == typeid(short)){ jsonRow ~= variantField.get!short; }
                    else if(variantField.type == typeid(float)){ jsonRow ~= variantField.get!float; }
                    else if(variantField.type == typeid(double)){ jsonRow ~= variantField.get!double; }
                    else if(variantField.type == typeid(long)){ jsonRow ~= variantField.get!long; }
                    else if(variantField.type == typeid(string)){ jsonRow ~= variantField.get!string; }
                    else if(variantField.type == typeid(bool)){ jsonRow ~= variantField.get!bool; }
                    else if(variantField.type == typeid(ubyte[])){
                        // ... if I apply directly the '~=', the arrays are flattened!
                        jsonRow ~= cast(string)Base64.encode(variantField.get!(ubyte[]));
                    }
                    else if( variantField.type == typeid(Date) ){
                        jsonRow ~= variantField.get!Date().toISOExtString(); // '2017-11-29'
                    }
                    else if( variantField.type is typeid(null) ){
                        assert(false, "Returning null as a value is not supported: use coalesce(value, something) in the query to strip it out");
                    }
                    else {
                        // report the offending type through the logger instead of stdout
                        errorf("mars - unsupported column type in selectAsJson: %s", variantField.type);
                        assert(false, "unsupported column type in selectAsJson");
                    }
                }
                // append a placeholder and then overwrite it: appending the row directly would
                // flatten its elements into the outer array.
                jsonRecords ~= 0; jsonRecords[jsonRecords.length-1] = jsonRow;
            }
        }
        catch(Exception e){
            error("mars - S <-- ... - exception during query unsafe");
            error(e.toString());
            state = RequestState.rejectedAsPGSqlError;
        }
        return jsonRecords;
    }
    version(unittest_starwars){
        unittest {
            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            RequestState state;
            auto json = table.selectAsJson(db, "select * from people", null, state);
            //import std.stdio; writeln(json.toPrettyString());
            assert(json[0][0] == "Luke");
        }
        unittest {
            import std.math : isNaN;
            import vibe.data.json : Json;

            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            RequestState state;
            auto json = table.selectAsJson(db, "select * from starships", null, state);
            assert(json[0][0] == "NanShip");
            // here we have a BIG problem: NaN is not encodable in Json! The value is 'null'!
            // assert(json[0][1].get!float().isNaN);
        }
    }

    abstract Bytes packRows(size_t offset = 0, size_t limit = long.max);
    abstract Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max);
    //abstract size_t count() const;
    abstract size_t count(Database) const;
    abstract size_t countRowsToInsert() const;
    abstract size_t countRowsToUpdate() const;
    abstract size_t countRowsToDelete() const;
    abstract size_t index() const;
    abstract Bytes packRowsToInsert();
    abstract Bytes packRowsToUpdate();
    abstract Bytes packRowsToDelete();

    abstract Bytes[2] insertRecord(Database, immutable(ubyte)[], ref InsertError, string, string);
    abstract void updateRecord(Database, Bytes, immutable(ubyte)[], ref RequestState, string);
    abstract Bytes deleteRecord(Database, immutable(ubyte)[], ref DeleteError, string, string);

    abstract void unsafeReset();

    immutable Table definition;
    private {

        /// Every server table has a collection of the linked client side tables. The key element is the identifier of
        /// the client, so that the collection can be kept clean when a client connect/disconnects.
        public ClientSideTable!(ClientT)*[string] clientSideTables;

        //public SynOp!ClientT[] ops;

    }
}

/**
 * Concrete server side table for the compile-time known `table` definition.
 *
 * Durable tables go through the database; non durable ones live in the in-memory `fixtures`.
 * In both cases the pending changes are tracked in `toInsert`, `toUpdate` and `toDelete`, which
 * are what gets packed and sent to the other clients.
 */
class ServerSideTable(ClientT, immutable(Table) table) : BaseServerSideTable!ClientT
{
    enum Definition = table;
    enum Columns = table.columns;

    alias ColumnsType = asD!Columns; /// an AliasSeq of the D types for the table columns...
    alias ColumnsStruct = asStruct!table;
    alias KeysStruct = asPkStruct!table;

    this() { super(table); }

    // interface needed to handle the records in a generic way ...

    /// returns the total number of records we are 'talking on' (filters? query?)
    //deprecated override size_t count() const { return fixtures.length; }
    override size_t count(Database db) const {
        import ddb.postgres : ServerErrorException;

        static if( table.durable ){
            try {
                return db.executeScalarUnsafe!size_t("select count(*) from %s".format(table.name));
            }
            catch(ServerErrorException e){
                switch(e.code){
                    case "42501": // permission denied for relation "relation"
                        infof("Permission denied:%s", e.toString());
                        return 0;
                    default:
                        warningf("mars - x ... x - Unhandled PostgreSQL exception during 'count'");
                        info(e.toString());
                        throw e;
                }
            }
        }
        else {
            return fixtures.length;
        }
    }
    //static if( ! table.durable ){ // XXX
    override size_t countRowsToInsert() const { return toInsert.length; }
    override size_t countRowsToUpdate() const { return toUpdate.length; }
    override size_t countRowsToDelete() const { return toDelete.length; }
    //}

    /// return the unique index identifier for this table, that's coming from the table definition in the app.d
    override size_t index() const { return Definition.index; }

    /// Clamps the window `[offset, offset + limit)` to `[0, total]` without overflowing.
    /// Returns the pair `[from, till]`, always with `from <= till <= total`.
    private static size_t[2] clampWindow(size_t total, size_t offset, size_t limit){
        size_t[2] window;
        window[0] = offset < total ? offset : total;
        immutable room = total - window[0];
        window[1] = limit < room ? window[0] + limit : total;
        return window;
    }

    /// returns 'limit' rows starting from 'offset'; an offset past the end yields an empty slice.
    deprecated auto selectRows(size_t offset = 0, size_t limit = long.max) const {
        immutable window = clampWindow(count(null), offset, limit);
        return fixtures.values()[window[0] .. window[1]];
    }

    /**
     * Returns 'limit' rows starting from 'offset'.
     *
     * Durable tables are read from the database; when the definition asks for decorated rows each
     * record is stamped as imported by "automation@server" at the current time. Non durable tables
     * return a slice of the in-memory fixtures (empty when the offset is past the end).
     */
    auto selectRows(Database db, size_t offset = 0, size_t limit = long.max) const {
        static if(table.durable){
            auto resultSet = db.executeQueryUnsafe!(asStruct!table)("select * from %s limit %d offset %d".format(
                table.name, limit, offset)
            );
            // close the result set even when the conversion of a row throws
            scope(exit) resultSet.close();

            static if( Definition.decorateRows ){
                asSyncStruct!table[] rows;
                foreach(vr; resultSet){
                    asStruct!table v = vr;
                    asSyncStruct!table r;
                    assignCommonFields!(typeof(r), typeof(v))(r, v);
                    r.mars_who = "automation@server";
                    r.mars_what = "imported";
                    r.mars_when = Clock.currTime.toString();
                    rows ~= r;
                }
            }
            else {
                asStruct!table[] rows;
                foreach(v; resultSet){
                    rows ~= v;
                }
            }
            return rows;
        }
        else {
            immutable window = clampWindow(count(db), offset, limit);
            return fixtures.values()[window[0] .. window[1]];
        }
    }

    /// insert a new row in the server table, turning clients table out of sync
    deprecated void insertRow(ColumnsStruct fixture){
        KeysStruct keys = pkValues!(table)(fixture);
        fixtures[keys] = fixture;
        static if(table.decorateRows){
            asSyncStruct!table rec;
            assignCommonFields(rec, fixture);
            with(rec){ mars_who = "automation@server"; mars_what = "inserted"; mars_when = Clock.currTime.toString(); }
        }
        else auto rec = fixture;
        toInsert[keys] = rec;
        foreach(ref cst; clientSideTables.values){
            cst.ops ~= new ClientInsertValues!ClientT();
        }
    }

    /**
     * Inserts a new row in the server table, turning the other clients' tables out of sync.
     *
     * Params:
     *   db = the database connection (used only by durable tables)
     *   record = the record to insert
     *   err = receives the outcome; non durable tables always report `InsertError.inserted`
     *   username = the author of the change, recorded in decorated rows
     *   clientId = the originating client, which is NOT notified of its own insert
     *
     * Returns: the record as inserted.
     */
    ColumnsStruct insertRecord(Database db, ColumnsStruct record, ref InsertError err, string username, string clientId){
        static if(table.durable){
            auto inserted = db.executeInsert!(table, ColumnsStruct)(record, err);
        } else {
            fixtures[pkValues!table(record)] = record;
            auto inserted = record;
            err = InsertError.inserted;
        }
        if( err == InsertError.inserted ){
            static if(table.decorateRows){
                asSyncStruct!table rec;
                assignCommonFields(rec, inserted);
                with(rec){
                    mars_who = username ~ "@" ~ clientId; mars_what = "inserted";
                    mars_when = Clock.currTime.toString();
                }
            }
            else {
                auto rec = inserted;
            }
            toInsert[pkValues!table(inserted)] = rec;
            // ... don't propagate if not cached, or we are triggering a lot of refresh
            // stick with manual refresh done on client.
            if(table.cacheRows){
                foreach(id, cst; clientSideTables){
                    if( id != clientId ) cst.ops ~= new ClientInsertValues!ClientT();
                }
            }
        }
        return inserted;
    }

    /**
     * Msgpack flavour of `insertRecord`: unpacks the record, inserts it and packs the result.
     *
     * Returns: `[packed inserted record, packed client side keys]`, or two empty arrays (with
     * `err` set to `InsertError.unknownError`) when the payload cannot be unpacked.
     */
    override Bytes[2]
    insertRecord(Database db, Bytes data, ref InsertError err, string username, string clientId){
        import msgpack : pack, unpack, MessagePackException;
        ColumnsStruct record;
        try {
            record = unpack!(ColumnsStruct, true)(data);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name);
            errorf(exc.toString);
            err = InsertError.unknownError;
            return [[], []];
        }
        ColumnsStruct inserted = insertRecord(db, record, err, username, clientId);
        return [
            inserted.pack!(true).idup,
            record.pkParamValues!table().pack!(true).idup // clientKeys
        ];
    }

    /// Msgpack flavour of `updateRecord`: unpacks keys and record, then updates. On a decoding
    /// failure `state` is set to `RequestState.rejectedAsDecodingFailed` and nothing is updated.
    override void updateRecord(Database db, Bytes encodedKeys, Bytes encodedRecord, ref RequestState state, string clientId){
        asPkStruct!table keys;
        ColumnsStruct record;
        try {
            keys = unpack!(asPkStruct!table, true)(encodedKeys);
            record = unpack!(ColumnsStruct, true)(encodedRecord);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack keys for record to update '%s': maybe a wrong type in js", table.name);
            errorf(exc.toString);
            state = RequestState.rejectedAsDecodingFailed;
            return;
        }
        updateRecord(db, keys, record, state, clientId);
    }

    /**
     * Updates the record identified by `keys`; when `state` is `RequestState.executed` the change
     * is tracked in `toUpdate` and, for cached tables, queued for every client but `clientId`.
     *
     * Non durable tables do nothing on the storage side, so `state` is whatever the caller passed.
     */
    void updateRecord(Database db, asPkStruct!table keys, ColumnsStruct record, ref RequestState state, string clientId){
        static if(table.durable){
            db.executeUpdate!(table, asPkStruct!table, ColumnsStruct)(keys, record, state);
        }
        else { }
        if( state == RequestState.executed ){
            static if(table.decorateRows){
                asSyncStruct!table rec;
                assignCommonFields(rec, record);
                with(rec){
                    // NOTE(review): the author is still a placeholder, the username is not
                    // available in this signature.
                    mars_who = /+username+/"qualeutente" ~ "@" ~ /+clientid+/ "qualeclient";
                    mars_what = "updated";
                    mars_when = Clock.currTime.toString();
                }
            }
            else {
                auto rec = record;
            }
            toUpdate[keys] = rec;
            if(table.cacheRows){
                foreach(id, cst; clientSideTables){
                    if( id != clientId ) cst.ops ~= new UpdateClientRecords!ClientT();
                }
            }
        }
    }

    /// Msgpack flavour of `deleteRecord`. Returns an empty array when the record was deleted,
    /// the original `data` otherwise (undecodable payload included, reported as `unknownError`).
    override Bytes deleteRecord(Database db, Bytes data, ref DeleteError err, string username, string clientid){
        import msgpack : pack, unpack, MessagePackException;
        asPkParamStruct!table keys;
        try {
            keys = unpack!(asPkParamStruct!table, true)(data);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack keys for record to delete '%s': maybe a wrong type in js", table.name);
            errorf(exc.toString);
            err = DeleteError.unknownError;
            return data;
        }
        deleteRecord(db, keys, err, username, clientid);
        if( err != DeleteError.deleted ) return data;
        return [];
    }

    /**
     * Deletes the record identified by `keys`; on success the deletion is tracked in `toDelete`
     * and queued for every client but `clientid`.
     *
     * NOTE(review): unlike insert and update, the propagation is not gated on `table.cacheRows`.
     *
     * Returns: the keys received.
     */
    asPkParamStruct!table
    deleteRecord(Database db, asPkParamStruct!table keys, ref DeleteError err, string username, string clientid){
        KeysStruct k;
        assignFields(k, keys);
        static if(table.durable){
            db.executeDelete!(table, asPkParamStruct!table)(keys, err);
        }
        else {
            fixtures.remove(k);
            err = DeleteError.deleted;
        }
        if( err == DeleteError.deleted ){
            static if(table.decorateRows){
                toDelete[k] = Sync(username ~ "@" ~ clientid, "deleted", Clock.currTime.toString());
            }
            else {
                toDelete[k] = 0;
            }
            foreach(id, cst; clientSideTables){
                if( id != clientid ) cst.ops ~= new ClientDeleteValues!ClientT();
            }
        }
        return keys;
    }

    /// update row in the server table, turning the client tables out of sync
    deprecated void updateRow(KeysStruct keys, ColumnsStruct record){
        //KeysStruct keys = pkValues!table(record);
        auto v = keys in toInsert;
        if( v !is null ){
            static if(table.decorateRows){
                asSyncStruct!table rec;
                assignCommonFields(rec, record);
                with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
            }
            else {
                auto rec = record;
            }
            *v = rec;
            assert( (keys in toUpdate) is null );
        }
        else {
            auto v2 = keys in toUpdate;
            if( v2 !is null ){
                static if(table.decorateRows){ assert(false); }
                else { *v2 = record; }
            }
            else {
                static if(table.decorateRows){ assert(false); }
                else { toUpdate[keys] = record; }
            }
        }
        fixtures[keys] = record;
        foreach(ref cst; clientSideTables.values){
            cst.ops ~= new ClientUpdateValues!ClientT();
        }
    }

    /**
     * Updates a row in the server table, turning the client tables out of sync.
     *
     * A row still pending in `toInsert` is patched in place; otherwise the change is tracked in
     * `toUpdate`. Every client gets a `ClientUpdateValues` operation queued.
     */
    void updateRow(Database db, KeysStruct keys, ColumnsStruct record){
        static if( table.durable ){
            import msgpack : pack;

            // NOTE(review): the resulting state is not inspected, the bookkeeping below runs
            // whatever the outcome of the update.
            RequestState state;
            db.executeUpdate!(table, KeysStruct, ColumnsStruct)(keys, record, state);
            auto v = keys in toInsert;
            if( v !is null ){
                static if(table.decorateRows){
                    asSyncStruct!table rec;
                    assignCommonFields(rec, record);
                    with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
                }
                else {
                    auto rec = record;
                }
                *v = rec;
                assert( (keys in toUpdate) is null );
            }
            else {
                auto v2 = keys in toUpdate;
                if( v2 !is null ){
                    static if(table.decorateRows){ assert(false); }
                    else { *v2 = record; }
                }
                else {
                    static if(table.decorateRows){ assert(false); }
                    else { toUpdate[keys] = record; }
                }
            }
        }
        else {
            // the pending collections hold decorated rows when the table asks for them, so the
            // value stored must be 'rec' and not the bare 'record'.
            static if(table.decorateRows){
                asSyncStruct!table rec;
                assignCommonFields(rec, record);
                with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
            }
            else {
                auto rec = record;
            }
            auto v = keys in toInsert;
            if( v !is null ){
                *v = rec;
                assert( (keys in toUpdate) is null );
            }
            else {
                toUpdate[keys] = rec;
            }
            fixtures[keys] = record;
        }
        foreach(ref cst; clientSideTables.values){
            cst.ops ~= new ClientUpdateValues!ClientT();
        }
    }

    /// returns the packed selected rows
    override Bytes packRows(size_t offset = 0, size_t limit = long.max) const {
        import msgpack : pack;
        return pack!(true)(selectRows(null, offset, limit)).idup;
    }
    /// returns the packed selected rows
    override Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max) const {
        import msgpack : pack;
        return pack!(true)(selectRows(db, offset, limit)).idup;
    }

    /// return the packed rows to insert in the client
    override Bytes packRowsToInsert() {
        import msgpack : pack;
        auto packed = pack!(true)(toInsert.values()).idup;
        //toInsert = null; can't reset... this is called for every client
        return packed;
    }

    /// return the packed keys of the rows to delete in the client (with the sync metadata for
    /// decorated tables)
    override Bytes packRowsToDelete() {
        import msgpack : pack;
        asSyncPkParamStruct!(table)[] whereKeys;
        foreach(key; toDelete.keys()){
            asSyncPkParamStruct!table whereKey;
            assignFields(whereKey, key);
            static if(table.decorateRows) assignCommonFields(whereKey, toDelete[key]);
            whereKeys ~= whereKey;
            tracef("mars - packing key to delete in '%s': %s", table.name, whereKey);
        }
        auto packed = pack!(true)(whereKeys).idup;
        //toDelete = null; can't reset... this is called for every client
        return packed;
    }

    /// return the packed rows to update in the client, each one paired with its keys
    override Bytes packRowsToUpdate() {
        static struct UpdateRecord {
            KeysStruct keys;
            static if(table.decorateRows){
                asSyncStruct!table record;
            }
            else {
                asStruct!table record;
            }
        }
        UpdateRecord[] records;
        foreach(r; toUpdate.keys){
            records ~= UpdateRecord(r, toUpdate[r]);
        }

        import msgpack : pack;
        auto packed = pack!(true)(records).idup;
        return packed;
    }

    /// Stores `fixture` in the in-memory fixtures, keyed by its primary key values.
    void loadFixture(ColumnsStruct fixture){
        KeysStruct keys = pkValues!table(fixture);
        fixtures[keys] = fixture;
    }

    /// Forgets every pending insert, update and delete; the fixtures are left untouched.
    override void unsafeReset() {
        //fixtures = null;
        toInsert = null;
        toUpdate = null;
        toDelete = null;
    }

    //static if( ! table.durable ){
    asStruct!(table)[asPkStruct!(table)] fixtures;
    static if(table.decorateRows){
        asSyncStruct!(table)[asPkStruct!(table)] toInsert;
        Sync[asPkStruct!(table)] toDelete;
        asSyncStruct!(table)[asPkStruct!(table)] toUpdate;
    }
    else {
        asStruct!(table)[asPkStruct!(table)] toInsert;
        int[asPkStruct!(table)] toDelete;
        asStruct!(table)[asPkStruct!(table)] toUpdate;
    }

    // ... record inserted client side, already patched and inserted for this client.
    //asStruct!(table)[string] notToInsert;
    //}
}


/// The per-client view of a server side table: a sync strategy plus the pending operations.
struct ClientSideTable(ClientT)
{
    private {
        Strategy strategy = Strategy.easilySyncNone;
        public SynOp!ClientT[] ops;
    }
}

private
{
    enum Strategy { easilySyncAll, easilySyncNone }

    /// A synchronisation operation to be executed against one client.
    class SynOp(MarsClientT) {
        /// deprecated way of proceeding: use the method below with a null database
        void execute(MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT){ assert(false); }
        abstract void execute(Database, MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT);
    }

    /// take all the rows in the server table and send them on the client table.
    class ClientImportValues(MarsClientT) : SynOp!MarsClientT {

        override void execute(
            Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
        ){
            //assert(db !is null); not true for not durable table

            // ... if the table is empty, simply do nothing ...
            if( sst.count(db) == 0 ) return;

            auto req = ImportRecordsReq();
            req.tableIndex = sst.index;
            req.statementIndex = indexStatementFor(sst.index, "insert");
            req.encodedRecords = sst.packRows(db);
            marsClient.sendRequest(req);
            if(marsClient.isConnected){ auto rep = marsClient.receiveReply!ImportRecordsRep(); }
        }

        /// deprecated flavour, without a database: counts and packs the rows with a null database.
        override
        void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            // ... if the table is empty, simply do nothing ...
            if( sst.count(null) == 0 ) return;

            auto req = ImportRecordsReq();
            req.tableIndex = sst.index;
            req.statementIndex = indexStatementFor(sst.index, "insert");
            req.encodedRecords = sst.packRows();
            marsClient.sendRequest(req);
            if(marsClient.isConnected){ auto rep = marsClient.receiveReply!ImportRecordsRep(); }
        }
    }

    /// send to the client the rows pending in the server table 'toInsert' collection.
    class ClientInsertValues(MarsClientT) : SynOp!MarsClientT {

        override
        void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            pushInserts(marsClient, sst);
        }
        override
        void execute(
            Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
        ){
            pushInserts(marsClient, sst);
        }

        /// the database is not involved, so both the overloads share this implementation.
        private void pushInserts(MarsClientT marsClient, BaseServerSideTable!MarsClientT sst){
            if( sst.countRowsToInsert == 0 ) return;

            auto req = InsertRecordsReq();
            req.tableIndex = sst.index;
            req.statementIndex = indexStatementFor(sst.index, "insert");
            req.encodedRecords = sst.packRowsToInsert();
            marsClient.sendRequest(req);
            if(marsClient.isConnected){ auto rep = marsClient.receiveReply!InsertRecordsRep(); }
        }
    }

    /// send to the client the keys pending in the server table 'toDelete' collection.
    class ClientDeleteValues(MarsClientT) : SynOp!MarsClientT {

        override
        void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            pushDeletes(marsClient, sst);
        }
        override
        void execute(
            Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
        ){
            pushDeletes(marsClient, sst);
        }

        /// the database is not involved, so both the overloads share this implementation.
        private void pushDeletes(MarsClientT marsClient, BaseServerSideTable!MarsClientT sst){
            if( sst.countRowsToDelete == 0 ) return;

            auto req = DeleteRecordsReq();
            req.tableIndex = sst.index;
            req.statementIndex = indexStatementFor(sst.index, "delete").to!int;
            req.encodedRecords = sst.packRowsToDelete();
            marsClient.sendRequest(req);
            if(marsClient.isConnected){ auto rep = marsClient.receiveReply!DeleteRecordsRep(); }
        }
    }

    /// send to the client the rows pending in the server table 'toUpdate' collection, using the
    /// `UpdateValuesRequest` message; no reply is awaited.
    class ClientUpdateValues(MarsClientT) : SynOp!MarsClientT {

        override
        void execute(MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
        {
            pushUpdates(marsclient, sst);
        }

        override void execute(Database db, MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst,
                BaseServerSideTable!MarsClientT sst)
        {
            pushUpdates(marsclient, sst);
        }

        /// the database is not involved, so both the overloads share this implementation.
        private void pushUpdates(MarsClientT marsclient, BaseServerSideTable!MarsClientT sst){
            import mars.msg : UpdateValuesRequest;
            import std.conv : to;

            if( sst.countRowsToUpdate == 0 ) return;

            auto req = UpdateValuesRequest();
            req.statementIndex = indexStatementFor(sst.index, "update").to!int;
            req.bytes = sst.packRowsToUpdate();
            marsclient.sendRequest(req);
        }
    }

    /// placeholder: nothing is executed.
    class ServerUpdateValues(MarsClientT) : SynOp!MarsClientT {
        override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst,
                BaseServerSideTable!MarsClientT sst){}
    }

    /// send to the client the rows pending in the server table 'toUpdate' collection, using the
    /// `UpdateRecordsReq` message; no reply is awaited.
    class UpdateClientRecords(MarsClientT) : SynOp!MarsClientT {
        override
        void execute(Database db, MarsClientT marsClient, ClientSideTable!MarsClientT* cst, BaseServerSideTable!MarsClientT sst)
        {
            import mars.msg : UpdateRecordsReq;

            if( sst.countRowsToUpdate > 0 ){
                auto req = UpdateRecordsReq();
                req.tableIndex = sst.index;
                req.encodedRecords = sst.packRowsToUpdate();
                marsClient.sendRequest(req);
            }
        }
    }
}

version(unittest)
{
    struct MarsClientMock { void sendRequest(R)(R r){} }
}
unittest
{
    /+
    import std.range : zip;


    auto t1 = immutable(Table)("t1", [Col("c1", Type.integer, false), Col("c2", Type.text, false)], [0], []);
    auto sst = new ServerSideTable!(MarsClientMock, t1);
    zip([1, 2, 3], ["a", "b", "c"]).each!( f => sst.loadFixture(sst.ColumnsStruct(f.expand)) );

    auto cst = sst.createClientSideTable();
    // ... the simplest strategy is to synchronise ALL the content into the client side right away ...
    assert( cst.strategy == Strategy.easilySyncAll );
    // ... and at this point, at the very least, a command to import all the data must be queued ...
    assert( cast(ClientImportValues!MarsClientMock)(sst.ops[$-1]) !is null );
    // ... which, once executed, takes care of the socket and updates the client and server side instances.
    auto op = sst.ops[$-1];
    op.execute(MarsClientMock(), cst, sst);

    // ... one of the values can be updated, in this case the primary key is the column c1
    sst.updateRow(sst.KeysStruct(2), sst.ColumnsStruct(2, "z"));
    assert( sst.fixtures[sst.KeysStruct(2)] == sst.ColumnsStruct(2, "z") );
    +/
}
/+
unittest
{
    version(unittest_starwars){
        import mars.starwars;
        enum schema = starwarsSchema();

        auto people = new ServerSideTable!(MarsClientMock, schema.tables[0]);
        auto scores = new ServerSideTable!(MarsClientMock, schema.tables[3]);
        auto databaseService = DatabaseService("127.0.0.1", 5432, "starwars");
        AuthoriseError err;
        auto db = databaseService.connect("jedi", "force", err);
        db.executeUnsafe("begin transaction");

        auto rows = people.selectRows(db);
        assert( rows[0] == luke, rows[0].to!string );

        auto paolo = Person("Paolo", "male", [0x00, 0x01, 0x02, 0x03, 0x04], 1.80);
        InsertError ierr;
        auto inserted = people.insertRecord(db, paolo, ierr);
        assert(inserted == paolo);


        //import std.stdio;
        //foreach(row; rows) writeln("---->>>>>", row);
        //assert(false);
    }
}
+/