1 
2 
/**
  Handles the synchronization between the server and the tables associated with each client.
  */
8 module mars.sync;
9 
/// Informal sketch of the data layout used in this module: a server side table keeps, for every
/// client id, a client side table with its list of sync operations.
enum string d = "\nBaseServerSideTable\n    clientSideTables[clientId] --- SyncOps[]\n";
15 
16 import std.algorithm;
17 import std.conv;
18 import std.datetime;
19 import std.format;
20 import std.meta;
21 import std.typecons;
22 import std.variant;
23 import std.base64;
24 
25 import std.experimental.logger;
26 
27 import msgpack;
28 
29 import mars.defs;
30 import mars.pgsql;
31 import mars.msg;
32 import mars.server : indexStatementFor;
33 version(unittest) import mars.starwars;
34 
35 alias Bytes = immutable(ubyte)[];
36 
/**
Base class of every server side table: the part that does not depend on the column layout.

A server side table is instantiated only once per marsServer, and the instances are stored into the
'tables' structure of the marsServer.

The instantiation usually is inside the application code: `InstantiateTables!(ctTables)(marsServer, [], [], [], [])`
*/
class BaseServerSideTable(ClientT)
{
    alias ClientType = ClientT;

    /// Params: definition = description of the table served by this instance.
    this(immutable(Table) definition){
        this.definition = definition;
    }

    /**
    Creates the client side table for a newly connected client and registers it under `clientid`.

    When the table definition asks for cached rows, the new client side table starts with a pending
    `ClientImportValues` operation; otherwise its operation queue starts empty.

    Returns: a pointer to the newly created client side table.
    */
    auto createClientSideTable(string clientid){
        auto cst = new ClientSideTable!ClientT();
        cst.strategy = definition.cacheRows? Strategy.easilySyncAll : Strategy.easilySyncNone;
        final switch(cst.strategy) with(Strategy) {
            case easilySyncAll:
                cst.ops ~= new ClientImportValues!ClientT();
                break;
            case easilySyncNone:
                break;
        }
        // new client, new client side table
        assert( (clientid in clientSideTables) is null, format("cliendid:%s, clientSideTables:%s",
                    clientid, clientSideTables.keys() ) );
        clientSideTables[clientid] = cst;

        return cst;
    }

    /// Forgets the client side table registered for `clientid` (which must exist).
    auto wipeClientSideTable(string clientid){
        assert( (clientid in clientSideTables) !is null, clientid );
        clientSideTables.remove(clientid);
    }

    /**
    Executes a sql select statement, and returns a vibe json array with the records as json arrays.

    Supported column types are int, short, long, float, double, string, bool, Date (ISO extended
    string) and ubyte[] (Base64 string). A null value or any other type is a programming error and
    trips an assertion.

    If the query throws an `Exception` the error is logged, `state` is set to
    `RequestState.rejectedAsPGSqlError` and the rows collected so far are returned.
    */
    auto selectAsJson(Database db, string sqlSelect, Variant[string] parameters, ref RequestState state) in {
        assert(db !is null);
    } body {
        import std.datetime : Date;
        import vibe.data.json;

        Json jsonRecords = Json.emptyArray;
        try {
            auto resultSet = db.executeQueryUnsafe(sqlSelect, parameters);
            scope(exit) resultSet.close();

            foreach(variantRow; resultSet){
                Json jsonRow = Json.emptyArray;
                foreach(i, variantField; variantRow){
                    if(variantField.type == typeid(int)){ jsonRow ~= variantField.get!int; }
                    else if(variantField.type == typeid(short)){ jsonRow ~= variantField.get!short; }
                    else if(variantField.type == typeid(float)){ jsonRow ~= variantField.get!float; }
                    else if(variantField.type == typeid(double)){ jsonRow ~= variantField.get!double; }
                    else if(variantField.type == typeid(long)){ jsonRow ~= variantField.get!long; }
                    else if(variantField.type == typeid(string)){ jsonRow ~= variantField.get!string; }
                    else if(variantField.type == typeid(bool)){ jsonRow ~= variantField.get!bool; }
                    else if(variantField.type == typeid(ubyte[])){
                        // binary data travels as a Base64 string
                        jsonRow ~= cast(string)Base64.encode(variantField.get!(ubyte[]));
                    }
                    else if( variantField.type == typeid(Date) ){
                        jsonRow ~= variantField.get!Date().toISOExtString(); // '2017-11-29'
                    }
                    else if( variantField.type is typeid(null) ){
                        assert(false, "Returning null as a value is not supported: use coalesce(value, something) in the query to strip it out");
                    }
                    else {
                        // report through the logger, like every other failure in this module
                        errorf("mars - selectAsJson: unsupported column type %s", variantField.type.toString());
                        assert(false);
                    }
                }
                // ... if I apply directly the '~=' with the row, the arrays are flattened: append a
                // placeholder and then overwrite it with the row.
                jsonRecords ~= 0; jsonRecords[jsonRecords.length-1] = jsonRow;
            }
        }
        catch(Exception e){
            error("mars - S <-- ... - exception during query unsafe");
            error(e.toString());
            state = RequestState.rejectedAsPGSqlError;
        }
        return jsonRecords;
    }
    version(unittest_starwars){
        unittest {
            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            RequestState state;
            auto json = table.selectAsJson(db, "select * from people", null, state);
            //import std.stdio; writeln(json.toPrettyString());
            assert(json[0][0] == "Luke");
        }
        unittest {
            import vibe.data.json : Json;

            AuthoriseError err; auto db = DatabaseService("127.0.0.1", 5432, "starwars").connect("jedi", "force", err);
            auto table = new WhiteHole!BaseServerSideTable(Table());
            RequestState state;
            auto json = table.selectAsJson(db, "select * from starships", null, state);
            assert(json[0][0] == "NanShip");
            // here we have a BIG problem: NaN is not encodable in Json! The value is 'null'!
            // assert(json[0][1].get!float().isNaN);
        }
    }

    // ---- interface needed to handle the records in a generic way, implemented by ServerSideTable ----

    /// msgpack-encoded rows, at most `limit` of them starting from `offset`, without a database
    abstract Bytes packRows(size_t offset = 0, size_t limit = long.max);
    /// msgpack-encoded rows, at most `limit` of them starting from `offset`
    abstract Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max);
    //abstract size_t count() const;
    /// total number of rows of the table
    abstract size_t count(Database) const;
    /// number of rows recorded as inserted / updated / deleted and not yet reset
    abstract size_t countRowsToInsert() const;
    abstract size_t countRowsToUpdate() const; /// ditto
    abstract size_t countRowsToDelete() const; /// ditto
    /// unique index of the table
    abstract size_t index() const;
    /// msgpack-encoded rows recorded as inserted / updated / deleted
    abstract Bytes packRowsToInsert();
    abstract Bytes packRowsToUpdate(); /// ditto
    abstract Bytes packRowsToDelete(); /// ditto

    /// insert / update / delete of one msgpack-encoded record requested by a client
    abstract Bytes[2] insertRecord(Database, immutable(ubyte)[], ref InsertError, string, string);
    abstract void updateRecord(Database, Bytes, immutable(ubyte)[], ref RequestState, string); /// ditto
    abstract Bytes    deleteRecord(Database, immutable(ubyte)[], ref DeleteError, string, string); /// ditto

    /// forgets the recorded inserted / updated / deleted rows
    abstract void unsafeReset();

    immutable Table definition;

    /// Every server table has a collection of the linked client side tables. The key element is the identifier of
    /// the client, so that the collection can be kept clean when a client connect/disconnects.
    public ClientSideTable!(ClientT)*[string] clientSideTables;
}
171 
/**
Server side table for one compile-time table definition.

Rows live in the database when `table.durable` is set, otherwise in the in-memory `fixtures` map.
Every change is also recorded in `toInsert` / `toUpdate` / `toDelete`: that is what the sync
operations pack and send to the clients, until `unsafeReset` clears it.
*/
class ServerSideTable(ClientT, immutable(Table) table) : BaseServerSideTable!ClientT
{
    enum Definition = table;
    enum Columns = table.columns;

    alias ColumnsType = asD!Columns; /// an AliasSeq of the D types for the table columns...
    alias ColumnsStruct = asStruct!table;
    alias KeysStruct = asPkStruct!table;

    static if(table.decorateRows){
        /// element type of `toInsert` and `toUpdate`: the row plus the mars_who/what/when fields
        alias PendingRecord = asSyncStruct!table;
    }
    else {
        /// element type of `toInsert` and `toUpdate`: the plain row
        alias PendingRecord = asStruct!table;
    }

    this() { super(table); }

    // interface needed to handle the records in a generic way ...

    /// returns the total number of records we are 'talking on' (filters? query?)
    /// A "permission denied" answer from PostgreSQL (code 42501) counts as an empty table; every
    /// other server error is logged and rethrown.
    override size_t count(Database db) const {
        import ddb.postgres : ServerErrorException;

        static if( table.durable ){
            try {
                return db.executeScalarUnsafe!size_t("select count(*) from %s".format(table.name));
            }
            catch(ServerErrorException e){
                switch(e.code){
                    case "42501": // permission denied for relation "relation"
                        infof("Permission denied:%s", e.toString());
                        return 0;
                    default:
                        warningf("mars - x ... x - Unhandled PostgreSQL exception during 'count'");
                        info(e.toString());
                        throw e;
                }
            }
        }
        else {
            return fixtures.length;
        }
    }
    //static if( ! table.durable ){ // XXX
        override size_t countRowsToInsert() const { return toInsert.length; }
        override size_t countRowsToUpdate() const { return toUpdate.length; }
        override size_t countRowsToDelete() const { return toDelete.length; }
    //}

    /// return the unique index identifier for this table, that's coming from the table definition in the app.d
    override size_t index() const { return Definition.index; }

    /// returns 'limit' fixtures starting from 'offset'; an offset past the end gives an empty slice.
    deprecated auto selectRows(size_t offset = 0, size_t limit = long.max) const  {
        return sliceFixtures(offset, limit);
    }

    /// returns 'limit' rows starting from 'offset'.
    /// Durable tables are read from the database (and decorated as "imported" when the table
    /// decorates its rows); the others are read from the fixtures.
    auto selectRows(Database db, size_t offset = 0, size_t limit = long.max) const {
        static if(table.durable){
            auto resultSet = db.executeQueryUnsafe!(asStruct!table)("select * from %s limit %d offset %d".format(
                table.name, limit, offset)
            );
            // close the result set even when the iteration below throws
            scope(exit) resultSet.close();

            PendingRecord[] rows;
            foreach(vr; resultSet){
                asStruct!table v = vr;
                rows ~= decorate(v, "automation@server", "imported");
            }
            return rows;
        }
        else {
            return sliceFixtures(offset, limit);
        }
    }

    /// insert a new row in the server table, turning clients table out of sync
    deprecated void insertRow(ColumnsStruct fixture){
        KeysStruct keys = pkValues!(table)(fixture);
        fixtures[keys] = fixture;
        toInsert[keys] = decorate(fixture, "automation@server", "inserted");
        enqueueForAll!(ClientInsertValues!ClientT)();
    }

    /// insert a new row in the server table, turning clients table out of sync
    /// Returns: the record as inserted; `err` tells whether the insertion took place.
    ColumnsStruct insertRecord(Database db, ColumnsStruct record, ref InsertError err, string username, string clientId){
        static if(table.durable){
            auto inserted = db.executeInsert!(table, ColumnsStruct)(record, err);
        } else {
            fixtures[pkValues!table(record)] = record;
            auto inserted = record;
            err = InsertError.inserted;
        }
        if( err == InsertError.inserted ){
            toInsert[pkValues!table(inserted)] = decorate(inserted, username ~ "@" ~ clientId, "inserted");
            // ... don't propagate if not cached, or we are triggering a loot of refresh
            //     stick with manual refresh done on client.
            if(table.cacheRows){
                enqueueForOthers!(ClientInsertValues!ClientT)(clientId);
            }
        }
        return inserted;
    }

    /// msgpack flavour of `insertRecord`.
    /// Returns: the packed inserted record and the packed primary key values of the record as
    /// sent by the client; two empty arrays when `data` cannot be decoded.
    override Bytes[2]
    insertRecord(Database db, Bytes data, ref InsertError err, string username, string clientId){
        import  msgpack : pack, unpack, MessagePackException;
        ColumnsStruct record;
        try {
            record = unpack!(ColumnsStruct, true)(data);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name);
            errorf(exc.toString);
            err = InsertError.unknownError;
            Bytes[2] nothing;
            return nothing;
        }
        ColumnsStruct inserted = insertRecord(db, record, err, username, clientId);
        return [
            inserted.pack!(true).idup,
            record.pkParamValues!table().pack!(true).idup // clientKeys
        ];
    }

    /// msgpack flavour of `updateRecord`: a decoding failure sets `state` to
    /// `RequestState.rejectedAsDecodingFailed` and nothing else happens.
    override void updateRecord(Database db, Bytes encodedKeys, Bytes encodedRecord, ref RequestState state, string clientId){
        asPkStruct!table keys;
        ColumnsStruct record;
        try {
            keys = unpack!(asPkStruct!table, true)(encodedKeys);
            record = unpack!(ColumnsStruct, true)(encodedRecord);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack keys for record to update '%s': maybe a wrong type in js", table.name);
            errorf(exc.toString);
            state = RequestState.rejectedAsDecodingFailed;
            return;
        }
        updateRecord(db, keys, record, state, clientId);
    }

    /// update a row in the server table; when `state` is `executed` the update is recorded and,
    /// for cached tables, the other clients are scheduled for an update.
    void updateRecord(Database db, asPkStruct!table keys, ColumnsStruct record, ref RequestState state, string clientId){
        static if(table.durable){
            db.executeUpdate!(table, asPkStruct!table, ColumnsStruct)(keys, record, state);
        }
        else {
            // keep the in-memory table aligned, as insertRecord and deleteRecord do
            if( state == RequestState.executed ) fixtures[keys] = record;
        }
        if( state == RequestState.executed ){
            // NOTE(review): the author is still a placeholder, the user name is not passed to this method
            toUpdate[keys] = decorate(record, /+username+/"qualeutente" ~ "@" ~ /+clientid+/ "qualeclient", "updated");
            if(table.cacheRows){
                enqueueForOthers!(UpdateClientRecords!ClientT)(clientId);
            }
        }
    }

    /// msgpack flavour of `deleteRecord`.
    /// Returns: an empty array when the record was deleted, `data` itself otherwise.
    override Bytes deleteRecord(Database db, Bytes data, ref DeleteError err, string username, string clientid){
        import msgpack : pack, unpack, MessagePackException;
        asPkParamStruct!table keys;
        try {
            keys = unpack!(asPkParamStruct!table, true)(data);
        }
        catch(MessagePackException exc){
            errorf("mars - failed to unpack keys for record to delete '%s': maybe a wrong type in js", table.name);
            errorf(exc.toString);
            err = DeleteError.unknownError;
            return data;
        }
        deleteRecord(db, keys, err, username, clientid);
        if( err != DeleteError.deleted ) return data;
        return [];
    }

    /// delete a row of the server table; on success the deletion is recorded and the other
    /// clients are scheduled for a delete.
    asPkParamStruct!table
    deleteRecord(Database db, asPkParamStruct!table keys, ref DeleteError err, string username, string clientid){
        KeysStruct k;
        assignFields(k, keys);
        static if(table.durable){
            db.executeDelete!(table, asPkParamStruct!table)(keys, err);
        }
        else {
            fixtures.remove(k);
            err = DeleteError.deleted;
        }
        if( err == DeleteError.deleted ){
            static if(table.decorateRows){
                toDelete[k] = Sync(username ~ "@" ~ clientid, "deleted", Clock.currTime.toString());
            }
            else {
                toDelete[k] = 0;
            }
            enqueueForOthers!(ClientDeleteValues!ClientT)(clientid);
        }
        return keys;
    }

    /// update row in the server table, turning the client tables out of sync
    deprecated void updateRow(KeysStruct keys, ColumnsStruct record){
        trackUpdate(keys, decorate(record, "who@where", "updated"));
        fixtures[keys] = record;
        enqueueForAll!(ClientUpdateValues!ClientT)();
    }

    /// update row in the server table, turning the client tables out of sync
    /// For durable tables nothing is recorded, and no client is notified, when the database
    /// does not execute the update.
    void updateRow(Database db, KeysStruct keys, ColumnsStruct record){
        static if( table.durable ){
            auto state = RequestState.executed;
            db.executeUpdate!(table, KeysStruct, ColumnsStruct)(keys, record, state);
            if( state != RequestState.executed ){
                warningf("mars - update of a row of '%s' not executed (%s): clients not notified", table.name, state);
                return;
            }
        }
        else {
            fixtures[keys] = record;
        }
        trackUpdate(keys, decorate(record, "who@where", "updated"));
        enqueueForAll!(ClientUpdateValues!ClientT)();
    }

    /// returns the packet selected rows
    override Bytes packRows(size_t offset = 0, size_t limit = long.max) const {
        import msgpack : pack;
        return pack!(true)(selectRows(null, offset, limit)).idup;
    }
    /// returns the packet selected rows
    override Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max) const {
        import msgpack : pack;
        return pack!(true)(selectRows(db, offset, limit)).idup;
    }

    /// return the packet rows to insert in the client
    override Bytes packRowsToInsert() {
        import msgpack : pack;
        auto packed = pack!(true)(toInsert.values()).idup;
        //toInsert = null; can't reset... this is called for every client
        return packed;
    }

    /// return the packet rows to delete in the client
    override Bytes packRowsToDelete() {
        import msgpack : pack;
        asSyncPkParamStruct!(table)[] whereKeys;
        foreach(key; toDelete.keys()){
            asSyncPkParamStruct!table whereKey;
            assignFields(whereKey, key);
            static if(table.decorateRows) assignCommonFields(whereKey, toDelete[key]);
            whereKeys ~= whereKey;
        }
        auto packed = pack!(true)(whereKeys).idup;
        //toDelete = null; can't reset... this is called for every client
        return packed;
    }

    /// return the packet rows to update in the client
    override Bytes packRowsToUpdate() {
        // wire format of one update: the key of the row and its new content
        static struct UpdateRecord {
            KeysStruct keys;
            PendingRecord record;
        }
        UpdateRecord[] records;
        foreach(r; toUpdate.keys){
            records ~= UpdateRecord(r, toUpdate[r]);
        }

        import msgpack : pack;
        auto packed = pack!(true)(records).idup;
        return packed;
    }

    /// stores a row in the in-memory table without recording any change for the clients
    void loadFixture(ColumnsStruct fixture){
        KeysStruct keys = pkValues!table(fixture);
        fixtures[keys] = fixture;
    }

    /// forgets the recorded inserts, updates and deletes; the fixtures are left untouched
    override void unsafeReset() {
        //fixtures = null;
        toInsert = null;
        toUpdate = null;
        toDelete = null;
    }

    //static if( ! table.durable ){
        asStruct!(table)[asPkStruct!(table)] fixtures;
        PendingRecord[asPkStruct!(table)] toInsert;
        static if(table.decorateRows){
            Sync[asPkStruct!(table)] toDelete;
        }
        else {
            int[asPkStruct!(table)] toDelete;
        }
        PendingRecord[asPkStruct!(table)] toUpdate;

        // ... record inserted client side, already patched and inserted for this client.
        //asStruct!(table)[string] notToInsert;
    //}

    private {

        /// Turns a row into what is stored in `toInsert`/`toUpdate`: for decorated tables the row
        /// plus author, kind of change and current time, otherwise the row itself.
        static PendingRecord decorate(ColumnsStruct record, string who, string what){
            static if(table.decorateRows){
                PendingRecord rec;
                assignCommonFields(rec, record);
                rec.mars_who = who;
                rec.mars_what = what;
                rec.mars_when = Clock.currTime.toString();
                return rec;
            }
            else {
                return record;
            }
        }

        /// Records an update: a row still waiting to be inserted on the clients is replaced in
        /// place, any other row goes into `toUpdate`.
        void trackUpdate(KeysStruct keys, PendingRecord rec){
            if( auto pendingInsert = keys in toInsert ){
                *pendingInsert = rec;
                assert( (keys in toUpdate) is null );
            }
            else {
                toUpdate[keys] = rec;
            }
        }

        /// At most `limit` fixtures starting from `offset`, clamped to the available ones.
        auto sliceFixtures(size_t offset, size_t limit) const {
            auto all = fixtures.values();
            immutable start = min(offset, all.length);
            immutable span = min(limit, all.length - start);
            return all[start .. start + span];
        }

        /// Appends a new `Op` to the operation queue of every client.
        void enqueueForAll(Op)(){
            foreach(cst; clientSideTables.values){
                cst.ops ~= new Op();
            }
        }

        /// Appends a new `Op` to the operation queue of every client but `originClientId`.
        void enqueueForOthers(Op)(string originClientId){
            foreach(id, cst; clientSideTables){
                if( id != originClientId ) cst.ops ~= new Op();
            }
        }
    }
}
598 
599 
/// Per-client state of a server side table: the sync strategy chosen when the client connected
/// and the queue of sync operations created for that client.
struct ClientSideTable(ClientT)
{
    /// how the table content is pushed to the client
    private Strategy strategy = Strategy.easilySyncNone;

    /// operations queued for this client
    public SynOp!ClientT[] ops;
}
607 
608 private
609 {
610     enum Strategy { easilySyncAll, easilySyncNone }
611 
612     class SynOp(MarsClientT) {
613         /// deprecated way of proceding: use the method below with a null database
614         void execute(MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT){ assert(false); }
615         abstract void execute(Database, MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT);
616     }
617 
618     /// take all the rows in the server table and send them on the client table.
619     class ClientImportValues(MarsClientT) : SynOp!MarsClientT {
620 
621         override void execute(
622             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
623         ){
624             //assert(db !is null); not true for not durable table
625 
626             // ... if the table is empty, simply do nothing ...
627             if( sst.count(db) > 0 ){
628                 auto payload = sst.packRows(db);
629 
630                 auto req = ImportRecordsReq(); with(req){
631                     tableIndex = sst.index;
632                     statementIndex = indexStatementFor(sst.index, "insert");
633                     encodedRecords = payload;
634                 }
635                 marsClient.sendRequest(req);
636                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!ImportRecordsRep();
637             }
638         }
639         
640         override
641         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
642         {
643             import mars.msg : ImportValuesRequest;
644             import std.conv : to;
645 
646             // ... if the table is empty, simply do nothing ...
647             if( sst.count(null) > 0 ){
648                 auto payload = sst.packRows();
649 
650                 auto req = ImportRecordsReq();  with(req){
651                     tableIndex =sst.index;
652                     statementIndex = indexStatementFor(sst.index, "insert");
653                     encodedRecords = payload;
654                 }
655                 marsClient.sendRequest(req);
656                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!ImportRecordsRep();
657             }
658         }
659     }
660 
661     class ClientInsertValues(MarsClientT) : SynOp!MarsClientT {
662 
663         override
664         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
665         {
666             if( sst.countRowsToInsert > 0 ){
667                 auto payload = sst.packRowsToInsert();
668                 auto req = InsertRecordsReq(); with(req){
669                     tableIndex = sst.index;
670                     statementIndex = indexStatementFor(sst.index, "insert");
671                     encodedRecords = payload;
672                 }
673                 marsClient.sendRequest(req);
674                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!InsertRecordsRep();
675             }
676         }
677         override
678         void execute(
679             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
680         ){
681             if( sst.countRowsToInsert > 0 ){
682                 auto payload = sst.packRowsToInsert();
683                 auto req = InsertRecordsReq(); with(req){
684                     tableIndex = sst.index;
685                     statementIndex = indexStatementFor(sst.index, "insert");
686                     encodedRecords = payload;
687                 }
688                 marsClient.sendRequest(req);
689                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!InsertRecordsRep();
690             }
691         }
692     }
693 
694     class ClientDeleteValues(MarsClientT) : SynOp!MarsClientT {
695 
696         override
697         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
698         {
699             if( sst.countRowsToDelete > 0 ){
700                 auto payload = sst.packRowsToDelete();
701                 auto req = DeleteRecordsReq(); with(req){
702                     tableIndex = sst.index;
703                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
704                     encodedRecords = payload;
705                 }
706                 marsClient.sendRequest(req);
707                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!DeleteRecordsRep();
708             }
709         }
710         override
711         void execute(
712             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
713         ){
714             if( sst.countRowsToDelete > 0 ){
715                 auto payload = sst.packRowsToDelete();
716                 auto req = DeleteRecordsReq(); with(req){
717                     tableIndex = sst.index;
718                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
719                     encodedRecords = payload;
720                 }
721                 marsClient.sendRequest(req);
722                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!DeleteRecordsRep();
723             }
724         }
725     }
726 
727     class ClientUpdateValues(MarsClientT) : SynOp!MarsClientT {
728 
729         override
730         void execute(MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
731         {
732             import mars.msg : UpdateValuesRequest;
733             import std.conv :to;
734 
735             if( sst.countRowsToUpdate > 0 ){
736                 auto payload = sst.packRowsToUpdate();
737                 auto req = UpdateValuesRequest();
738                 req.statementIndex = indexStatementFor(sst.index, "update").to!int;
739                 req.bytes = payload;
740                 marsclient.sendRequest(req);
741             }
742         }
743 
744         override void execute(Database db, MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst,
745                 BaseServerSideTable!MarsClientT sst)
746         {
747             import mars.msg : UpdateValuesRequest;
748             import std.conv :to;
749 
750             if( sst.countRowsToUpdate > 0 ){
751                 auto payload = sst.packRowsToUpdate();
752                 auto req = UpdateValuesRequest();
753                 req.statementIndex = indexStatementFor(sst.index, "update").to!int;
754                 req.bytes = payload;
755                 marsclient.sendRequest(req);
756             }
757 
758         }
759     }
760 
761     class ServerUpdateValues(MarsClientT) : SynOp!MarsClientT {
762         override void execute(Database db, MarsClientT marsClient, ClientSideTable* cst,
763                 BaseServerSideTable!MarsClientT sst){}
764     }
765 
766     class UpdateClientRecords(MarsClientT) : SynOp!MarsClientT {
767         override
768         void execute(Database db, MarsClientT marsClient, ClientSideTable!MarsClientT* cst, BaseServerSideTable!MarsClientT sst)
769         {
770             import mars.msg : UpdateRecordsReq;
771 
772             if( sst.countRowsToUpdate > 0 ){
773                 auto req = UpdateRecordsReq();
774                 req.tableIndex = sst.index;
775                 req.encodedRecords = sst.packRowsToUpdate();
776                 marsClient.sendRequest(req);
777             }
778         }
779     }
780 }
781 
version(unittest)
{
    /// Minimal stand-in for a mars client, offering the members the sync operations of this
    /// module call on a client: `sendRequest`, `isConnected` and `receiveReply`.
    struct MarsClientMock {
        /// accepts and discards any request
        void sendRequest(R)(R r){}

        /// the mock is never connected, so the operations never wait for a reply
        bool isConnected() const { return false; }

        /// gives back a default-initialised reply
        R receiveReply(R)(){ return R.init; }
    }
}
786 unittest
787 {
788     /+
789     import std.range : zip;
790 
791     
792     auto t1 = immutable(Table)("t1", [Col("c1", Type.integer, false), Col("c2", Type.text, false)], [0], []);
793     auto sst = new ServerSideTable!(MarsClientMock, t1);
794     zip([1, 2, 3], ["a", "b", "c"]).each!( f => sst.loadFixture(sst.ColumnsStruct(f.expand)) );
795     
796     auto cst = sst.createClientSideTable();
    // ... the simplest strategy is to immediately synchronize the WHOLE content to the client side ...
798     assert( cst.strategy == Strategy.easilySyncAll );
    // ... and at this point, at the very least, a command importing all the data must be queued ...
800     assert( cast(ClientImportValues!MarsClientMock)(sst.ops[$-1]) !is null );
    // ... which, once executed, takes care of the socket and updates the client and server side instances.
802     auto op = sst.ops[$-1];
803     op.execute(MarsClientMock(), cst, sst);
804 
    // ... one of the values can now be changed with an update; here the primary key is column c1
806     sst.updateRow(sst.KeysStruct(2), sst.ColumnsStruct(2, "z"));
807     assert( sst.fixtures[sst.KeysStruct(2)] == sst.ColumnsStruct(2, "z") );
808     +/
809 }
810 /+
811 unittest
812 {
813     version(unittest_starwars){
814         import mars.starwars;
815         enum schema = starwarsSchema();
816 
817         auto people = new ServerSideTable!(MarsClientMock, schema.tables[0]);
818         auto scores = new ServerSideTable!(MarsClientMock, schema.tables[3]);
819         auto databaseService = DatabaseService("127.0.0.1", 5432, "starwars");
820         AuthoriseError err;
821         auto db = databaseService.connect("jedi", "force", err);
822         db.executeUnsafe("begin transaction");
823         
824         auto rows = people.selectRows(db);
825         assert( rows[0] == luke, rows[0].to!string );
826 
827         auto paolo = Person("Paolo", "male", [0x00, 0x01, 0x02, 0x03, 0x04], 1.80);
828         InsertError ierr;
829         auto inserted = people.insertRecord(db, paolo, ierr);
830         assert(inserted == paolo);
831         
832 
833         //import std.stdio;
834         //foreach(row; rows) writeln("---->>>>>", row);
835         //assert(false);
836     }
837 }
838 +/
839