module mars.sync2;

import std.algorithm.comparison,
       std.algorithm.iteration,
       std.datetime,
       std.experimental.logger;

import mars.defs;

/// Immutable byte buffer; used in this module for row keys and row records.
alias Bytes = immutable(ubyte)[];

/// Kind of change carried by a `DeltaOperation`.
/// NOTE(review): `Committed` is not produced anywhere in this file.
enum Operation { Ins, Committed, Upd }

/// One change to replay on the receiving side of a sync.
struct DeltaOperation {
    Operation operation; /// what to do with the row
    Bytes key;           /// key of the affected row
    Bytes record;        /// new content of the row
    string by;           /// author of the change
    SysTime when;        /// timestamp of the change
    ulong revision;      /// revision at which the change was made
}

/// An ordered list of operations.
alias Delta = DeltaOperation[];

/// A table row together with its bookkeeping (state, author, time, revision).
struct Row {
    /// Row states. `optUpdated` / `optInserted` are set by
    /// `ClientSideTable.updateRow` / `insertRow` and are turned into
    /// `updated` / `inserted` by `ClientSideTable.commitOrRollback`.
    enum {
        updated, inserted, optUpdated, optInserted
    }
    Bytes keys;
    Bytes record;

    int state;
    SysTime when;
    string by;

    ulong revision;
}

unittest {

    enum k1 = [0]; enum r1 = [0, 1, 2, 3];
    enum k2 = [4]; enum r2 = [4, 5, 6, 7];
                   enum r3 = [8, 9, 0, 1];

    auto t0 = SysTime(DateTime(2017, 01, 01, 0, 0));
    auto t1 = SysTime(DateTime(2017, 01, 01, 1, 0));
    auto t2 = SysTime(DateTime(2017, 01, 01, 2, 0));

    auto db = new MockDatabase();
    auto sst = ServerSideTable(db);
    auto bob = ClientSideTable("Bob");
    auto alice = ClientSideTable("Alice");

    sst.insertRow(r1, t0);
    assert(bob.count == 0 && alice.count == 0);
    assert(bob.revision == 0 && alice.revision == 0 && sst.revision == 1);

    auto delta = sst.syncDeltaFor(bob);
    bob.applyDelta(delta);
    assert(bob.revision ==1);

    auto row = bob.row(k1);
    assert(row.record == r1 && row.by == "Auto" && row.when == t0 && row.state == Row.inserted);

    // client: a series of operations to perform arrives. I must return a sequence of things to do on the client side.
    bob.updateRow(k1, r3, t1);
    bob.insertRow(k2, r2, t1); // XXX to drop the k2 argument, should bob be given a db so that it can extract k2 from r2? nah... or maybe yes?
                               // what about a smaller interface for doing this kind of work on records?
                               // consider a third object that takes care of the pack!() of the record
    assert(bob.row(k1).state == Row.optUpdated && bob.row(k2).state == Row.optInserted);
    assert(bob.revision ==3);

    delta = bob.commitOrRollback(sst);
    // we are up to date, the server has applied the changes, there is no going back.
    assert(sst.row(k1).record == r3 && sst.row(k2).record == r2 && sst.revision ==3);
    assert(bob.row(k1).state == Row.updated && bob.row(k2).state == Row.inserted);

    // ... bring alice up to date
    delta = sst.syncDeltaFor(alice);
    alice.applyDelta(delta);
    assert(alice.row(k1).record == r3 && sst.row(k1).by == "Bob" && alice.revision ==3);

    // ... update a row server side
    sst.updateRow(k1, r1, t2);
    delta = sst.syncDeltaFor(bob);
    bob.applyDelta(delta);
    assert(bob.row(k1).record == r1 && bob.row(k1).by == "Auto" && sst.revision ==4 && bob.revision ==4);

    // ... purge records that we have already synced
    // (lowest client revision is alice's, 3: k2 at revision 3 goes, k1 at revision 4 stays)
    sst.purgeRevisions([bob, alice]);
    assert( (cast(immutable(ubyte)[])k2 in sst.rows) is null);
}

unittest
{
    import mars.starwars;
    auto schema = starwarsSchema();
    auto db = new MockDatabase2!(schema);

    auto dbPeople = db.table!"people";
    dbPeople.insertRow("Luke", "male", [0xDE, 0xAD, 0xBE, 0xEF], 1.72);
    dbPeople.insertRow("Leila", "female", [0xCA, 0xFE, 0xBA, 0xBE], 1.70);

    auto sst = ServerSideTable(db);
    auto bob = ClientSideTable("Bob");
    auto alice = ClientSideTable("Alice");
}

@safe:

/// What the sync tables need from a database: extracting the key bytes of a record.
interface Database {
    Bytes keysOf(Bytes) const;
}

/// Test double: the key of a record is its first byte.
class MockDatabase : Database {
    Bytes keysOf(Bytes record) const { return record[0 .. 1]; }
}

/// Test double parameterised on a compile-time schema.
class MockDatabase2(immutable(Schema) schema) : Database
{
    enum Schema = schema;

    /// Returns a typed handle on the table called `name`.
    auto table(string name)(){
        return DatabaseTable!(typeof(this), name)(this);
    }

    // NOTE(review): stub, every record maps to an empty key.
    Bytes keysOf(Bytes record) const { return []; }
}

/// Handle on table `N` of a database of type `D`; column types are derived from `D.Schema`.
struct DatabaseTable(D, string N) {
    alias Name = N;
    enum Table = D.Schema.tableNamed(Name);
    enum Columns = Table.columns;
    alias RecordTypes = asD!Columns;

    this(D db){ this.db = db; }

    /// Builds the record struct from `values`.
    /// NOTE(review): the record is currently discarded, nothing is stored.
    void insertRow(RecordTypes values){
        auto record = asStruct!(Table)(values);

    }

    D db;
}


/// Server-side copy of a table: rows indexed by key, plus a revision counter
/// that is incremented on every server-side insert or update.
struct ServerSideTable {
    Database db;

    /// Stores `record` under the key returned by `db.keysOf`, authored by "Auto",
    /// at a freshly incremented revision.
    void insertRow(Bytes record, SysTime when) {
        auto keys = db.keysOf(record);
        rows[keys] = Row(keys, record, Row.inserted, when, "Auto", ++revision);
    }

    /// Overwrites (or creates) the row at `keys`, authored by "Auto",
    /// at a freshly incremented revision.
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        rows[keys] = Row(keys, record, Row.updated, when, "Auto", ++revision);
    }

    /// Computes the operations that bring `cst` in line with this table:
    /// an `Ins` for every server row the client does not have, an `Upd` for every
    /// row whose record differs. Rows that exist only on the client are not considered.
    Delta syncDeltaFor(const ClientSideTable cst) {
        Delta delta;
        foreach(keys, srow; rows){
            auto crow = keys in cst.rows;
            if( crow is null ){
                delta ~= DeltaOperation( Operation.Ins, keys, srow.record, srow.by, srow.when, srow.revision );
            }
            else {
                if( crow.record != srow.record ){
                    delta ~= DeltaOperation( Operation.Upd, keys, srow.record, srow.by, srow.when, srow.revision );
                }
            }
        }
        return delta;
    }

    /// Removes, from this table and from every table in `csts`, the rows whose
    /// revision is not newer than the lowest revision reached by any of the clients,
    /// i.e. the rows every client has already synced.
    ///
    /// With an empty `csts` nothing is known to be synced, so nothing is purged.
    void purgeRevisions(ClientSideTable[] csts) @trusted { // byKeyValue is @system
        if( csts.length == 0 ) return;

        // lowest revision among the server and all the clients
        ulong rev = revision;
        foreach(cst; csts) rev = min(rev, cst.revision);

        // keys are collected first: entries must not be removed while iterating the AA
        Bytes[] toPurge;
        foreach(kv; rows.byKeyValue){
            if( kv.value.revision <= rev ) toPurge ~= kv.key;
        }
        foreach(k; toPurge) rows.remove(k);
        foreach(cst; csts){
            toPurge = [];
            foreach(kv; cst.rows.byKeyValue){
                if( kv.value.revision <= rev ) toPurge ~= kv.key;
            }
            foreach(k; toPurge) cst.rows.remove(k);
        }
    }


    /// Returns a copy of the row at `keys`; the row must exist.
    Row row(Bytes keys) const {
        assert( (keys in rows) !is null);
        return rows[keys];
    }

    Row[Bytes] rows;
    ulong revision;
}

/// Client-side copy of a table. Local changes are recorded with an optimistic
/// state until `commitOrRollback` pushes them to the server.
struct ClientSideTable {
    /// Params: by = author name stamped on the rows changed through this table.
    this(string by) {
        this.by = by;
        // Insert and remove a dummy entry so that the associative array is allocated.
        // NOTE(review): presumably done so that by-value copies of this struct
        // (as taken by `purgeRevisions`) share the same table - confirm.
        rows[ [0] ] = Row.init; rows.remove([0]);
    }

    /// Number of rows currently held by this table.
    long count() const { return rows.length; }

    /// Replays `delta` on this table: `Ins` adds a row that must not exist yet,
    /// any other operation overwrites a row that must exist. The table revision is
    /// raised to the highest revision seen in the delta.
    /// NOTE(review): the stored rows do not carry `op.revision` (it stays 0) - confirm this is intended.
    void applyDelta(const Delta delta){
        foreach(op; delta){
            if(op.operation == Operation.Ins){
                assert((op.key in rows) is null);
                rows[op.key] = Row(op.key, op.record, Row.inserted, op.when, op.by);
            }
            else {
                auto row = op.key in rows;
                assert(row !is null);
                *row = Row(op.key, op.record, Row.updated, op.when, op.by);
            }
            revision = max(revision, op.revision);
        }
    }

    /// Returns a copy of the row at `keys`.
    Row row(Bytes keys) const { return rows[keys]; }

    /// Overwrites an existing row locally, marking it `optUpdated` at a freshly incremented revision.
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        auto row = keys in rows;
        assert(row !is null);
        *row = Row(keys, record, Row.optUpdated, when, by, ++revision);
    }

    /// Adds a new row locally, marking it `optInserted` at a freshly incremented revision.
    void insertRow(Bytes keys, Bytes record, SysTime when) {
        assert( (keys in rows) is null );
        rows[keys] = Row(keys, record, Row.optInserted, when, by, ++revision);
    }

    /// Pushes every `optUpdated` / `optInserted` row to `sst`, turns its local state
    /// into `updated` / `inserted`, and raises `sst.revision` to the highest row
    /// revision seen.
    /// NOTE(review): there is no rollback path, and the returned delta is always empty.
    Delta commitOrRollback(ref ServerSideTable sst) {
        foreach(keys, ref crow; rows){
            if(crow.state == Row.optUpdated){
                auto srow = keys in sst.rows; assert(srow !is null);
                *srow = Row(crow.keys, crow.record, Row.updated, crow.when, crow.by, crow.revision);
                crow.state = Row.updated;
            }
            else if(crow.state == Row.optInserted){
                assert( (keys in sst.rows) is null );
                sst.rows[keys] = Row(keys, crow.record, Row.inserted, crow.when, crow.by, crow.revision);
                crow.state = Row.inserted;
            }
            sst.revision = max(sst.revision, crow.revision);
        }
        return [];
    }

    string by;
    Row[Bytes] rows;
    ulong revision;
}