3 /**
4   gestiamo la parte di sincronia tra server e le varie tavole associate ai client.
7   */
8 module mars.sync;
10 enum d =
11 `
12 BaseServerSideTable
13     clientSideTables[clientId] --- SyncOps[]
14 `;
16 import std.algorithm;
17 import std.conv;
18 import std.datetime;
19 import std.format;
20 import std.meta;
21 import std.typecons;
22 import std.variant;
23 import std.base64;
25 import std.experimental.logger;
27 import msgpack;
29 import mars.defs;
30 import mars.pgsql;
31 import mars.msg;
32 import mars.server : indexStatementFor;
33 version(unittest) import mars.starwars;
35 alias Bytes = immutable(ubyte)[];
37 /**
38 A server side table is instantiated only once per marsServer, and they are stored into the 'tables'
39 structure of the marsServer.
41 The instantiation usually is inside the application code: `InstantiateTables!(ctTables)(marsServer, [], [], [], [])`
42 */
43 class BaseServerSideTable(ClientT)
44 {
45     alias ClientType = ClientT;
47     this(immutable(Table) definition){
48         this.definition = definition;
49     }
51     auto createClientSideTable(string clientid){
52         auto cst = new ClientSideTable!ClientT();
53         cst.strategy = definition.cacheRows? Strategy.easilySyncAll : Strategy.easilySyncNone;
54         final switch(cst.strategy) with(Strategy) {
55             case easilySyncAll:
56                 cst.ops ~= new ClientImportValues!ClientT();
57                 break;
58             case easilySyncNone:
59                 break;
60         }
61         // new client, new client side table
62         assert( (clientid in clientSideTables) is null, format("cliendid:%s, clientSideTables:%s",
63                     clientid, clientSideTables.keys() ) );
64         clientSideTables[clientid] = cst;
66         return cst;
67     }
69     auto wipeClientSideTable(string clientid){
70         assert( (clientid in clientSideTables) !is null, clientid );
71         clientSideTables.remove(clientid);
72     }
74     /// execute a sql select statement, and returns a vibe json array with the records as json
75     auto selectAsJson(Database db, string sqlSelect, Variant[string] parameters, ref RequestState state) in {
76         assert(db !is null);
77     } body {
78         import std.math : isNaN;
79         import std.datetime : Date;
80         import vibe.data.json;
82         Json jsonRecords = Json.emptyArray;
83         try {
84             auto resultSet = db.executeQueryUnsafe(sqlSelect, parameters);
85             scope(exit) resultSet.close();
87             foreach(variantRow; resultSet){
88                 Json jsonRow = Json.emptyArray;
89                 foreach(i, variantField; variantRow){
90                     if(variantField.type == typeid(int)){ jsonRow ~= variantField.get!int; }
91                     else if(variantField.type == typeid(short)){ jsonRow ~= variantField.get!short; }
92                     else if(variantField.type == typeid(float)){ jsonRow ~= variantField.get!float; }
93                     else if(variantField.type == typeid(long)){ jsonRow ~= variantField.get!long; }
94                     else if(variantField.type == typeid(string)){ jsonRow ~= variantField.get!string; }
95                     else if(variantField.type == typeid(bool)){ jsonRow ~= variantField.get!bool; }
96                     else if(variantField.type == typeid(ubyte[])){
97                         // ... if I apply directly the '=~', the arrays are flattened!
98                         jsonRow ~= cast(string)Base64.encode(variantField.get!(ubyte[]));
99                     }
100                     else if( variantField.type == typeid(Date) ){
101                         jsonRow ~= variantField.get!Date().toISOExtString(); // '2017-11-29'
102                     }
103                     else if( variantField.type is typeid(null) ){
104                         assert(false, "Returning null as a value is not supported: use coalesce(value, something) in the query to strip it out");
105                     }
106                     else {
107                         import std.stdio; writeln(variantField.type);
108                         assert(false);
109                     }
110                 }
111                 jsonRecords ~= 0; jsonRecords[jsonRecords.length-1] = jsonRow;
112             }
113         }
114         catch(Exception e){
115             error("mars - S <-- ... - exception during query unsafe");
116             error(e.toString());
117             state = RequestState.rejectedAsPGSqlError;
118         }
119         return jsonRecords;
120     }
121     version(unittest_starwars){
122         unittest {
123             AuthoriseError err; auto db = DatabaseService("", 5432, "starwars").connect("jedi", "force", err);
124             auto table = new WhiteHole!BaseServerSideTable(Table());
125             auto json = table.selectAsJson(db, "select * from people", null);
126             //import std.stdio; writeln(json.toPrettyString());
127             assert(json[0][0] == "Luke");
128         }
129         unittest {
130             import std.math : isNaN;
131             import vibe.data.json : Json;
133             AuthoriseError err; auto db = DatabaseService("", 5432, "starwars").connect("jedi", "force", err);
134             auto table = new WhiteHole!BaseServerSideTable(Table());
135             auto json = table.selectAsJson(db, "select * from starships", null);
136             assert(json[0][0] == "NanShip");
137             // here we have a BIG problem: NaN is not encodable in Json! The value is 'null'!
138             // assert(json[0][1].get!float().isNaN);
139         }
140     }
142     abstract Bytes packRows(size_t offset = 0, size_t limit = long.max);
143     abstract Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max);
144     //abstract size_t count() const;
145     abstract size_t count(Database) const;
146     abstract size_t countRowsToInsert() const;
147     abstract size_t countRowsToUpdate() const;
148     abstract size_t countRowsToDelete() const;
149     abstract size_t index() const;
150     abstract Bytes packRowsToInsert();
151     abstract Bytes packRowsToUpdate();
152     abstract Bytes packRowsToDelete();
154     abstract Bytes[2] insertRecord(Database, immutable(ubyte)[], ref InsertError, string, string);
155     abstract void updateRecord(Database, Bytes, immutable(ubyte)[], ref RequestState, string);
156     abstract Bytes    deleteRecord(Database, immutable(ubyte)[], ref DeleteError, string, string);
158     abstract void unsafeReset();
160     immutable Table definition; 
161     private {
163         /// Every server table has a collection of the linked client side tables. The key element is the identifier of
164         /// the client, so that the collection can be kept clean when a client connect/disconnects.
165         public ClientSideTable!(ClientT)*[string] clientSideTables;
167         //public SynOp!ClientT[] ops;
169     }
170 }
172 class ServerSideTable(ClientT, immutable(Table) table) : BaseServerSideTable!ClientT
173 {
174     enum Definition = table; 
175     enum Columns = table.columns;
177     alias ColumnsType = asD!Columns; /// an AliasSeq of the D types for the table columns...
178     alias ColumnsStruct = asStruct!table; 
179     alias KeysStruct = asPkStruct!table;
181     this() { super(table); } 
183     // interface needed to handle the records in a generic way ...
185     /// returns the total number of records we are 'talking on' (filters? query?)
186     //deprecated override size_t count() const { return fixtures.length; }
187     override size_t count(Database db) const {
188         import ddb.postgres : ServerErrorException;
190         static if( table.durable ){
191             try {
192                 return db.executeScalarUnsafe!size_t("select count(*) from %s".format(table.name));
193             }
194             catch(ServerErrorException e){
195                 switch(e.code){
196                     case "42501": // permission denied for relation "relation"
197                         infof("Permission denied:%s", e.toString());
198                         return 0;
199                     default:
200                         warningf("mars - x ... x - Unhandled PostgreSQL exception during 'count'");
201                         info(e.toString());
202                         throw e;
203                 }
204             }
205         }
206         else {
207             return fixtures.length;
208         }
209     }
210     //static if( ! table.durable ){ // XXX
211         override size_t countRowsToInsert() const { return toInsert.length; }
212         override size_t countRowsToUpdate() const { return toUpdate.length; }
213         override size_t countRowsToDelete() const { return toDelete.length; }
214     //}
216     /// return the unique index identifier for this table, that's coming from the table definition in the app.d
217     override size_t index() const { return Definition.index; }
219     /// returns 'limit' rows starting from 'offset'.
220     deprecated auto selectRows(size_t offset = 0, size_t limit = long.max) const  {
221         size_t till  = (limit + offset) > count(null) ? count(null) : (limit + offset);
222         return fixtures.values()[offset .. till];
223     }
224     /// returns 'limit' rows starting from 'offset'.
225     auto selectRows(Database db, size_t offset = 0, size_t limit = long.max) const {
226         static if(table.durable){
227             auto resultSet = db.executeQueryUnsafe!(asStruct!table)("select * from %s limit %d offset %d".format(
228                 table.name, limit, offset)
229             );
230             static if( Definition.decorateRows ){
231                 asSyncStruct!table[] rows;
232                 foreach(vr; resultSet){
233                     asStruct!table v = vr;
234                     asSyncStruct!table r;
235                     assignCommonFields!(typeof(r), typeof(v))(r, v);
236                     r.mars_who = "automation@server";
237                     r.mars_what = "imported";
238                     r.mars_when = Clock.currTime.toString(); 
239                     rows ~= r;
240                 }
241             }
242             else {
243                 asStruct!table[] rows;
244                 foreach(v; resultSet){
245                     rows ~= v;
246                 }
247             }
250             resultSet.close();
251             return rows;
252         }
253         else {
254             size_t till  = (limit + offset) > count(db) ? count(db) : (limit + offset);
255             return fixtures.values()[offset .. till];
256         }
257     }
259     /// insert a new row in the server table, turning clients table out of sync
260     deprecated void insertRow(ColumnsStruct fixture){
261         KeysStruct keys = pkValues!(table)(fixture);
262         fixtures[keys] = fixture;
263         static if(table.decorateRows){
264             asSyncStruct!table rec;
265             assignCommonFields(rec, fixture);
266             with(rec){ mars_who = "automation@server"; mars_what = "inserted"; mars_when = Clock.currTime.toString(); }
267         }
268         else auto rec = fixture;
269         toInsert[keys] = rec;
270         foreach(ref cst; clientSideTables.values){
271             cst.ops ~= new ClientInsertValues!ClientT();
272         }
273     }
275     /// insert a new row in the server table, turning clients table out of sync
276     ColumnsStruct insertRecord(Database db, ColumnsStruct record, ref InsertError err, string username, string clientId){
277         static if(table.durable){
278             auto inserted = db.executeInsert!(table, ColumnsStruct)(record, err);
279         } else {
280             fixtures[pkValues!table(record)] = record;
281             auto inserted = record;
282             err = InsertError.inserted;
283         }
284         if( err == InsertError.inserted ){
285             static if(table.decorateRows){
286                 asSyncStruct!table rec;
287                 assignCommonFields(rec, inserted);
288                 with(rec){
289                     mars_who = username ~ "@" ~ clientId; mars_what = "inserted";
290                     mars_when = Clock.currTime.toString();
291                 }
292             }
293             else {
294                 auto rec = inserted;
295             }
296             toInsert[pkValues!table(inserted)] = rec;
297             // ... don't propagate if not cached, or we are triggering a loot of refresh
298             //     stick with manual refresh done on client.
299             if(table.cacheRows){
300                 foreach(key; clientSideTables.byKey.filter!( (a) => a != clientId )){
301                     auto cst = key in clientSideTables;
302                     (*cst).ops ~= new ClientInsertValues!ClientT();
303                 }
304             }
305         }
306         return inserted;
307     }
309     override Bytes[2]
310     insertRecord(Database db, Bytes data, ref InsertError err, string username, string clientId){
311         import  msgpack : pack, unpack, MessagePackException;
312         ColumnsStruct record;
313         try {
314             record = unpack!(ColumnsStruct, true)(data);
315         }
316         catch(MessagePackException exc){
317             errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name);
318             errorf(exc.toString);
319             err = InsertError.unknownError;
320             return [[], []];
321         }
322         ColumnsStruct inserted = insertRecord(db, record, err, username, clientId);
323         return [
324             inserted.pack!(true).idup,
325             record.pkParamValues!table().pack!(true).idup // clientKeys
326         ];
327     }
329     override void updateRecord(Database db, Bytes encodedKeys, Bytes encodedRecord, ref RequestState state, string clientId){
330         asPkStruct!table keys;
331         ColumnsStruct record;
332         try { 
333             keys = unpack!(asPkStruct!table, true)(encodedKeys); 
334             record = unpack!(ColumnsStruct, true)(encodedRecord);
335         }
336         catch(MessagePackException exc){
337             errorf("mars - failed to unpack keys for record to update '%s': maybe a wrong type in js", table.name);
338             errorf(exc.toString);
339             state = RequestState.rejectedAsDecodingFailed;
340             return;
341         }
342         updateRecord(db, keys, record, state, clientId);
343     }
345     void updateRecord(Database db, asPkStruct!table keys, ColumnsStruct record, ref RequestState state, string clientId){
346         static if(table.durable){
347             db.executeUpdate!(table, asPkStruct!table, ColumnsStruct)(keys, record, state);
348         }
349         else {  }
350         if( state == RequestState.executed ){
351             static if(table.decorateRows){
352                 asSyncStruct!table rec;
353                 assignCommonFields(rec, record);
354                 with(rec){
355                     mars_who = /+username+/"qualeutente" ~ "@" ~ /+clientid+/ "qualeclient";
356                     mars_what = "updated";
357                     mars_when = Clock.currTime.toString();
358                 }
359             }
360             else {
361                 auto rec = record;
362             }
363             toUpdate[keys] = rec;
364             if(table.cacheRows){
365                 foreach(key; clientSideTables.byKey.filter!( (a) => a != clientId )){
366                     auto cst = key in clientSideTables;
367                     (*cst).ops ~= new UpdateClientRecords!ClientT();
368                 }
369             }
370         }
371     }
373     override Bytes deleteRecord(Database db, Bytes data, ref DeleteError err, string username, string clientid){
374         import msgpack : pack, unpack, MessagePackException;
375         asPkParamStruct!table keys;
376         try {
377             keys = unpack!(asPkParamStruct!table, true)(data);
378         }
379         catch(MessagePackException exc){
380             errorf("mars - failed to unpack keys for record to delete '%s': maybe a wrong type in js", table.name);
381             errorf(exc.toString);
382             err = DeleteError.unknownError;
383             return data;
384         }
385         deleteRecord(db, keys, err, username, clientid);
386         if( err != DeleteError.deleted ) return data;
387         return [];
388     }
390     asPkParamStruct!table
391     deleteRecord(Database db, asPkParamStruct!table keys, ref DeleteError err, string username, string clientid){
392         KeysStruct k;
393         assignFields(k, keys);
394         static if(table.durable){
395             db.executeDelete!(table, asPkParamStruct!table)(keys, err);
396         }
397         else {
398             fixtures.remove(k);
399             err = DeleteError.deleted;
400         }
401         if( err == DeleteError.deleted ){
402             static if(table.decorateRows){
403                 toDelete[k] = Sync(username ~ "@" ~ clientid, "deleted", Clock.currTime.toString());
404             }
405             else {
406                 toDelete[k] = 0;
407             }
408             foreach(key; clientSideTables.byKey.filter!( (a) => a != clientid )){
409                 auto cst = key in clientSideTables;
410                 (*cst).ops ~= new ClientDeleteValues!ClientT();
411             }
412         }
413         return keys;
414     }
416     /// update row in the server table, turning the client tables out of sync
417     deprecated void updateRow(KeysStruct keys, ColumnsStruct record){
418         //KeysStruct keys = pkValues!table(record);
419         auto v = keys in toInsert;
420         if( v !is null ){
421             static if(table.decorateRows){
422                 asSyncStruct!table rec;
423                 assignCommonFields(rec, record);
424                 with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
425             }
426             else {
427                 auto rec = record;
428             }
429             *v = rec;
430             assert( (keys in toUpdate) is null );
431         }
432         else {
433             auto v2 = keys in toUpdate;
434             if( v2 !is null ){
435                 static if(table.decorateRows){ assert(false); }
436                 else { *v2 = record; }
437             }
438             else {
439                 static if(table.decorateRows){ assert(false); }
440                 else { toUpdate[keys] = record; }
441             }
442         }
443         fixtures[keys] = record;
444         foreach(ref cst; clientSideTables.values){
445             cst.ops ~= new ClientUpdateValues!ClientT();
446         }
447     }
449     /// update row in the server table, turning the client tables out of sync
450     void updateRow(Database db, KeysStruct keys, ColumnsStruct record){
451         static if( table.durable ){
452             import msgpack : pack;
454             RequestState state;
455             db.executeUpdate!(table, KeysStruct, ColumnsStruct)(keys, record, state);
456             auto v = keys in toInsert;
457             if( v !is null ){
458                 static if(table.decorateRows){
459                     asSyncStruct!table rec;
460                     assignCommonFields(rec, record);
461                     with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
462                 }
463                 else {
464                     auto rec = record;
465                 }
466                 *v = rec;
467                 assert( (keys in toUpdate) is null );
468             }
469             else {
470                 auto v2 = keys in toUpdate;
471                 if( v2 !is null ){
472                     static if(table.decorateRows){ assert(false); }
473                     else { *v2 = record; }
474                 }
475                 else {
476                     static if(table.decorateRows){ assert(false); }
477                     else { toUpdate[keys] = record; }
478                 }
479             }
480         }
481         else {
482             //KeysStruct keys = pkValues!table(record);
483             auto v = keys in toInsert;
484             if( v !is null ){
485                 static if(table.decorateRows){
486                     asSyncStruct!table rec;
487                     assignCommonFields(rec, record);
488                     with(rec){ mars_who = "who@where"; mars_what = "updated"; mars_when = Clock.currTime.toString(); }
489                 }
490                 else {
491                     auto rec = record;
492                 }
493                 *v = record;
494                 assert( (keys in toUpdate) is null );
495             }
496             else {
497                 v = keys in toUpdate;
498                 if( v !is null ){
499                     *v = record;
500                 }
501                 else {
502                     toUpdate[keys] = record;
503                 }
504             }
505             fixtures[keys] = record;
506         }
507         foreach(ref cst; clientSideTables.values){
508             cst.ops ~= new ClientUpdateValues!ClientT();
509         }
510     }
512     /// returns the packet selected rows
513     override Bytes packRows(size_t offset = 0, size_t limit = long.max) const {
514         import msgpack : pack;
515         return pack!(true)(selectRows(null, offset, limit)).idup;
516     }
517     /// returns the packet selected rows
518     override Bytes packRows(Database db, size_t offset = 0, size_t limit = long.max) const {
519         import msgpack : pack;
520         return pack!(true)(selectRows(db, offset, limit)).idup;
521     }
523     /// return the packet rows to insert in the client
524     override Bytes packRowsToInsert() {
525         import msgpack : pack;
526         auto packed = pack!(true)(toInsert.values()).idup;
527         //toInsert = null; can't reset... this is called for every client
528         return packed;
529     }
531     /// return the packet rows to delete in the client
532     override Bytes packRowsToDelete() {
533         import msgpack : pack;
534         asSyncPkParamStruct!(table)[] whereKeys;
535         foreach(key; toDelete.keys()){
536             import std.stdio; writeln("+++++++++++++>>>>", key, ":", toDelete[key]);
537             asSyncPkParamStruct!table whereKey;
538             assignFields(whereKey, key);
539             static if(table.decorateRows) assignCommonFields(whereKey, toDelete[key]);
540             whereKeys ~= whereKey;
541             writeln("<<<=== M<<<<<<<<<<", whereKey);
542         }
543         auto packed = pack!(true)(whereKeys).idup;
544         //toInsert = null; can't reset... this is called for every client
545         return packed;
546     }
548     /// return the packet rows to update in the client
549     override Bytes packRowsToUpdate() {
550         static struct UpdateRecord {
551             KeysStruct keys;
552             static if(table.decorateRows){
553                 asSyncStruct!table record;
554             }
555             else {
556                 asStruct!table record;
557             }
558         }
559         UpdateRecord[] records;
560         foreach(r; toUpdate.keys){
561             records ~= UpdateRecord(r, toUpdate[r]);
562         }
564         import msgpack : pack;
565         auto packed = pack!(true)(records).idup;
566         return packed;
567     }
569     void loadFixture(ColumnsStruct fixture){
570         KeysStruct keys = pkValues!table(fixture);
571         fixtures[keys] = fixture;
572     }
574     override void unsafeReset() {
575         //fixtures = null;
576         toInsert = null;
577         toUpdate = null;
578         toDelete = null;
579     }
581     //static if( ! table.durable ){
582         asStruct!(table)[asPkStruct!(table)] fixtures;
583         static if(table.decorateRows){
584             asSyncStruct!(table)[asPkStruct!(table)] toInsert;
585             Sync[asPkStruct!(table)] toDelete;
586             asSyncStruct!(table)[asPkStruct!(table)] toUpdate;
587         }
588         else {
589             asStruct!(table)[asPkStruct!(table)] toInsert;
590             int[asPkStruct!(table)] toDelete;
591             asStruct!(table)[asPkStruct!(table)] toUpdate;
592         }
594         // ... record inserted client side, already patched and inserted for this client.
595         //asStruct!(table)[string] notToInsert;
596     //}
597 }
600 struct ClientSideTable(ClientT)
601 {
602     private {
603         Strategy strategy = Strategy.easilySyncNone;
604         public SynOp!ClientT[] ops;
605     }
606 }
608 private
609 {
610     enum Strategy { easilySyncAll, easilySyncNone }
612     class SynOp(MarsClientT) {
613         /// deprecated way of proceding: use the method below with a null database
614         void execute(MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT){ assert(false); }
615         abstract void execute(Database, MarsClientT, ClientSideTable!(MarsClientT)*, BaseServerSideTable!MarsClientT);
616     }
618     /// take all the rows in the server table and send them on the client table.
619     class ClientImportValues(MarsClientT) : SynOp!MarsClientT {
621         override void execute(
622             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
623         ){
624             //assert(db !is null); not true for not durable table
626             // ... if the table is empty, simply do nothing ...
627             if( sst.count(db) > 0 ){
628                 auto payload = sst.packRows(db);
630                 auto req = ImportRecordsReq(); with(req){
631                     tableIndex = sst.index;
632                     statementIndex = indexStatementFor(sst.index, "insert");
633                     encodedRecords = payload;
634                 }
635                 marsClient.sendRequest(req);
636                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!ImportRecordsRep();
637             }
638         }
640         override
641         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
642         {
643             import mars.msg : ImportValuesRequest;
644             import std.conv : to;
646             // ... if the table is empty, simply do nothing ...
647             if( sst.count(null) > 0 ){
648                 auto payload = sst.packRows();
650                 auto req = ImportRecordsReq();  with(req){
651                     tableIndex =sst.index;
652                     statementIndex = indexStatementFor(sst.index, "insert");
653                     encodedRecords = payload;
654                 }
655                 marsClient.sendRequest(req);
656                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!ImportRecordsRep();
657             }
658         }
659     }
661     class ClientInsertValues(MarsClientT) : SynOp!MarsClientT {
663         override
664         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
665         {
666             if( sst.countRowsToInsert > 0 ){
667                 auto payload = sst.packRowsToInsert();
668                 auto req = InsertRecordsReq(); with(req){
669                     tableIndex = sst.index;
670                     statementIndex = indexStatementFor(sst.index, "insert");
671                     encodedRecords = payload;
672                 }
673                 marsClient.sendRequest(req);
674                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!InsertRecordsRep();
675             }
676         }
677         override
678         void execute(
679             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
680         ){
681             if( sst.countRowsToInsert > 0 ){
682                 auto payload = sst.packRowsToInsert();
683                 auto req = InsertRecordsReq(); with(req){
684                     tableIndex = sst.index;
685                     statementIndex = indexStatementFor(sst.index, "insert");
686                     encodedRecords = payload;
687                 }
688                 marsClient.sendRequest(req);
689                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!InsertRecordsRep();
690             }
691         }
692     }
694     class ClientDeleteValues(MarsClientT) : SynOp!MarsClientT {
696         override
697         void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
698         {
699             if( sst.countRowsToDelete > 0 ){
700                 auto payload = sst.packRowsToDelete();
701                 auto req = DeleteRecordsReq(); with(req){
702                     tableIndex = sst.index;
703                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
704                     encodedRecords = payload;
705                 }
706                 marsClient.sendRequest(req);
707                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!DeleteRecordsRep();
708             }
709         }
710         override
711         void execute(
712             Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst
713         ){
714             if( sst.countRowsToDelete > 0 ){
715                 auto payload = sst.packRowsToDelete();
716                 auto req = DeleteRecordsReq(); with(req){
717                     tableIndex = sst.index;
718                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
719                     encodedRecords = payload;
720                 }
721                 marsClient.sendRequest(req);
722                 if(marsClient.isConnected) auto rep = marsClient.receiveReply!DeleteRecordsRep();
723             }
724         }
725     }
727     class ClientUpdateValues(MarsClientT) : SynOp!MarsClientT {
729         override
730         void execute(MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
731         {
732             import mars.msg : UpdateValuesRequest;
733             import std.conv :to;
735             if( sst.countRowsToUpdate > 0 ){
736                 auto payload = sst.packRowsToUpdate();
737                 auto req = UpdateValuesRequest();
738                 req.statementIndex = indexStatementFor(sst.index, "update").to!int;
739                 req.bytes = payload;
740                 marsclient.sendRequest(req);
741             }
742         }
744         override void execute(Database db, MarsClientT marsclient, ClientSideTable!(MarsClientT)* cst,
745                 BaseServerSideTable!MarsClientT sst)
746         {
747             import mars.msg : UpdateValuesRequest;
748             import std.conv :to;
750             if( sst.countRowsToUpdate > 0 ){
751                 auto payload = sst.packRowsToUpdate();
752                 auto req = UpdateValuesRequest();
753                 req.statementIndex = indexStatementFor(sst.index, "update").to!int;
754                 req.bytes = payload;
755                 marsclient.sendRequest(req);
756             }
758         }
759     }
761     class ServerUpdateValues(MarsClientT) : SynOp!MarsClientT {
762         override void execute(Database db, MarsClientT marsClient, ClientSideTable* cst,
763                 BaseServerSideTable!MarsClientT sst){}
764     }
766     class UpdateClientRecords(MarsClientT) : SynOp!MarsClientT {
767         override
768         void execute(Database db, MarsClientT marsClient, ClientSideTable!MarsClientT* cst, BaseServerSideTable!MarsClientT sst)
769         {
770             import mars.msg : UpdateRecordsReq;
772             if( sst.countRowsToUpdate > 0 ){
773                 auto req = UpdateRecordsReq();
774                 req.tableIndex = sst.index;
775                 req.encodedRecords = sst.packRowsToUpdate();
776                 marsClient.sendRequest(req);
777             }
778         }
779     }
780 }
782 version(unittest)
783 {
784     struct MarsClientMock { void sendRequest(R)(R r){} }
785 }
786 unittest
787 {
788     /+
789     import std.range : zip;
792     auto t1 = immutable(Table)("t1", [Col("c1", Type.integer, false), Col("c2", Type.text, false)], [0], []);
793     auto sst = new ServerSideTable!(MarsClientMock, t1);
794     zip([1, 2, 3], ["a", "b", "c"]).each!( f => sst.loadFixture(sst.ColumnsStruct(f.expand)) );
796     auto cst = sst.createClientSideTable();
797     // ... la strategia più semplice è syncronizzare subito TUTTO il contenuto nella client side ...
798     assert( cst.strategy == Strategy.easilySyncAll );
799     // ... e a questo punto, come minimo deve partire un comando di import di tutti i dati....
800     assert( cast(ClientImportValues!MarsClientMock)(sst.ops[$-1]) !is null );
801     // ... che eseguito si occupa di gestire il socket, e aggiornare client e server side instances.
802     auto op = sst.ops[$-1];
803     op.execute(MarsClientMock(), cst, sst);
805     // ...posso aggiornare uno dei valori con update, in questo caso la primary key è la colonna c1
806     sst.updateRow(sst.KeysStruct(2), sst.ColumnsStruct(2, "z"));
807     assert( sst.fixtures[sst.KeysStruct(2)] == sst.ColumnsStruct(2, "z") );
808     +/
809 }
810 /+
811 unittest
812 {
813     version(unittest_starwars){
814         import mars.starwars;
815         enum schema = starwarsSchema();
817         auto people = new ServerSideTable!(MarsClientMock, schema.tables[0]);
818         auto scores = new ServerSideTable!(MarsClientMock, schema.tables[3]);
819         auto databaseService = DatabaseService("", 5432, "starwars");
820         AuthoriseError err;
821         auto db = databaseService.connect("jedi", "force", err);
822         db.executeUnsafe("begin transaction");
824         auto rows = people.selectRows(db);
825         assert( rows[0] == luke, rows[0].to!string );
827         auto paolo = Person("Paolo", "male", [0x00, 0x01, 0x02, 0x03, 0x04], 1.80);
828         InsertError ierr;
829         auto inserted = people.insertRecord(db, paolo, ierr);
830         assert(inserted == paolo);
833         //import std.stdio;
834         //foreach(row; rows) writeln("---->>>>>", row);
835         //assert(false);
836     }
837 }
838 +/