1 
2 
3 /**
4   gestiamo la parte di sincronia tra server e le varie tavole associate ai client.
5   */
6 
7 module mars.sync;
8 
9 import std.algorithm;
10 import std.meta;
11 import std.typecons;
12 import std.experimental.logger;
13 import std.format;
14 import std.conv;
15 
16 import mars.defs;
17 import mars.pgsql;
18 import mars.msg;
19 import mars.server;
20 
21 class BaseServerSideTable(ClientT)
22 {
23     alias ClientType = ClientT;
24 
25     this(immutable(Table) definition){
26         this.definition = definition;
27     }
28 
29     auto createClientSideTable(string clientid){
30         auto cst = new ClientSideTable!ClientT();
31         final switch(cst.strategy) with(Strategy) {
32             case easilySyncAll:
33                 cst.ops ~= new ClientImportValues!ClientT();
34         }
35         // new client, new client side table
36         assert( (clientid in clientSideTables) is null );
37         clientSideTables[clientid] = cst;
38 
39         return cst;
40     }
41 
42     abstract immutable(ubyte)[] packRows(size_t offset = 0, size_t limit = long.max);
43     abstract immutable(ubyte)[] packRows(Database db, size_t offset = 0, size_t limit = long.max);
44     abstract size_t count() const;
45     abstract size_t count(Database) const;
46     abstract size_t countRowsToInsert() const;
47     abstract size_t countRowsToUpdate() const;
48     abstract size_t countRowsToDelete() const;
49     abstract size_t index() const;
50     abstract immutable(ubyte)[] packRowsToInsert();
51     abstract immutable(ubyte)[] packRowsToUpdate();
52     abstract immutable(ubyte)[] packRowsToDelete();
53 
54     abstract immutable(ubyte)[][2] insertRecord(Database, immutable(ubyte)[], ref InsertError);
55     abstract immutable(ubyte)[]    deleteRecord(Database, immutable(ubyte)[], ref DeleteError);
56 
57     abstract void unsafeReset();
58 
59     immutable Table definition; 
60     private {
61 
62         /// Every server table has a collection of the linked client side tables. The key element is the identifier of
63         /// the client, so that the collection can be kept clean when a client connect/disconnects.
64         public ClientSideTable!(ClientT)*[string] clientSideTables;
65         
66         //public SynOp!ClientT[] ops;
67 
68     }
69 }
70 
71 class ServerSideTable(ClientT, immutable(Table) table) : BaseServerSideTable!ClientT
72 {
73     enum Definition = table; 
74     enum Columns = table.columns;
75      
76     alias ColumnsType = asD!Columns; /// an AliasSeq of the D types for the table columns...
77     alias ColumnsStruct = asStruct!table; 
78     alias KeysStruct = asPkStruct!table;
79 
80     this() { super(table); } 
81 
82     // interface needed to handle the records in a generic way ...
83 
84     /// returns the total number of records we are 'talking on' (filters? query?)
85     deprecated override size_t count() const { return fixtures.length; }
86     override size_t count(Database db) const {
87         static if( table.durable ){
88             return db.executeScalarUnsafe!size_t("select count(*) from %s".format(table.name));
89         }
90         else {
91             return fixtures.length;
92         }
93     }
94     //static if( ! table.durable ){ // XXX
95         override size_t countRowsToInsert() const { return toInsert.length; }
96         override size_t countRowsToUpdate() const { return toUpdate.length; }
97         override size_t countRowsToDelete() const { return toDelete.length; }
98     //}
99 
100     /// return the unique index identifier for this table, that's coming from the table definition in the app.d
101     override size_t index() const { return Definition.index; }
102 
103     /// returns 'limit' rows starting from 'offset'.
104     deprecated auto selectRows(size_t offset = 0, size_t limit = long.max) const  {
105         size_t till  = (limit + offset) > count ? count : (limit + offset);
106         return fixtures.values()[offset .. till];
107     }
108     /// returns 'limit' rows starting from 'offset'.
109     auto selectRows(Database db, size_t offset = 0, size_t limit = long.max) const {
110         static if(table.durable){
111             auto resultSet = db.executeQueryUnsafe!(asStruct!table)("select * from %s limit %d offset %d".format(
112                 table.name, limit, offset)
113             );
114             asStruct!table[] rows;
115             foreach(v; resultSet){
116                 rows ~= v;
117                 //import std.stdio; writeln("selectRows:", v);
118                 // XXX
119             }
120             resultSet.close();
121             return rows;
122         }
123         else {
124             size_t till  = (limit + offset) > count(db) ? count(db) : (limit + offset);
125             return fixtures.values()[offset .. till];
126         }
127     }
128 
129     /// insert a new row in the server table, turning clients table out of sync
130     deprecated void insertRow(ColumnsStruct fixture){
131         KeysStruct keys = pkValues!(table)(fixture);
132         fixtures[keys] = fixture;
133         toInsert[keys] = fixture;
134         foreach(ref cst; clientSideTables.values){
135             cst.ops ~= new ClientInsertValues!ClientT();
136         }
137     }
138 
139     /// insert a new row in the server table, turning clients table out of sync
140     ColumnsStruct insertRecord(Database db, ColumnsStruct record, ref InsertError err){
141         static if(table.durable){
142             auto inserted = db.executeInsert!(table, ColumnsStruct)(record, err);
143             KeysStruct keys = pkValues!table(record);
144             toInsert[keys] = record;
145         }
146         else {
147             auto inserted = record;
148             KeysStruct keys = pkValues!table(record);
149             fixtures[keys] = record;
150             toInsert[keys] = record;
151         }
152         foreach(ref cst; clientSideTables.values){
153             cst.ops ~= new ClientInsertValues!ClientT();
154         }
155         return inserted;
156     }
157 
158     override immutable(ubyte)[][2] insertRecord(Database db, immutable(ubyte)[] data, ref InsertError err){
159         import  msgpack : pack, unpack, MessagePackException;
160         ColumnsStruct record;
161         try {
162             record = unpack!(ColumnsStruct, true)(data);
163         }
164         catch(MessagePackException exc){
165             errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name);
166             errorf(exc.toString);
167             err = InsertError.unknownError;
168             return [[], []];
169         }
170         ColumnsStruct inserted = insertRecord(db, record, err);
171         return [
172             inserted.pack!(true).idup, 
173             record.pkParamValues!table().pack!(true).idup // clientKeys
174         ];
175     }
176 
177     override immutable(ubyte)[] deleteRecord(Database db, immutable(ubyte)[] data, ref DeleteError err){
178         import msgpack : pack, unpack, MessagePackException;
179         asStruct!table record;
180         try {
181             record = unpack!(ColumnsStruct, true)(data);
182         }
183         catch(MessagePackException exc){
184             errorf("mars - failed to unpack record to insert in '%s': maybe a wrong type of data in js", table.name);
185             errorf(exc.toString);
186             err = DeleteError.unknownError;
187             return data;
188         }
189         KeysStruct keys = record.pkValues!table();
190         deleteRecord(db, keys, err);
191         if( err != DeleteError.deleted ) return data;
192         return [];
193     }
194 
195     KeysStruct deleteRecord(Database db, KeysStruct keys, ref DeleteError err){
196         static if(table.durable){
197             db.executeDelete!(table, KeysStruct)(keys, err);
198             toDelete[keys] = 0;
199         }
200         else {
201             fixtures.remove(keys);
202             toDelete[keys] = 0;
203         }
204         foreach(ref cst; clientSideTables.values){
205             cst.ops ~= new ClientDeleteValues!ClientT();
206         }
207         return keys;
208     }
209 
210     /// update row in the server table, turning the client tables out of sync
211     deprecated void updateRow(KeysStruct keys, ColumnsStruct record){
212         //KeysStruct keys = pkValues!table(record);
213         auto v = keys in toInsert;
214         if( v !is null ){ 
215             *v = record;
216             assert( (keys in toUpdate) is null ); 
217         }
218         else {
219             v = keys in toUpdate;
220             if( v !is null ){
221                 *v = record;
222             }
223             else {
224                 toUpdate[keys] = record;
225             }
226         }
227         fixtures[keys] = record;
228         foreach(ref cst; clientSideTables.values){
229             cst.ops ~= new ClientUpdateValues!ClientT();
230         }
231     }
232 
233     /// update row in the server table, turning the client tables out of sync
234     void updateRow(Database db, KeysStruct keys, ColumnsStruct record){
235         static if( table.durable ){
236             import msgpack : pack;
237 
238             db.executeUpdate!(table, KeysStruct, ColumnsStruct)(keys, record);
239             auto v = keys in toInsert;
240             if( v !is null ){ 
241                 *v = record;
242                 assert( (keys in toUpdate) is null ); 
243             }
244             else {
245                 v = keys in toUpdate;
246                 if( v !is null ){
247                     *v = record;
248                 }
249                 else {
250                     toUpdate[keys] = record;
251                 }
252             }
253         }
254         else {
255             //KeysStruct keys = pkValues!table(record);
256             auto v = keys in toInsert;
257             if( v !is null ){ 
258                 *v = record;
259                 assert( (keys in toUpdate) is null ); 
260             }
261             else {
262                 v = keys in toUpdate;
263                 if( v !is null ){
264                     *v = record;
265                 }
266                 else {
267                     toUpdate[keys] = record;
268                 }
269             }
270             fixtures[keys] = record;
271         }
272         foreach(ref cst; clientSideTables.values){
273             cst.ops ~= new ClientUpdateValues!ClientT();
274         }
275     }
276 
277     /// returns the packet selected rows
278     override immutable(ubyte)[] packRows(size_t offset = 0, size_t limit = long.max) const {
279         import msgpack : pack;
280         return pack!(true)(selectRows(null, offset, limit)).idup;
281     }
282     /// returns the packet selected rows
283     override immutable(ubyte)[] packRows(Database db, size_t offset = 0, size_t limit = long.max) const {
284         import msgpack : pack;
285         return pack!(true)(selectRows(db, offset, limit)).idup;
286     }
287 
288     /// return the packet rows to insert in the client
289     override immutable(ubyte)[] packRowsToInsert() {
290         import msgpack : pack;
291         auto packed = pack!(true)(toInsert.values()).idup;
292         //toInsert = null; can't reset... this is called for every client
293         return packed;
294     }
295 
296     /// return the packet rows to delete in the client
297     override immutable(ubyte)[] packRowsToDelete() {
298         import msgpack : pack;
299         auto packed = pack!(true)(toDelete.keys()).idup;
300         //toInsert = null; can't reset... this is called for every client
301         return packed;
302     }
303 
304     /// return the packet rows to update in the client
305     override immutable(ubyte)[] packRowsToUpdate() {
306         static struct UpdateRecord {
307             KeysStruct keys;
308             asStruct!table record;
309         }
310         UpdateRecord[] records;
311         foreach(r; toUpdate.keys){
312             records ~= UpdateRecord(r, toUpdate[r]);
313         }
314 
315         import msgpack : pack;
316         auto packed = pack!(true)(records).idup;
317         //toUpdate = null; can't reset... this is called for every client
318         return packed;
319     }
320 
321     void loadFixture(ColumnsStruct fixture){
322         KeysStruct keys = pkValues!table(fixture);
323         fixtures[keys] = fixture;
324     }
325 
326     override void unsafeReset() {
327         //fixtures = null;
328         toInsert = null;
329         toUpdate = null;
330         toDelete = null;
331     }
332 
333     //static if( ! table.durable ){
334         asStruct!(table)[asPkStruct!(table)] fixtures;
335         asStruct!(table)[asPkStruct!(table)] toInsert;
336         asStruct!(table)[asPkStruct!(table)] toUpdate;
337         int[asPkStruct!(table)] toDelete;
338 
339         // ... record inserted client side, already patched and inserted for this client.
340         //asStruct!(table)[string] notToInsert;
341     //}
342 }
343 
344 
345 struct ClientSideTable(ClientT)
346 {
347     private {
348         Strategy strategy = Strategy.easilySyncAll;
349         public SynOp!ClientT[] ops;
350     }
351 }
352 
353 private
354 {
355     enum Strategy { easilySyncAll }
356 
357     class SynOp(MarsClientT) {
358         abstract void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst);
359         abstract void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst);
360     }
361 
362     /// take all the rows in the server table and send them on the client table.
363     class ClientImportValues(MarsClientT) : SynOp!MarsClientT {
364 
365         override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
366         {
367             assert(db !is null);
368 
369             // ... if the table is empty, simply do nothing ...
370             if( sst.count(db) > 0 ){
371                 auto payload = sst.packRows(db);
372 
373                 auto req = ImportRecordsReq(); with(req){
374                     tableIndex = sst.index;
375                     statementIndex = indexStatementFor(sst.index, "insert");
376                     encodedRecords = payload;
377                 }
378                 marsClient.sendRequest(req);
379                 auto rep = marsClient.receiveReply!ImportRecordsRep();
380             }
381         }
382         override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
383         {
384             import mars.msg : ImportValuesRequest;
385             import std.conv : to;
386 
387             // ... if the table is empty, simply do nothing ...
388             if( sst.count > 0 ){
389                 auto payload = sst.packRows();
390 
391                 auto req = ImportRecordsReq();  with(req){
392                     tableIndex =sst.index;
393                     statementIndex = indexStatementFor(sst.index, "insert");
394                     encodedRecords = payload;
395                 }
396                 marsClient.sendRequest(req);
397                 auto rep = marsClient.receiveReply!ImportRecordsRep();
398             }
399         }
400     }
401 
402     class ClientInsertValues(MarsClientT) : SynOp!MarsClientT {
403         
404         override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
405         {
406             if( sst.countRowsToInsert > 0 ){
407                 auto payload = sst.packRowsToInsert();
408                 auto req = InsertRecordsReq(); with(req){
409                     tableIndex = sst.index;
410                     statementIndex = indexStatementFor(sst.index, "insert");
411                     encodedRecords = payload;
412                 }
413                 marsClient.sendRequest(req);
414                 auto rep = marsClient.receiveReply!InsertRecordsRep();
415             }
416         }
417         override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
418         {
419             if( sst.countRowsToInsert > 0 ){
420                 auto payload = sst.packRowsToInsert();
421                 auto req = InsertRecordsReq(); with(req){
422                     tableIndex = sst.index;
423                     statementIndex = indexStatementFor(sst.index, "insert");
424                     encodedRecords = payload;
425                 }
426                 marsClient.sendRequest(req);
427                 auto rep = marsClient.receiveReply!InsertRecordsRep();
428             }
429         }
430     }
431     
432     class ClientDeleteValues(MarsClientT) : SynOp!MarsClientT {
433         
434         override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
435         {
436             if( sst.countRowsToDelete > 0 ){
437                 auto payload = sst.packRowsToDelete();
438                 auto req = DeleteRecordsReq(); with(req){
439                     tableIndex = sst.index;
440                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
441                     encodedRecords = payload;
442                 }
443                 marsClient.sendRequest(req);
444                 auto rep = marsClient.receiveReply!DeleteRecordsRep();
445             }
446         }
447         override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst){
448             if( sst.countRowsToDelete > 0 ){
449                 auto payload = sst.packRowsToDelete();
450                 auto req = DeleteRecordsReq(); with(req){
451                     tableIndex = sst.index;
452                     statementIndex = indexStatementFor(sst.index, "delete").to!int;
453                     encodedRecords = payload;
454                 }
455                 marsClient.sendRequest(req);
456                 auto rep = marsClient.receiveReply!DeleteRecordsRep();
457             }
458         }
459     }
460 
461     class ClientUpdateValues(MarsClientT) : SynOp!MarsClientT {
462 
463         override void execute(MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst)
464         {
465             import mars.msg : UpdateValuesRequest;
466             import std.conv :to;
467 
468             if( sst.countRowsToUpdate > 0 ){
469                 auto payload = sst.packRowsToUpdate();
470                 auto req = UpdateValuesRequest();
471                 req.statementIndex = indexStatementFor(sst.index, "update").to!int;
472                 req.bytes = payload;
473                 marsClient.sendRequest(req);
474             }
475         }
476         override void execute(Database db, MarsClientT marsClient, ClientSideTable!(MarsClientT)* cst, BaseServerSideTable!MarsClientT sst){}
477     }
478 
479     class ServerUpdateValues(MarsClientT) : SynOp!MarsClientT {
480         override void execute(Database db, MarsClientT marsClient, ClientSideTable* cst, BaseServerSideTable!MarsClientT sst){
481             
482         }
483     }
484 }
485 
486 version(unittest)
487 {
488     struct MarsClientMock { void sendRequest(R)(R r){} }
489 }
490 unittest
491 {
492     /+
493     import std.range : zip;
494 
495     
496     auto t1 = immutable(Table)("t1", [Col("c1", Type.integer, false), Col("c2", Type.text, false)], [0], []);
497     auto sst = new ServerSideTable!(MarsClientMock, t1);
498     zip([1, 2, 3], ["a", "b", "c"]).each!( f => sst.loadFixture(sst.ColumnsStruct(f.expand)) );
499     
500     auto cst = sst.createClientSideTable();
501     // ... la strategia più semplice è syncronizzare subito TUTTO il contenuto nella client side ...
502     assert( cst.strategy == Strategy.easilySyncAll );
503     // ... e a questo punto, come minimo deve partire un comando di import di tutti i dati....
504     assert( cast(ClientImportValues!MarsClientMock)(sst.ops[$-1]) !is null );
505     // ... che eseguito si occupa di gestire il socket, e aggiornare client e server side instances.
506     auto op = sst.ops[$-1];
507     op.execute(MarsClientMock(), cst, sst);
508 
509     // ...posso aggiornare uno dei valori con update, in questo caso la primary key è la colonna c1
510     sst.updateRow(sst.KeysStruct(2), sst.ColumnsStruct(2, "z"));
511     assert( sst.fixtures[sst.KeysStruct(2)] == sst.ColumnsStruct(2, "z") );
512     +/
513 }
514 /+
515 unittest
516 {
517     version(starwars){
518         import mars.starwars;
519         enum schema = starwarsSchema();
520 
521         auto people = new ServerSideTable!(MarsClientMock, schema.tables[0]);
522         auto scores = new ServerSideTable!(MarsClientMock, schema.tables[3]);
523         auto databaseService = DatabaseService("127.0.0.1", 5432, "starwars");
524         AuthoriseError err;
525         auto db = databaseService.connect("jedi", "force", err);
526         db.executeUnsafe("begin transaction");
527         
528         auto rows = people.selectRows(db);
529         assert( rows[0] == luke, rows[0].to!string );
530 
531         auto paolo = Person("Paolo", "male", [0x00, 0x01, 0x02, 0x03, 0x04], 1.80);
532         InsertError ierr;
533         auto inserted = people.insertRecord(db, paolo, ierr);
534         assert(inserted == paolo);
535         
536 
537         //import std.stdio;
538         //foreach(row; rows) writeln("---->>>>>", row);
539         //assert(false);
540     }
541 }
542 +/
543