1 module mars.sync2;
2 
3 import std.algorithm.comparison,
4        std.algorithm.iteration,
5        std.datetime,
6        std.experimental.logger;
7 
8 import mars.defs;
9 
/// Raw, immutable byte string; used in this module for both row keys and row records.
alias Bytes = immutable(ubyte)[];

/// Kind of change carried by a `DeltaOperation`.
enum Operation {
    Ins,        /// the receiver does not have the row yet: add it
    Committed,  /// declared but never produced in this module
    Upd         /// the receiver has the row with a different record: replace it
}

/// One change that a table has to apply in order to catch up with another one.
struct DeltaOperation
{
    Operation operation;    /// what to do with the row
    Bytes     key;          /// key bytes of the affected row
    Bytes     record;       /// record bytes to store
    string    by;           /// author of the change
    SysTime   when;         /// timestamp attached to the change
    ulong     revision;     /// revision of the row on the side that produced the delta
}

/// A sequence of changes.
alias Delta = DeltaOperation[];
22 
/// A single row as stored by the server-side and client-side tables.
struct Row
{
    /// Values taken by `state`.
    enum {
        updated     = 0,    /// row holds a confirmed update
        inserted    = 1,    /// row holds a confirmed insert
        optUpdated  = 2,    /// updated on a client, not yet committed to the server
        optInserted = 3     /// inserted on a client, not yet committed to the server
    }

    Bytes keys;         /// key bytes of the row
    Bytes record;       /// record bytes of the row

    int state;          /// one of the constants above
    SysTime when;       /// timestamp of the last change
    string by;          /// author of the last change

    ulong revision;     /// revision stamped on the row by the last change
}
36 
unittest {
    // Fixture: two keys and three records.
    enum k1 = [0];
    enum k2 = [4];
    enum r1 = [0, 1, 2, 3];
    enum r2 = [4, 5, 6, 7];
    enum r3 = [8, 9, 0, 1];

    auto t0 = SysTime(DateTime(2017, 01, 01, 0, 0));
    auto t1 = SysTime(DateTime(2017, 01, 01, 1, 0));
    auto t2 = SysTime(DateTime(2017, 01, 01, 2, 0));

    auto db = new MockDatabase();
    auto sst = ServerSideTable(db);
    auto bob = ClientSideTable("Bob");
    auto alice = ClientSideTable("Alice");

    // A server-side insert does not reach the clients until they sync.
    sst.insertRow(r1, t0);
    assert(bob.count == 0);
    assert(alice.count == 0);
    assert(bob.revision == 0);
    assert(alice.revision == 0);
    assert(sst.revision == 1);

    // Bob syncs and receives the row.
    auto changes = sst.syncDeltaFor(bob);
    bob.applyDelta(changes);
    assert(bob.revision ==1);

    auto synced = bob.row(k1);
    assert(synced.record == r1);
    assert(synced.by == "Auto");
    assert(synced.when == t0);
    assert(synced.state == Row.inserted);

    // Client side: a series of operations to perform arrives. I have to return
    // a sequence of things to do on the client side.
    bob.updateRow(k1, r3, t1);
    // XXX to drop the k2 argument, should bob be given a db so that it can
    //     extract k2 from r2? Nah... or maybe yes? What about a smaller
    //     interface for this kind of work on records? Consider a third object
    //     that takes care of the pack!() of the record.
    bob.insertRow(k2, r2, t1);
    assert(bob.row(k1).state == Row.optUpdated);
    assert(bob.row(k2).state == Row.optInserted);
    assert(bob.revision ==3);

    changes = bob.commitOrRollback(sst);
    // We are up to date, the server has applied the changes, there is no going back.
    assert(sst.row(k1).record == r3);
    assert(sst.row(k2).record == r2);
    assert(sst.revision ==3);
    assert(bob.row(k1).state == Row.updated);
    assert(bob.row(k2).state == Row.inserted);

    // ... bring alice up to date
    changes = sst.syncDeltaFor(alice);
    alice.applyDelta(changes);
    assert(alice.row(k1).record == r3);
    assert(sst.row(k1).by == "Bob");
    assert(alice.revision ==3);

    // ... update a row server side
    sst.updateRow(k1, r1, t2);
    changes = sst.syncDeltaFor(bob);
    bob.applyDelta(changes);
    assert(bob.row(k1).record == r1);
    assert(bob.row(k1).by == "Auto");
    assert(sst.revision ==4);
    assert(bob.revision ==4);

    // ... purge records that we have already synced
    sst.purgeRevisions([bob, alice]);
    assert( (cast(immutable(ubyte)[])k2 in sst.rows) is null);
}
91 
unittest
{
    import mars.starwars;

    // The schema is passed to MockDatabase2 as a template value argument, so it
    // has to be known at compile time: a manifest constant, not a run-time local.
    enum schema = starwarsSchema();
    auto db = new MockDatabase2!(schema);

    // Typed access to one table of the schema.
    auto dbPeople = db.table!"people";
    dbPeople.insertRow("Luke", "male", [0xDE, 0xAD, 0xBE, 0xEF], 1.72);
    dbPeople.insertRow("Leila", "female", [0xCA, 0xFE, 0xBA, 0xBE], 1.70);

    // Sync tables on top of the schema-backed database; no assertions yet.
    auto sst = ServerSideTable(db);
    auto bob = ClientSideTable("Bob");
    auto alice = ClientSideTable("Alice");
}
106 
// Declarations that follow this label are @safe unless they say otherwise.
@safe:

/// What the sync tables need from a database.
interface Database
{
    /// Returns the key bytes of `record`.
    Bytes keysOf(Bytes record) const;
}
/// Test double for `Database`: the key of a record is its first byte.
class MockDatabase : Database
{
    /// Returns the one-byte slice at the start of `record`, which must not be empty.
    Bytes keysOf(Bytes record) const
    {
        return record[0 .. 1];
    }
}
115 
/// Test double for `Database` bound at compile time to `schema`.
class MockDatabase2(immutable(Schema) schema) : Database
{
    /// The schema this database type was instantiated with.
    enum Schema = schema;

    /// Returns a `DatabaseTable` handle for the table called `name`.
    auto table(string name)()
    {
        alias Handle = DatabaseTable!(typeof(this), name);
        return Handle(this);
    }

    /// Stub: always returns an empty key, whatever the record.
    Bytes keysOf(Bytes record) const
    {
        return [];
    }
}
126 
/// Handle on the table called `N` of a database of type `D`.
/// `D` must expose a compile-time `Schema` that provides `tableNamed`.
struct DatabaseTable(D, string N)
{
    alias Name = N;                             /// name of the table
    enum Table = D.Schema.tableNamed(Name);     /// table definition looked up in the schema
    enum Columns = Table.columns;               /// columns of that table
    alias RecordTypes = asD!Columns;            /// parameter types accepted by `insertRow`

    /// Binds the handle to `database`.
    this(D database)
    {
        db = database;
    }

    /// Builds the record from `values` via `asStruct`; nothing is stored yet.
    void insertRow(RecordTypes values)
    {
        auto record = asStruct!(Table)(values);
    }

    D db;   /// database this handle belongs to
}
142 
143 
/**
 * Server-side copy of a synchronised table.
 *
 * Rows live in `rows`, indexed by their key bytes. `revision` is incremented by
 * every `insertRow` / `updateRow` and the new value is stamped on the row written.
 */
struct ServerSideTable {
    /// Asked by `insertRow` for the key bytes of a record.
    Database db;

    /**
     * Stores `record` as an inserted row.
     *
     * The key is `db.keysOf(record)`; the row is attributed to "Auto", dated
     * `when` and stamped with the next revision. A row already stored under
     * the same key is overwritten.
     */
    void insertRow(Bytes record, SysTime when) {
        auto keys = db.keysOf(record);
        rows[keys] = Row(keys, record, Row.inserted, when, "Auto", ++revision);
    }

    /**
     * Stores `record` under `keys` as an updated row, attributed to "Auto",
     * dated `when` and stamped with the next revision. The row is not required
     * to exist beforehand.
     */
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        rows[keys] = Row(keys, record, Row.updated, when, "Auto", ++revision);
    }

    /**
     * Computes what `cst` is missing with respect to this table.
     *
     * Returns: one `Operation.Ins` for each server row whose key the client
     * does not have, and one `Operation.Upd` for each row the client has with
     * a different record. Rows with equal records produce nothing; rows that
     * exist only on the client are not considered.
     */
    Delta syncDeltaFor(const ClientSideTable cst) {
        Delta delta;
        foreach(keys, srow; rows){
            auto crow = keys in cst.rows;
            Operation what;
            if( crow is null ){
                what = Operation.Ins;
            }
            else if( crow.record != srow.record ){
                what = Operation.Upd;
            }
            else {
                continue; // client already holds this record
            }
            delta ~= DeltaOperation( what, keys, srow.record, srow.by, srow.when, srow.revision );
        }
        return delta;
    }

    /**
     * Drops, from this table and from every table in `csts`, the rows whose
     * revision has been reached by all of the given clients.
     *
     * The threshold is the lowest `revision` among the clients, capped by the
     * server revision; rows with a revision up to and including the threshold
     * are removed. With no clients the threshold is the server revision itself.
     */
    void purgeRevisions(ClientSideTable[] csts) @trusted { // byKeyValue is @system
        ulong rev = revision;
        // The slowest client decides how far it is possible to purge.
        foreach(cst; csts) rev = min(rev, cst.revision);

        // Keys are collected first: removing while iterating an AA is not allowed.
        Bytes[] toPurge;
        foreach(kv; rows.byKeyValue){
            if( kv.value.revision <= rev ) toPurge ~= kv.key;
        }
        foreach(k; toPurge) rows.remove(k);

        foreach(cst; csts){
            toPurge = [];
            foreach(kv; cst.rows.byKeyValue){
                if( kv.value.revision <= rev ) toPurge ~= kv.key;
            }
            foreach(k; toPurge) cst.rows.remove(k);
        }
    }


    /// Returns a copy of the row stored under `keys`, which must exist.
    Row row(Bytes keys) const {
        assert( (keys in rows) !is null);
        return rows[keys];
    }

    Row[Bytes] rows;    /// rows indexed by key bytes
    ulong revision;     /// highest revision assigned or accepted so far
}
198 
/**
 * Client-side copy of a synchronised table.
 *
 * Local changes are recorded optimistically (`Row.optUpdated`, `Row.optInserted`)
 * and are confirmed by `commitOrRollback`; server changes arrive via `applyDelta`.
 */
struct ClientSideTable {
    /// Creates an empty table whose local changes are attributed to `by`.
    this(string by) {
        this.by = by;
        // Insert and immediately remove a dummy key.
        // NOTE(review): presumably this forces the associative array to be
        // allocated, so that by-value copies of this struct share the same
        // rows - confirm.
        rows[ [0] ] = Row.init; rows.remove([0]);
    }

    /// Number of rows currently held by this table.
    long count() const { return cast(long) rows.length; }

    /**
     * Applies the changes in `delta` to this table.
     *
     * `Operation.Ins` requires the key to be absent; any other operation
     * requires it to be present and replaces the row. `revision` is raised to
     * the highest revision found in the delta.
     * NOTE(review): `op.revision` is not copied into the stored rows, which
     * keep revision 0 - confirm this is intended.
     */
    void applyDelta(const Delta delta){
        foreach(op; delta){
            if(op.operation == Operation.Ins){
                assert((op.key in rows) is null);
                rows[op.key] = Row(op.key, op.record, Row.inserted, op.when, op.by);
            }
            else {
                auto row = op.key in rows;
                assert(row !is null);
                *row = Row(op.key, op.record, Row.updated, op.when, op.by);
            }
            revision = max(revision, op.revision);
        }
    }

    /// Returns a copy of the row stored under `keys`.
    Row row(Bytes keys) const { return rows[keys]; }

    /// Optimistically replaces the existing row `keys` with `record`, under the next revision.
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        auto row = keys in rows;
        assert(row !is null);
        *row = Row(keys, record, Row.optUpdated, when, by, ++revision);
    }

    /// Optimistically adds a row under `keys`, which must not exist yet, under the next revision.
    void insertRow(Bytes keys, Bytes record, SysTime when) {
        assert( (keys in rows) is null );
        rows[keys] = Row(keys, record, Row.optInserted, when, by, ++revision);
    }

    /**
     * Pushes every optimistic row to `sst` and marks it as confirmed.
     *
     * An optimistic update requires the row to exist on the server, an
     * optimistic insert requires it to be absent. The server revision is
     * raised to the highest revision found among the client rows.
     *
     * Returns: always an empty delta; no rollback is performed.
     */
    Delta commitOrRollback(ref ServerSideTable sst) {
        foreach(keys, ref crow; rows){
            if(crow.state == Row.optUpdated){
                auto srow = keys in sst.rows; assert(srow !is null);
                *srow = Row(crow.keys, crow.record, Row.updated, crow.when, crow.by, crow.revision);
                crow.state = Row.updated;
            }
            else if(crow.state == Row.optInserted){
                assert( (keys in sst.rows) is null );
                sst.rows[keys] = Row(keys, crow.record, Row.inserted, crow.when, crow.by, crow.revision);
                crow.state = Row.inserted;
            }
            sst.revision = max(sst.revision, crow.revision);
        }
        return [];
    }

    string by;          /// author recorded on local changes
    Row[Bytes] rows;    /// rows indexed by key bytes
    ulong revision;     /// highest revision produced locally or received in a delta
}