1 module mars.sync2;
2 
3 import std.algorithm.comparison,
4        std.algorithm.iteration,
5        std.datetime,
6        std.experimental.logger;
7 
8 import mars.defs;
9 
/// Immutable byte string; used in this module for serialized keys and records.
alias Bytes = immutable(ubyte)[];

/// Kind of change carried by a `DeltaOperation`.
enum Operation {
    Ins,        /// the receiver must create a row it does not have yet
    Committed,  /// not produced anywhere in this module; `applyDelta` handles it like `Upd`
    Upd,        /// the receiver must overwrite a row it already has
    ConfirmIns, /// confirms a row the client inserted optimistically
}

/**
 * One change to be replayed on the other side of the synchronisation.
 *
 * The field order is significant: the rest of the module builds values with
 * positional struct literals.
 */
struct DeltaOperation {
    Operation operation; /// what to do with the row
    Bytes key;           /// key bytes identifying the row
    Bytes record;        /// full record content after the change
    string by;           /// author of the change ("Auto" for server-originated rows)
    SysTime when;        /// timestamp attached to the change
    ulong revision;      /// revision number the change was recorded with
}

/// A batch of operations, applied in order by `ClientSideTable.applyDelta`.
alias Delta = DeltaOperation[];
22 
/**
 * A table row together with its synchronisation metadata.
 *
 * The field order is significant: rows are built with positional struct
 * literals (`Row(keys, record, state, when, by, revision)`).
 */
struct Row {
    /// Possible values of `state`.
    enum {
        updated,     /// confirmed row whose last change was an update
        inserted,    /// confirmed row whose last change was an insert
        optUpdated,  /// updated locally on a client, not committed yet
        optInserted, /// inserted locally on a client, not committed yet
    }

    Bytes keys;     /// key bytes identifying the row
    Bytes record;   /// current record content

    int state;      /// one of the anonymous enum values above
    SysTime when;   /// timestamp of the last change
    string by;      /// author of the last change

    ulong revision; /// revision recorded for the last change
}
36 
unittest {

    // Fixture data: with MockSimpleDatabaseTable the key of a record is its first byte.
    enum k1 = [0]; enum r1 = [0, 1, 2, 3];
    enum k2 = [4]; enum r2 = [4, 5, 6, 7];
                   enum r3 = [8, 9, 0, 1];

    auto t0 = SysTime(DateTime(2017, 1, 1, 0, 0));
    auto t1 = SysTime(DateTime(2017, 1, 1, 1, 0));
    auto t2 = SysTime(DateTime(2017, 1, 1, 2, 0));

    auto db = new MockSimpleDatabaseTable();
    auto sst = ServerSideTable(db);
    auto bob = ClientSideTable(db, "Bob");
    auto alice = ClientSideTable(db, "Alice");

    // A server-side insert does not touch the clients until they are synced.
    sst.insertRow(r1, t0);
    assert(bob.count == 0);
    assert(alice.count == 0);
    assert(bob.revision == 0);
    assert(alice.revision == 0);
    assert(sst.revision == 1);

    // The delta is what we send to the client over the websocket ...
    auto firstSync = sst.syncDeltaFor(bob);
    // ... and once the client has answered, we update our representation of it here.
    bob.applyDelta(firstSync);
    assert(bob.revision == 1);

    auto row = bob.row(k1);
    assert(row.record == r1);
    assert(row.by == "Auto");
    assert(row.when == t0);
    assert(row.state == Row.inserted);

    // Client side: a series of operations arrives; they are recorded optimistically.
    bob.updateRow(k1, r3, t1);
    bob.insertRow(    r2, t1);
    assert(bob.row(k1).state == Row.optUpdated);
    assert(bob.row(k2).state == Row.optInserted);
    assert(bob.revision == 3);

    // Apply to the server what we received from the web over the websocket: the
    // resulting delta holds the operations to send back to the client as
    // confirmation or update.
    auto confirmation = bob.commitOrRollback(sst);
    // We are up to date, the server has applied the changes: there is no going back.
    assert(sst.row(k1).record == r3);
    assert(sst.row(k2).record == r2);
    assert(sst.revision == 3);
    // Send the delta to bob; once he has executed it, update our view of him.
    bob.applyDelta(confirmation);
    assert(bob.row(k1).state == Row.updated);
    assert(bob.row(k2).state == Row.inserted);

    // Bring alice up to date ...
    auto aliceSync = sst.syncDeltaFor(alice);
    // ... alice has executed the delta, take note of it.
    alice.applyDelta(aliceSync);
    assert(alice.row(k1).record == r3);
    assert(sst.row(k1).by == "Bob");
    assert(alice.revision == 3);

    // Update a row server side and propagate it to bob.
    sst.updateRow(k1, r1, t2);
    auto serverUpdate = sst.syncDeltaFor(bob);
    bob.applyDelta(serverUpdate);
    assert(bob.row(k1).record == r1);
    assert(bob.row(k1).by == "Auto");
    assert(sst.revision == 4);
    assert(bob.revision == 4);

    // Purge records that have already been synced.
    sst.purgeRevisions([bob, alice]);
    assert( (cast(Bytes) k2 in sst.rows) is null);
}
96 
unittest
{
    // Smoke test: the sync tables can be built on top of a schema-driven mock table.
    import mars.starwars;

    auto schema = starwarsSchema();
    auto db = new MockDatabase!(schema);

    // Seed the "people" table with two fixture rows.
    auto people = db.table!"people";
    people.insertRow("Luke", "male", [0xDE, 0xAD, 0xBE, 0xEF], 1.72);
    people.insertRow("Leila", "female", [0xCA, 0xFE, 0xBA, 0xBE], 1.70);

    // Only construction is exercised here; nothing is asserted on the tables.
    auto sst = ServerSideTable(people);
    auto bob = ClientSideTable(people, "Bob");
    auto alice = ClientSideTable(people, "Alice");
}
111 
112 @safe:
113 
/// Minimal view of a database table needed by the sync tables.
interface DatabaseTable {
    /// Returns the key bytes of the given serialized record.
    Bytes keysOf(Bytes record) const;
}
/// Test double whose key is simply the first byte of the record.
class MockSimpleDatabaseTable : DatabaseTable {
    /// Returns a one-byte slice holding the first byte of `record`.
    Bytes keysOf(Bytes record) const {
        return record[0 .. 1];
    }
}
120 
/// In-memory mock database, parameterised at compile time by its schema.
class MockDatabase(immutable(Schema) schema)
{
    /// The schema this database was instantiated with.
    enum Schema = schema;

    /// Creates a new mock table object for the schema table called `name`.
    auto table(string name)(){
        alias TableType = MockDatabaseTable!(typeof(this), name);
        return new TableType(this);
    }
}
129 
/**
 * Mock of a single table of database type `D`, selected by its name `N`.
 * Inserted rows are kept in the in-memory `fixtures` map.
 */
class MockDatabaseTable(D, string N) : DatabaseTable {
    alias Name = N;
    enum Table = D.Schema.tableNamed(Name);
    enum Columns = Table.columns;
    /// Argument types of `insertRow`, derived from the columns via `asD`.
    alias RecordTypes = asD!Columns;

    this(D db){
        this.db = db;
    }

    /// Stores a row built from `values`; the key must not already be present.
    void insertRow(RecordTypes values){
        asPkStruct!Table pk;
        auto row = asStruct!(Table)(values);
        // presumably copies the primary-key fields from the row into pk
        assignCommonFields(pk, row);
        assert((pk in fixtures) is null);
        fixtures[pk] = row;
    }

    /// Returns the first byte of `record`, exactly like MockSimpleDatabaseTable.
    Bytes keysOf(Bytes record) const {
        return record[0 .. 1];
    }

    D db;
    /// Inserted rows, indexed by their primary-key struct.
    asStruct!Table[asPkStruct!Table] fixtures;
}
151 
152 
/**
 * Server-side state of a synchronised table: the authoritative rows plus a
 * revision counter that is bumped on every server-originated change.
 */
struct ServerSideTable {
    /// Backing table, used to derive the key bytes of a record.
    DatabaseTable db;

    /// Stores `record` as a row inserted by "Auto", under a new revision.
    void insertRow(Bytes record, SysTime when) {
        auto keys = db.keysOf(record);
        rows[keys] = Row(keys, record, Row.inserted, when, "Auto", ++revision);
    }

    /// Stores `record` under `keys` as a row updated by "Auto", under a new revision.
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        rows[keys] = Row(keys, record, Row.updated, when, "Auto", ++revision);
    }

    /**
     * Computes the operations that bring `cst` in line with the server rows.
     *
     * Rows missing on the client yield `Ins`, rows whose record differs yield
     * `Upd`, rows with an identical record yield nothing. Only the record
     * content is compared.
     */
    Delta syncDeltaFor(const ClientSideTable cst) {
        Delta delta;
        foreach(keys, srow; rows){
            auto crow = keys in cst.rows;
            if( crow !is null && crow.record == srow.record ) continue; // already in sync
            immutable kind = crow is null ? Operation.Ins : Operation.Upd;
            delta ~= DeltaOperation( kind, keys, srow.record, srow.by, srow.when, srow.revision );
        }
        return delta;
    }

    /**
     * Drops, from the server and from every given client table, the rows that
     * all clients have already synced.
     *
     * The threshold is the lowest revision among the server and the clients.
     * A client at revision R has applied the operation with revision R, so the
     * comparison is inclusive. The row state is not consulted.
     */
    void purgeRevisions(ClientSideTable[] csts) @trusted { // byKeyValue is @system
        ulong rev = revision;
        foreach(cst; csts) rev = min(rev, cst.revision);
        purgeUpTo(rows, rev);
        foreach(cst; csts) purgeUpTo(cst.rows, rev);
    }

    /// Removes from `table` every row whose revision is not above `rev`.
    private static void purgeUpTo(Row[Bytes] table, ulong rev) @trusted {
        // Collect first: removing entries while iterating the same AA is not allowed.
        Bytes[] stale;
        foreach(kv; table.byKeyValue){
            if( kv.value.revision <= rev ) stale ~= kv.key;
        }
        foreach(k; stale) table.remove(k);
    }

    /// Returns a copy of the row stored under `keys`, which must exist.
    Row row(Bytes keys) const {
        assert( (keys in rows) !is null);
        return rows[keys];
    }

    /// Rows indexed by their key bytes.
    Row[Bytes] rows;
    /// Highest revision recorded so far.
    ulong revision;
}
207 
/**
 * Server-held representation of what one client currently has for a table,
 * including the changes the client made optimistically and has not committed.
 */
struct ClientSideTable {
    /// Binds the table to its backing `db` and to the client name `by`.
    this(DatabaseTable db, string by) {
        this.db = db;
        this.by = by;
        // Insert and remove a dummy key so the associative array is allocated
        // right away; presumably so that by-value copies of this struct (such as
        // those handed to ServerSideTable.purgeRevisions) share the same storage.
        rows[ [0] ] = Row.init; rows.remove([0]);
    }

    /// Number of rows currently held for this client.
    long count() const { return cast(long) rows.length; }

    /**
     * Replays `delta` on this table and raises `revision` to the highest
     * revision seen.
     *
     * `Ins` requires the key to be absent; every other operation requires it to
     * be present. `ConfirmIns` leaves the row in state `inserted`, anything else
     * in state `updated`. Each row keeps the revision of the operation applied.
     */
    void applyDelta(const Delta delta){
        foreach(op; delta){
            auto existing = op.key in rows;
            if(op.operation == Operation.Ins){
                assert(existing is null);
                rows[op.key] = Row(op.key, op.record, Row.inserted, op.when, op.by, op.revision);
            }
            else {
                assert(existing !is null);
                immutable state = op.operation == Operation.ConfirmIns ? Row.inserted : Row.updated;
                *existing = Row(op.key, op.record, state, op.when, op.by, op.revision);
            }
            revision = max(revision, op.revision);
        }
    }

    /// Returns a copy of the row stored under `keys`.
    Row row(Bytes keys) const { return rows[keys]; }

    /// Optimistically overwrites the existing row `keys`, under a new local revision.
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        auto row = keys in rows;
        assert(row !is null);
        *row = Row(keys, record, Row.optUpdated, when, by, ++revision);
    }

    /// Optimistically inserts `record`, whose key must be new, under a new local revision.
    void insertRow(Bytes record, SysTime when) {
        Bytes keys = this.db.keysOf(record);
        assert( (keys in rows) is null );
        rows[keys] = Row(keys, record, Row.optInserted, when, by, ++revision);
    }

    /**
     * Pushes every optimistic row to `sst` and returns the operations the
     * client must apply as confirmation.
     *
     * Despite the name no rollback path exists: conflicts are only caught by
     * the asserts. The local rows keep their optimistic state until the
     * returned delta is passed to `applyDelta`.
     */
    Delta commitOrRollback(ref ServerSideTable sst) {
        Delta delta;
        foreach(keys, ref crow; rows){
            if(crow.state == Row.optUpdated){
                auto srow = keys in sst.rows; assert(srow !is null);
                *srow = Row(crow.keys, crow.record, Row.updated, crow.when, crow.by, crow.revision);
                delta ~= DeltaOperation(Operation.Upd, crow.keys, crow.record, crow.by, crow.when, crow.revision);
            }
            else if(crow.state == Row.optInserted){
                assert( (keys in sst.rows) is null );
                sst.rows[keys] = Row(keys, crow.record, Row.inserted, crow.when, crow.by, crow.revision);
                delta ~= DeltaOperation(Operation.ConfirmIns, crow.keys, crow.record, crow.by, crow.when, crow.revision);
            }
            // The server revision never falls behind a row it has seen.
            sst.revision = max(sst.revision, crow.revision);
        }
        return delta;
    }

    /// Backing table, used to derive the key bytes of a record.
    DatabaseTable db;
    /// Client name recorded as author of its optimistic changes.
    string by;
    /// Rows indexed by their key bytes.
    Row[Bytes] rows;
    /// Highest revision applied or produced by this client.
    ulong revision;
}