1 
2 
/**
  Handles the synchronisation between the server and the tables associated with each client.
  */
8 module mars.sync;
9 
/// Informal sketch, kept as a string constant, of the relation handled by this module:
/// a server side table owns one client side table per client id, each with its sync operations.
enum d =
`
BaseServerSideTable
    clientSideTables[clientId] --- SyncOps[]
`;
15 
16 import std.algorithm;
17 import std.conv;
18 import std.datetime;
19 import std.format;
20 import std.meta;
21 import std.typecons;
22 import std.variant;
23 
24 import std.experimental.logger;
25 
26 import msgpack;
27 
28 import mars.defs;
29 import mars.pgsql;
30 import mars.msg;
31 import mars.server : indexStatementFor;
32 version(unittest) import mars.starwars;
33 
/// Immutable byte buffer; in this module it carries msgpack-encoded records and keys.
alias Bytes = immutable(ubyte)[];
35 
36 /**
37 A server side table is instantiated only once per marsServer, and they are stored into the 'tables'
38 structure of the marsServer.
39 
40 The instantiation usually is inside the application code: `InstantiateTables!(ctTables)(marsServer, [], [], [], [])`
41 */
42 class BaseServerSideTable(ClientT)
43 {
44     alias ClientType = ClientT;
45 
    /// Stores the immutable table definition this server side table is built on.
    this(immutable(Table) definition){
        this.definition = definition;
    }
49 
50     auto createClientSideTable(string clientid){
51         auto cst = new ClientSideTable!ClientT();
52         cst.strategy = definition.cacheRows? Strategy.easilySyncAll : Strategy.easilySyncNone;
53         final switch(cst.strategy) with(Strategy) {
54             case easilySyncAll:
55                 cst.ops ~= new ClientImportValues!ClientT();
56                 break;
57             case easilySyncNone:
58                 break;
59         }
60         // new client, new client side table
61         assert( (clientid in clientSideTables) is null, format("cliendid:%s, clientSideTables:%s",
62                     clientid, clientSideTables.keys() ) );
63         clientSideTables[clientid] = cst;
64 
65         return cst;
66     }
67 
68     auto wipeClientSideTable(string clientid){
69         assert( (clientid in clientSideTables) !is null, clientid );
70         clientSideTables.remove(clientid);
71     }
72 
73     /// execute a sql select statement, and returns a vibe json array with the records as json
74     auto selectAsJson(Database db, string sqlSelect, Variant[string] parameters) in {
75         assert(db !is null);
76     } body {
77         import std.math : isNaN;
78         import vibe.data.json;
79 
80         auto resultSet = db.executeQueryUnsafe(sqlSelect, parameters);
81         scope(exit) resultSet.close();
82 
83         Json jsonRecords = Json.emptyArray;
84         foreach(variantRow; resultSet){
85             Json jsonRow = Json.emptyArray;
86             foreach(i, variantField; variantRow){
87                 if(variantField.type == typeid(int)){ jsonRow ~= variantField.get!int; }
88                 else if(variantField.type == typeid(short)){ jsonRow ~= variantField.get!short; }
89                 else if(variantField.type == typeid(float)){ jsonRow ~= variantField.get!float; }
90                 else if(variantField.type == typeid(long)){ jsonRow ~= variantField.get!long; }
91                 else if(variantField.type == typeid(string)){ jsonRow ~= variantField.get!string; }
92                 else if(variantField.type == typeid(ubyte[])){
93                     // ... if I apply directly the '=~', the arrays are flattened!
94                     jsonRow ~= 0;
95                     jsonRow[jsonRow.length-1] = variantField.get!(ubyte[]).serializeToJson;
96                 }
97                 else {
98                     import std.stdio; writeln(variantField.type);
99                     assert(false);
100                 }
101             }
102             jsonRecords ~= 0; jsonRecords[jsonRecords.length-1] = jsonRow;
103         }
104         return jsonRecords;
105     }
    // Integration tests for selectAsJson: they connect to a PostgreSQL database named "starwars"
    // on 127.0.0.1:5432, and are compiled only with version 'unittest_starwars'.
    // NOTE(review): BaseServerSideTable is a template over the client type, while here it is handed
    // to WhiteHole without arguments -- confirm that these tests still compile.
    version(unittest_starwars){
        unittest {
            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            auto json = table.selectAsJson(db, "select * from people", null);
            //import std.stdio; writeln(json.toPrettyString());
            assert(json[0][0] == "Luke");
        }
        unittest {
            import std.math : isNaN;
            import vibe.data.json : Json;

            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            auto json = table.selectAsJson(db, "select * from starships", null);
            assert(json[0][0] == "NanShip");
            // here we have a BIG problem: NaN is not encodable in Json! The value is 'null'!
            // assert(json[0][1].get!float().isNaN);
        }
    }
126 
    // Type-erased interface of a server side table; ServerSideTable implements it for a concrete table.

    /// packs at most `limit` rows, starting from `offset`, without a database connection
    abstract Bytes packRows(size_t offset = 0, size_t limit = long.max);
    /// packs at most `limit` rows, starting from `offset`, using the given database connection
    abstract Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max);
    //abstract size_t count() const;
    /// number of rows of the table
    abstract size_t count(Database) const;
    /// number of rows waiting to be inserted in the client tables
    abstract size_t countRowsToInsert() const;
    /// number of rows waiting to be updated in the client tables
    abstract size_t countRowsToUpdate() const;
    /// number of rows waiting to be deleted from the client tables
    abstract size_t countRowsToDelete() const;
    /// the index that identifies the table
    abstract size_t index() const;
    /// packs the rows waiting to be inserted in the client tables
    abstract Bytes packRowsToInsert();
    /// packs the rows waiting to be updated in the client tables
    abstract Bytes packRowsToUpdate();
    /// packs the keys of the rows waiting to be deleted from the client tables
    abstract Bytes packRowsToDelete();

    /// inserts an encoded record; in ServerSideTable the result is [packed stored record, packed client keys]
    abstract Bytes[2] insertRecord(Database, immutable(ubyte)[], ref InsertError, string, string);
    /// updates the record identified by the encoded keys with the encoded record
    abstract void updateRecord(Database, Bytes, immutable(ubyte)[], ref RequestState, string);
    /// deletes the record identified by the encoded keys; in ServerSideTable the input is returned on failure
    abstract Bytes    deleteRecord(Database, immutable(ubyte)[], ref DeleteError, string, string);

    /// forgets the pending synchronisation state
    abstract void unsafeReset();
144 
    /// The definition of the table, as handed to the constructor.
    immutable Table definition; 
    private {

        /// Every server table has a collection of the linked client side tables. The key element is the identifier of
        /// the client, so that the collection can be kept clean when a client connects/disconnects.
        /// NOTE: the explicit 'public' overrides the enclosing 'private' block.
        public ClientSideTable!(ClientT)*[string] clientSideTables;
        
        //public SynOp!ClientT[] ops;

    }
155 }
156 
157 class ServerSideTable(ClientT, immutable(Table) table) : BaseServerSideTable!ClientT
158 {
    enum Definition = table; /// the compile-time table definition
    enum Columns = table.columns; /// the columns of the table definition
     
    alias ColumnsType = asD!Columns; /// an AliasSeq of the D types for the table columns...
    alias ColumnsStruct = asStruct!table; /// the struct used for a whole record
    alias KeysStruct = asPkStruct!table; /// the struct used for the primary key values of a record

    /// Hands the compile-time table definition to the base class.
    this() { super(table); } 
167 
168     // interface needed to handle the records in a generic way ...
169 
170     /// returns the total number of records we are 'talking on' (filters? query?)
171     //deprecated override size_t count() const { return fixtures.length; }
172     override size_t count(Database db) const {
173         import ddb.postgres : ServerErrorException;
174 
175         static if( table.durable ){
176             try {
177                 return db.executeScalarUnsafe!size_t("select count(*) from %s".format(table.name));
178             }
179             catch(ServerErrorException e){
180                 switch(e.code){
181                     case "42501": // permission denied for relation "relation"
182                         infof("Permission denied:%s", e.toString());
183                         return 0;
184                     default:
185                         warningf("mars - x ... x - Unhandled PostgreSQL exception during 'count'");
186                         info(e.toString());
187                         throw e;
188                 }
189             }
190         }
191         else {
192             return fixtures.length;
193         }
194     }
195     //static if( ! table.durable ){ // XXX
        /// number of records held in `toInsert`
        override size_t countRowsToInsert() const { return toInsert.length; }
        /// number of records held in `toUpdate`
        override size_t countRowsToUpdate() const { return toUpdate.length; }
        /// number of keys held in `toDelete`
        override size_t countRowsToDelete() const { return toDelete.length; }
199     //}
200 
    /// Returns the unique index identifier of this table, as stated by the compile-time table definition.
    override size_t index() const { return Definition.index; }
203 
204     /// returns 'limit' rows starting from 'offset'.
205     deprecated auto selectRows(size_t offset = 0, size_t limit = long.max) const  {
206         size_t till  = (limit + offset) > count(null) ? count(null) : (limit + offset);
207         return fixtures.values()[offset .. till];
208     }
209     /// returns 'limit' rows starting from 'offset'.
210     auto selectRows(Database db, size_t offset = 0, size_t limit = long.max) const {
211         static if(table.durable){
212             auto resultSet = db.executeQueryUnsafe!(asStruct!table)("select * from %s limit %d offset %d".format(
213                 table.name, limit, offset)
214             );
215             static if( Definition.decorateRows ){
216                 asSyncStruct!table[] rows;
217                 foreach(vr; resultSet){
218                     asStruct!table v = vr;
219                     asSyncStruct!table r;
220                     assignCommonFields!(typeof(r), typeof(v))(r, v);
221                     r.mars_who = "automation@server";
222                     r.mars_what = "imported";
223                     r.mars_when = Clock.currTime.toString(); 
224                     rows ~= r;
225                 }
226             }
227             else {
228                 asStruct!table[] rows;
229                 foreach(v; resultSet){
230                     rows ~= v;
231                 }
232             }
233             
234             
235             resultSet.close();
236             return rows;
237         }
238         else {
239             size_t till  = (limit + offset) > count(db) ? count(db) : (limit + offset);
240             return fixtures.values()[offset .. till];
241         }
242     }
243 
244     /// insert a new row in the server table, turning clients table out of sync
245     deprecated void insertRow(ColumnsStruct fixture){
246         KeysStruct keys = pkValues!(table)(fixture);
247         fixtures[keys] = fixture;
248         static if(table.decorateRows){
249             asSyncStruct!table rec;
250             assignCommonFields(rec, fixture);
251             with(rec){ mars_who = "automation@server"; mars_what = "inserted"; mars_when = Clock.currTime.toString(); }
252         }
253         else auto rec = fixture;
254         toInsert[keys] = rec;
255         foreach(ref cst; clientSideTables.values){
256             cst.ops ~= new ClientInsertValues!ClientT();
257         }
258     }
259 
260     /// insert a new row in the server table, turning clients table out of sync
261     ColumnsStruct insertRecord(Database db, ColumnsStruct record, ref InsertError err, string username, string clientId){
262         static if(table.durable){
263             auto inserted = db.executeInsert!(table, ColumnsStruct)(record, err);
264         } else {
265             fixtures[pkValues!table(record)] = record;
266             auto inserted = record;
267             err = InsertError.inserted;
268         }
269         if( err == InsertError.inserted ){
270             static if(table.decorateRows){
271                 asSyncStruct!table rec;
272                 assignCommonFields(rec, inserted);
273                 with(rec){
274                     mars_who = username ~ "@" ~ clientId; mars_what = "inserted";
275                     mars_when = Clock.currTime.toString();
276                 }
277             }
278             else {
279                 auto rec = inserted;
280             }
281             toInsert[pkValues!table(inserted)] = rec;
282             // ... don't propagate if not cached, or we are triggering a loot of refresh
283             //     stick with manual refresh done on client.
284             if(table.cacheRows){
285                 foreach(key; clientSideTables.byKey.filter!( (a) => a != clientId )){
286                     auto cst = key in clientSideTables;
287                     (*cst).ops ~= new ClientInsertValues!ClientT();
288                 }
289             }
290         }
291         return inserted;
292     }
293 
294     override Bytes[2]
295     insertRecord(Database db, Bytes data, ref InsertError err, string username, string clientId){
296         import  msgpack : pack, unpack, MessagePackException;
297         ColumnsStruct record;
298         try {
299             record = unpack!(ColumnsStruct, true)(data);
300         }
301         catch(MessagePackException exc){
302             errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name);
303             errorf(exc.toString);
304             err = InsertError.unknownError;
305             return [[], []];
306         }
307         ColumnsStruct inserted = insertRecord(db, record, err, username, clientId);
308         return [
309             inserted.pack!(true).idup,
310             record.pkParamValues!table().pack!(true).idup // clientKeys
311         ];
312     }
313 
314     override void updateRecord(Database db, Bytes encodedKeys, Bytes encodedRecord, ref RequestState state, string clientId){
315         asPkStruct!table keys;
316         ColumnsStruct record;
317         try { 
318             keys = unpack!(asPkStruct!table, true)(encodedKeys); 
319             record = unpack!(ColumnsStruct, true)(encodedRecord);
320         }
321         catch(MessagePackException exc){
322             errorf("mars - failed to unpack keys for record to update '%s': maybe a wrong type in js", table.name);
323             errorf(exc.toString);
324             state = RequestState.rejectedAsDecodingFailed;
325             return;
326         }
327         updateRecord(db, keys, record, state, clientId);
328     }
329 
330     void updateRecord(Database db, asPkStruct!table keys, ColumnsStruct record, ref RequestState state, string clientId){
331         static if(table.durable){
332             db.executeUpdate!(table, asPkStruct!table, ColumnsStruct)(keys, record, state);
333         }
334         else {  }
335         if( state == RequestState.executed ){
336             static if(table.decorateRows){
337                 asSyncStruct!table rec;
338                 assignCommonFields(rec, record);
339                 with(rec){
340                     mars_who = /+username+/"qualeutente" ~ "@" ~ /+clientid+/ "qualeclient";
341                     mars_what = "updated";
342                     mars_when = Clock.currTime.toString();
343                 }
344             }
345             else {
346                 auto rec = record;
347             }
348             toUpdate[keys] = rec;
349             if(table.cacheRows){
350                 foreach(key; clientSideTables.byKey.filter!( (a) => a != clientId )){
351                     auto cst = key in clientSideTables;
352                     (*cst).ops ~= new UpdateClientRecords!ClientT();
353                 }
354             }
355         }
356     }
357 
358     override Bytes deleteRecord(Database db, Bytes data, ref DeleteError err, string username, string clientid){
359         import msgpack : pack, unpack, MessagePackException;
360         asPkParamStruct!table keys;
361         try {
362             keys = unpack!(asPkParamStruct!table, true)(data);
363         }
364         catch(MessagePackException exc){
365             errorf("mars - failed to unpack keys for record to delete '%s': maybe a wrong type in js", table.name);
366             errorf(exc.toString);
367             err = DeleteError.unknownError;
368             return data;
369         }
370         deleteRecord(db, keys, err, username, clientid);
371         if( err != DeleteError.deleted ) return data;
372         return [];
373     }
374 
375     asPkParamStruct!table
376     deleteRecord(Database db, asPkParamStruct!table keys, ref DeleteError err, string username, string clientid){
377         KeysStruct k;
378         assignFields(k, keys);
379         static if(table.durable){
380             db.executeDelete!(table, asPkParamStruct!table)(keys, err);
381         }
382         else {
383             fixtures.remove(k);
384             err = DeleteError.deleted;
385         }
386         if( err == DeleteError.deleted ){
387             static if(table.decorateRows){
388                 toDelete[k] = Sync(username ~ "@" ~ clientid, "deleted", Clock.currTime.toString());
389             }
390             else {
391                 toDelete[k] = 0;
392             }
393             foreach(key; clientSideTables.byKey.filter!( (a) => a != clientid )){
394                 auto cst = key in clientSideTables;
395                 (*cst).ops ~= new ClientDeleteValues!ClientT();
396             }
397         }
398         return keys;
399     }
400 
401     /// update row in the server table, turning the client tables out of sync
402     deprecated void updateRow(KeysStruct keys, ColumnsStruct record){
403         //KeysStruct keys = pkValues!table(record);
404         auto v = keys in toInsert;
405         if( v !is null ){
406             static if(table.decorateRows){
407                 asSyncStruct!table rec;
408                 assignCommonFields(rec, record);
409                 with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
410             }
411             else {
412                 auto rec = record;
413             }
414             *v = rec;
415             assert( (keys in toUpdate) is null );
416         }
417         else {
418             auto v2 = keys in toUpdate;
419             if( v2 !is null ){
420                 static if(table.decorateRows){ assert(false); }
421                 else { *v2 = record; }
422             }
423             else {
424                 static if(table.decorateRows){ assert(false); }
425                 else { toUpdate[keys] = record; }
426             }
427         }
428         fixtures[keys] = record;
429         foreach(ref cst; clientSideTables.values){
430             cst.ops ~= new ClientUpdateValues!ClientT();
431         }
432     }
433 
434     /// update row in the server table, turning the client tables out of sync
435     void updateRow(Database db, KeysStruct keys, ColumnsStruct record){
436         static if( table.durable ){
437             import msgpack : pack;
438 
439             RequestState state;
440             db.executeUpdate!(table, KeysStruct, ColumnsStruct)(keys, record, state);
441             auto v = keys in toInsert;
442             if( v !is null ){
443                 static if(table.decorateRows){
444                     asSyncStruct!table rec;
445                     assignCommonFields(rec, record);
446                     with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
447                 }
448                 else {
449                     auto rec = record;
450                 }
451                 *v = rec;
452                 assert( (keys in toUpdate) is null );
453             }
454             else {
455                 auto v2 = keys in toUpdate;
456                 if( v2 !is null ){
457                     static if(table.decorateRows){ assert(false); }
458                     else { *v2 = record; }
459                 }
460                 else {
461                     static if(table.decorateRows){ assert(false); }
462                     else { toUpdate[keys] = record; }
463                 }
464             }
465         }
466         else {
467             //KeysStruct keys = pkValues!table(record);
468             auto v = keys in toInsert;
469             if( v !is null ){
470                 static if(table.decorateRows){
471                     asSyncStruct!table rec;
472                     assignCommonFields(rec, record);
473                     with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
474                 }
475                 else {
476                     auto rec = record;
477                 }
478                 *v = record;
479                 assert( (keys in toUpdate) is null );
480             }
481             else {
482                 v = keys in toUpdate;
483                 if( v !is null ){
484                     *v = record;
485                 }
486                 else {
487                     toUpdate[keys] = record;
488                 }
489             }
490             fixtures[keys] = record;
491         }
492         foreach(ref cst; clientSideTables.values){
493             cst.ops ~= new ClientUpdateValues!ClientT();
494         }
495     }
496 
497     /// returns the packet selected rows
498     override Bytes packRows(size_t offset = 0, size_t limit = long.max) const {
499         import msgpack : pack;
500         return pack!(true)(selectRows(null, offset, limit)).idup;
501     }
502     /// returns the packet selected rows
503     override Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max) const {
504         import msgpack : pack;
505         return pack!(true)(selectRows(db, offset, limit)).idup;
506     }
507 
508     /// return the packet rows to insert in the client
509     override Bytes packRowsToInsert() {
510         import msgpack : pack;
511         auto packed = pack!(true)(toInsert.values()).idup;
512         //toInsert = null; can't reset... this is called for every client
513         return packed;
514     }
515 
516     /// return the packet rows to delete in the client
517     override Bytes packRowsToDelete() {
518         import msgpack : pack;
519         asSyncPkParamStruct!(table)[] whereKeys;
520         foreach(key; toDelete.keys()){
521             import std.stdio; writeln("+++++++++++++>>>>", key, ":", toDelete[key]);
522             asSyncPkParamStruct!table whereKey;
523             assignFields(whereKey, key);
524             static if(table.decorateRows) assignCommonFields(whereKey, toDelete[key]);
525             whereKeys ~= whereKey;
526             writeln("<<<=== M<<<<<<<<<<", whereKey);
527         }
528         auto packed = pack!(true)(whereKeys).idup;
529         //toInsert = null; can't reset... this is called for every client
530         return packed;
531     }
532 
533     /// return the packet rows to update in the client
534     override Bytes packRowsToUpdate() {
535         static struct UpdateRecord {
536             KeysStruct keys;
537             static if(table.decorateRows){
538                 asSyncStruct!table record;
539             }
540             else {
541                 asStruct!table record;
542             }
543         }
544         UpdateRecord[] records;
545         foreach(r; toUpdate.keys){
546             records ~= UpdateRecord(r, toUpdate[r]);
547         }
548 
549         import msgpack : pack;
550         auto packed = pack!(true)(records).idup;
551         return packed;
552     }
553 
554     void loadFixture(ColumnsStruct fixture){
555         KeysStruct keys = pkValues!table(fixture);
556         fixtures[keys] = fixture;
557     }
558 
    /// Forgets every pending insert, update and delete; the fixtures are left untouched.
    override void unsafeReset() {
        //fixtures = null;
        toInsert = null;
        toUpdate = null;
        toDelete = null;
    }
565 
566     //static if( ! table.durable ){
        /// the in-memory records, indexed by primary key values
        asStruct!(table)[asPkStruct!(table)] fixtures;
        // the pending changes to send to the clients, indexed by primary key values: a decorated
        // table tracks records and deletes along with their sync information.
        static if(table.decorateRows){
            asSyncStruct!(table)[asPkStruct!(table)] toInsert;
            Sync[asPkStruct!(table)] toDelete;
            asSyncStruct!(table)[asPkStruct!(table)] toUpdate;
        }
        else {
            asStruct!(table)[asPkStruct!(table)] toInsert;
            int[asPkStruct!(table)] toDelete; // the value is unused, only the keys matter
            asStruct!(table)[asPkStruct!(table)] toUpdate;
        }
578 
579         // ... record inserted client side, already patched and inserted for this client.
580         //asStruct!(table)[string] notToInsert;
581     //}
582 }
583 
584 
/// The state kept by the server for the table of one client: the sync strategy, and the
/// operations waiting to be executed to bring the client table in sync.
struct ClientSideTable(ClientT)
{
    private Strategy strategy = Strategy.easilySyncNone;
    public SynOp!ClientT[] ops;
}
592 
593 private
594 {
    /// How a new client side table starts: with a pending import operation (easilySyncAll),
    /// or with no pending operation (easilySyncNone).
    enum Strategy { easilySyncAll, easilySyncNone }
596 
    /// Base class of the operations that bring a client side table in sync with the server side table.
    class SynOp(MarsClientT) {
        /// deprecated way of proceeding: use the method below with a null database
        void execute(MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT){ assert(false); }
        /// executes the operation for the given client, client side table and server side table
        abstract void execute(Database, MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT);
    }
602 
603     /// take all the rows in the server table and send them on the client table.
604     class ClientImportValues(MarsClientT) : SynOp!MarsClientT {
605 
606         override void execute(
607             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
608         ){
609             assert(db !is null);
610 
611             // ... if the table is empty, simply do nothing ...
612             if( sst.count(db) > 0 ){
613                 auto payload = sst.packRows(db);
614 
615                 auto req = ImportRecordsReq(); with(req){
616                     tableIndex = sst.index;
617                     statementIndex = indexStatementFor(sst.index, "insert");
618                     encodedRecords = payload;
619                 }
620                 marsClient.sendRequest(req);
621                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!ImportRecordsRep();
622             }
623         }
624         
625         override
626         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
627         {
628             import mars.msg : ImportValuesRequest;
629             import std.conv : to;
630 
631             // ... if the table is empty, simply do nothing ...
632             if( sst.count(null) > 0 ){
633                 auto payload = sst.packRows();
634 
635                 auto req = ImportRecordsReq();  with(req){
636                     tableIndex =sst.index;
637                     statementIndex = indexStatementFor(sst.index, "insert");
638                     encodedRecords = payload;
639                 }
640                 marsClient.sendRequest(req);
641                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!ImportRecordsRep();
642             }
643         }
644     }
645 
646     class ClientInsertValues(MarsClientT) : SynOp!MarsClientT {
647 
648         override
649         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
650         {
651             if( sst.countRowsToInsert > 0 ){
652                 auto payload = sst.packRowsToInsert();
653                 auto req = InsertRecordsReq(); with(req){
654                     tableIndex = sst.index;
655                     statementIndex = indexStatementFor(sst.index, "insert");
656                     encodedRecords = payload;
657                 }
658                 marsClient.sendRequest(req);
659                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!InsertRecordsRep();
660             }
661         }
662         override
663         void execute(
664             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
665         ){
666             if( sst.countRowsToInsert > 0 ){
667                 auto payload = sst.packRowsToInsert();
668                 auto req = InsertRecordsReq(); with(req){
669                     tableIndex = sst.index;
670                     statementIndex = indexStatementFor(sst.index, "insert");
671                     encodedRecords = payload;
672                 }
673                 marsClient.sendRequest(req);
674                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!InsertRecordsRep();
675             }
676         }
677     }
678 
679     class ClientDeleteValues(MarsClientT) : SynOp!MarsClientT {
680 
681         override
682         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
683         {
684             if( sst.countRowsToDelete > 0 ){
685                 auto payload = sst.packRowsToDelete();
686                 auto req = DeleteRecordsReq(); with(req){
687                     tableIndex = sst.index;
688                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
689                     encodedRecords = payload;
690                 }
691                 marsClient.sendRequest(req);
692                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!DeleteRecordsRep();
693             }
694         }
695         override
696         void execute(
697             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
698         ){
699             if( sst.countRowsToDelete > 0 ){
700                 auto payload = sst.packRowsToDelete();
701                 auto req = DeleteRecordsReq(); with(req){
702                     tableIndex = sst.index;
703                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
704                     encodedRecords = payload;
705                 }
706                 marsClient.sendRequest(req);
707                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!DeleteRecordsRep();
708             }
709         }
710     }
711 
712     class ClientUpdateValues(MarsClientT) : SynOp!MarsClientT {
713 
714         override
715         void execute(MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
716         {
717             import mars.msg : UpdateValuesRequest;
718             import std.conv :to;
719 
720             if( sst.countRowsToUpdate > 0 ){
721                 auto payload = sst.packRowsToUpdate();
722                 auto req = UpdateValuesRequest();
723                 req.statementIndex = indexStatementFor(sst.index, "update").to!int;
724                 req.bytes = payload;
725                 marsclient.sendRequest(req);
726             }
727         }
728 
729         override void execute(Database db, MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst,
730                 BaseServerSideTable!MarsClientT sst)
731         {
732             import mars.msg : UpdateValuesRequest;
733             import std.conv :to;
734 
735             if( sst.countRowsToUpdate > 0 ){
736                 auto payload = sst.packRowsToUpdate();
737                 auto req = UpdateValuesRequest();
738                 req.statementIndex = indexStatementFor(sst.index, "update").to!int;
739                 req.bytes = payload;
740                 marsclient.sendRequest(req);
741             }
742 
743         }
744     }
745 
746     class ServerUpdateValues(MarsClientT) : SynOp!MarsClientT {
747         override void execute(Database db, MarsClientT marsClient, ClientSideTable* cst,
748                 BaseServerSideTable!MarsClientT sst){}
749     }
750 
751     class UpdateClientRecords(MarsClientT) : SynOp!MarsClientT {
752         override
753         void execute(Database db, MarsClientT marsClient, ClientSideTable!MarsClientT* cst, BaseServerSideTable!MarsClientT sst)
754         {
755             import mars.msg : UpdateRecordsReq;
756 
757             if( sst.countRowsToUpdate > 0 ){
758                 auto req = UpdateRecordsReq();
759                 req.tableIndex = sst.index;
760                 req.encodedRecords = sst.packRowsToUpdate();
761                 marsClient.sendRequest(req);
762             }
763         }
764     }
765 }
766 
// Client stand-in for the unit tests: it accepts any request and discards it.
// NOTE(review): the sync operations of this module also use 'isConnected' and 'receiveReply' on
// the client, and the mock doesn't provide them.
version(unittest)
{
    struct MarsClientMock { void sendRequest(R)(R r){} }
}
// The body of this test is commented out, so nothing is executed.
unittest
{
    /+
    import std.range : zip;

    
    auto t1 = immutable(Table)("t1", [Col("c1", Type.integer, false), Col("c2", Type.text, false)], [0], []);
    auto sst = new ServerSideTable!(MarsClientMock, t1);
    zip([1, 2, 3], ["a", "b", "c"]).each!( f => sst.loadFixture(sst.ColumnsStruct(f.expand)) );
    
    auto cst = sst.createClientSideTable();
    // ... the simplest strategy is to synchronise right away ALL the content on the client side ...
    assert( cst.strategy == Strategy.easilySyncAll );
    // ... and at this point, at least a command to import all the data must be queued....
    assert( cast(ClientImportValues!MarsClientMock)(sst.ops[$-1]) !is null );
    // ... that, once executed, takes care of the socket, and updates the client and server side instances.
    auto op = sst.ops[$-1];
    op.execute(MarsClientMock(), cst, sst);

    // ... one of the values can be changed with an update, in this case the primary key is the column c1
    sst.updateRow(sst.KeysStruct(2), sst.ColumnsStruct(2, "z"));
    assert( sst.fixtures[sst.KeysStruct(2)] == sst.ColumnsStruct(2, "z") );
    +/
}
795 /+
796 unittest
797 {
798     version(unittest_starwars){
799         import mars.starwars;
800         enum schema = starwarsSchema();
801 
802         auto people = new ServerSideTable!(MarsClientMock, schema.tables[0]);
803         auto scores = new ServerSideTable!(MarsClientMock, schema.tables[3]);
804         auto databaseService = DatabaseService("127.0.0.1", 5432, "starwars");
805         AuthoriseError err;
806         auto db = databaseService.connect("jedi", "force", err);
807         db.executeUnsafe("begin transaction");
808         
809         auto rows = people.selectRows(db);
810         assert( rows[0] == luke, rows[0].to!string );
811 
812         auto paolo = Person("Paolo", "male", [0x00, 0x01, 0x02, 0x03, 0x04], 1.80);
813         InsertError ierr;
814         auto inserted = people.insertRecord(db, paolo, ierr);
815         assert(inserted == paolo);
816         
817 
818         //import std.stdio;
819         //foreach(row; rows) writeln("---->>>>>", row);
820         //assert(false);
821     }
822 }
823 +/
824