module mars.sync2;

import std.algorithm.comparison,
       std.algorithm.iteration,
       std.datetime,
       std.experimental.logger;

import mars.defs;

/// Raw, immutable byte string used for both encoded keys and encoded records.
alias Bytes = immutable(ubyte)[];

/// Kind of change carried by a `DeltaOperation`.
enum Operation { Ins, Committed, Upd, ConfirmIns }

/// One change to be replayed on a client-side table.
struct DeltaOperation {
    Operation operation;
    Bytes key;
    Bytes record;
    string by;      // who made the change
    SysTime when;   // when the change was made
    ulong revision; // revision the change belongs to
}
/// Ordered list of operations to be applied by `ClientSideTable.applyDelta`.
alias Delta = DeltaOperation[];

/// A tracked row, together with its synchronisation state.
struct Row {
    /// Values for `state`. The `opt*` values are set by the client-side
    /// `updateRow` / `insertRow` before the change reaches the server.
    enum {
        updated, inserted, optUpdated, optInserted
    }
    Bytes keys;
    Bytes record;

    int state;
    SysTime when;
    string by;

    ulong revision;
}

unittest {

    enum k1 = [0]; enum r1 = [0, 1, 2, 3];
    enum k2 = [4]; enum r2 = [4, 5, 6, 7];
                   enum r3 = [8, 9, 0, 1];

    auto t0 = SysTime(DateTime(2017, 01, 01, 0, 0));
    auto t1 = SysTime(DateTime(2017, 01, 01, 1, 0));
    auto t2 = SysTime(DateTime(2017, 01, 01, 2, 0));

    auto db = new MockSimpleDatabaseTable();
    auto sst = ServerSideTable(db);
    auto bob = ClientSideTable(db, "Bob");
    auto alice = ClientSideTable(db, "Alice");

    sst.insertRow(r1, t0);
    assert(bob.count == 0 && alice.count == 0);
    assert(bob.revision == 0 && alice.revision == 0 && sst.revision == 1);

    // ... the delta is what we send to the client, via websocket ...
    auto delta = sst.syncDeltaFor(bob);
    // ... once the client has answered, we can update its representation here ...
    bob.applyDelta(delta);
    assert(bob.revision ==1);

    auto row = bob.row(k1);
    assert(row.record == r1 && row.by == "Auto" && row.when == t0 && row.state == Row.inserted);

    // client side: a series of operations arrives. We must return a sequence of things to do on the client side.
    bob.updateRow(k1, r3, t1);
    bob.insertRow(    r2, t1);
    assert(bob.row(k1).state == Row.optUpdated && bob.row(k2).state == Row.optInserted);
    assert(bob.revision ==3);

    // apply to the server what we received from the web, via websocket: the delta holds the operations
    // to send back to the client as confirmation or update ....
    delta = bob.commitOrRollback(sst);
    // we are up to date, the server has committed, there is no going back.
    assert(sst.row(k1).record == r3 && sst.row(k2).record == r2 && sst.revision ==3);
    // send the delta to bob... once he has executed it, update our view of him
    bob.applyDelta(delta);
    assert(bob.row(k1).state == Row.updated && bob.row(k2).state == Row.inserted);

    // ... bring alice up to date
    delta = sst.syncDeltaFor(alice);
    // ... alice has executed the delta, take note of it ...
    alice.applyDelta(delta);
    assert(alice.row(k1).record == r3 && sst.row(k1).by == "Bob" && alice.revision ==3);

    // ... update a row server side
    sst.updateRow(k1, r1, t2);
    delta = sst.syncDeltaFor(bob);
    bob.applyDelta(delta);
    assert(bob.row(k1).record == r1 && bob.row(k1).by == "Auto" && sst.revision ==4 && bob.revision ==4);

    // ... purge records that we have already synced
    sst.purgeRevisions([bob, alice]);
    assert( (cast(immutable(ubyte)[])k2 in sst.rows) is null);
}

// Regression test: a client that is behind must hold back the purge.
unittest {
    Bytes rA = [0, 1];
    Bytes rB = [4, 5];
    auto t0 = SysTime(DateTime(2017, 01, 01, 0, 0));

    auto db = new MockSimpleDatabaseTable();
    auto sst = ServerSideTable(db);
    auto bob = ClientSideTable(db, "Bob");
    auto alice = ClientSideTable(db, "Alice");

    sst.insertRow(rA, t0);                  // revision 1
    sst.insertRow(rB, t0);                  // revision 2
    bob.applyDelta(sst.syncDeltaFor(bob));  // bob is at revision 2, alice still at 0
    assert(bob.revision == 2 && alice.revision == 0);

    // alice has received nothing yet, so nothing may be dropped server side
    sst.purgeRevisions([bob, alice]);
    assert(sst.rows.length == 2);

    // once alice has caught up, both rows are synced everywhere and can go
    alice.applyDelta(sst.syncDeltaFor(alice));
    assert(alice.revision == 2);
    sst.purgeRevisions([bob, alice]);
    assert(sst.rows.length == 0);
}

unittest
{
    import mars.starwars;
    auto schema = starwarsSchema();
    auto db = new MockDatabase!(schema);

    auto dbPeople = db.table!"people";
    dbPeople.insertRow("Luke", "male", [0xDE, 0xAD, 0xBE, 0xEF], 1.72);
    dbPeople.insertRow("Leila", "female", [0xCA, 0xFE, 0xBA, 0xBE], 1.70);

    auto sst = ServerSideTable(dbPeople);
    auto bob = ClientSideTable(dbPeople, "Bob");
    auto alice = ClientSideTable(dbPeople, "Alice");
}

@safe:

/// Minimal view of a database table needed by the sync tables:
/// extracting the key bytes from an encoded record.
interface DatabaseTable {
    Bytes keysOf(Bytes) const;
}

/// Test double whose key is the first byte of the record.
class MockSimpleDatabaseTable : DatabaseTable {
    Bytes keysOf(Bytes record) const { return record[0 .. 1]; }
}

/// Test double for a whole database, parameterised on a compile-time schema.
class MockDatabase(immutable(Schema) schema)
{
    enum Schema = schema;

    /// Creates a new mock table object for the schema table called `name`.
    auto table(string name)(){
        return new MockDatabaseTable!(typeof(this), name)(this);
    }
}

/// Test double for one table of a `MockDatabase`; rows live in `fixtures`.
class MockDatabaseTable(D, string N) : DatabaseTable {
    alias Name = N;
    enum Table = D.Schema.tableNamed(Name);
    enum Columns = Table.columns;
    alias RecordTypes = asD!Columns;

    this(D db){ this.db = db; }

    /// Stores a new fixture row; asserts that its primary key is not already present.
    void insertRow(RecordTypes values){
        auto record = asStruct!(Table)(values);
        asPkStruct!Table keys;
        assignCommonFields(keys, record);
        assert((keys in fixtures) is null);
        fixtures[keys] = record;
    }

    // Same convention as MockSimpleDatabaseTable: the key is the first byte.
    Bytes keysOf(Bytes record) const { return record[0 .. 1]; }

    D db;
    asStruct!Table[asPkStruct!Table] fixtures;
}


/// Server-side view of a table: the rows being tracked plus a revision
/// counter that is bumped on every server-side insert or update.
struct ServerSideTable {
    DatabaseTable db;

    /// Records a server-originated insert, authored by "Auto", at the next revision.
    void insertRow(Bytes record, SysTime when) {
        auto keys = db.keysOf(record);
        rows[keys] = Row(keys, record, Row.inserted, when, "Auto", ++revision);
    }

    /// Records a server-originated update, authored by "Auto", at the next revision.
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        rows[keys] = Row(keys, record, Row.updated, when, "Auto", ++revision);
    }

    /// Builds the operations `cst` needs to match this table: an `Ins` for
    /// every row the client lacks, an `Upd` for every row whose record differs.
    Delta syncDeltaFor(const ClientSideTable cst) {
        Delta delta;
        foreach(keys, srow; rows){
            auto crow = keys in cst.rows;
            if( crow is null ){
                delta ~= DeltaOperation( Operation.Ins, keys, srow.record, srow.by, srow.when, srow.revision );
            }
            else {
                if( crow.record != srow.record ){
                    delta ~= DeltaOperation( Operation.Upd, keys, srow.record, srow.by, srow.when, srow.revision );
                }
            }
        }
        return delta;
    }

    /// Drops, from this table and from every table in `csts`, the rows that
    /// all of the given clients have already synced.
    ///
    /// `applyDelta` sets a client's `revision` to the highest revision it has
    /// applied, so a row is synced everywhere exactly when its revision is at
    /// or below the smallest client revision (capped by our own revision).
    ///
    /// Fix: the threshold used to be computed as `min(rev, revision)`, which
    /// ignored the clients entirely and purged rows that a lagging client had
    /// not received yet.
    void purgeRevisions(ClientSideTable[] csts) @trusted { // byKeyValue is @system
        ulong rev = revision;
        foreach(cst; csts) rev = min(rev, cst.revision);

        purgeSynced(rows, rev);
        // cst.rows aliases the client's own table, so removals are seen by the caller
        foreach(cst; csts) purgeSynced(cst.rows, rev);
    }

    // Removes from `table` every row whose revision is at or below `rev`.
    // Keys are collected first because an AA must not be modified while iterating it.
    private static void purgeSynced(Row[Bytes] table, ulong rev) @trusted { // byKeyValue is @system
        Bytes[] toPurge;
        foreach(kv; table.byKeyValue){
            if( kv.value.revision <= rev ) toPurge ~= kv.key;
        }
        foreach(k; toPurge) table.remove(k);
    }


    /// Returns a copy of the row stored under `keys`; the key must be present.
    Row row(Bytes keys) const {
        assert( (keys in rows) !is null);
        return rows[keys];
    }

    Row[Bytes] rows;
    ulong revision;
}

/// Server-held mirror of what one client (identified by `by`) currently has.
struct ClientSideTable {
    this(DatabaseTable db, string by) {
        this.db = db;
        this.by = by;
        // NOTE(review): presumably forces the associative array to be allocated,
        // so that copies of this struct share the same table - confirm.
        rows[ [0] ] = Row.init; rows.remove([0]);
    }

    // NOTE(review): always returns 0 regardless of `rows`; looks like a stub - confirm intent.
    long count() const { return 0; }

    /// Replays `delta` on this mirror and raises `revision` to the highest
    /// revision seen. `Ins` requires the key to be absent; every other
    /// operation requires it to be present.
    void applyDelta(const Delta delta){
        foreach(op; delta){
            // NOTE(review): the rows built here do not carry op.revision (it stays 0) - confirm intent.
            if(op.operation == Operation.Ins){
                assert((op.key in rows) is null);
                rows[op.key] = Row(op.key, op.record, Row.inserted, op.when, op.by);
            }
            else {
                auto row = op.key in rows;
                assert(row !is null);
                if(op.operation == Operation.ConfirmIns)
                    *row = Row(op.key, op.record, Row.inserted, op.when, op.by);
                else
                    *row = Row(op.key, op.record, Row.updated, op.when, op.by);
            }
            revision = max(revision, op.revision);
        }
    }

    /// Returns a copy of the row stored under `keys`.
    Row row(Bytes keys) const { return rows[keys]; }

    /// Records an update made by the client as `optUpdated`, at the next
    /// client revision. The row must already exist.
    void updateRow(Bytes keys, Bytes record, SysTime when) {
        auto row = keys in rows;
        assert(row !is null);
        *row = Row(keys, record, Row.optUpdated, when, by, ++revision);
    }

    /// Records an insert made by the client as `optInserted`, at the next
    /// client revision. The key must not exist yet.
    void insertRow(Bytes record, SysTime when) {
        Bytes keys = this.db.keysOf(record);
        assert( (keys in rows) is null );
        rows[keys] = Row(keys, record, Row.optInserted, when, by, ++revision);
    }

    /// Pushes every `optUpdated` / `optInserted` row into `sst` and returns the
    /// operations (`Upd` / `ConfirmIns`) to send back to the client. Also
    /// raises `sst.revision` to the highest row revision seen in this table.
    Delta commitOrRollback(ref ServerSideTable sst) {
        Delta delta;
        foreach(keys, ref crow; rows){
            if(crow.state == Row.optUpdated){
                auto srow = keys in sst.rows; assert(srow !is null);
                *srow = Row(crow.keys, crow.record, Row.updated, crow.when, crow.by, crow.revision);
                delta ~= DeltaOperation(Operation.Upd, crow.keys, crow.record, crow.by, crow.when, crow.revision);
            }
            else if(crow.state == Row.optInserted){
                assert( (keys in sst.rows) is null );
                sst.rows[keys] = Row(keys, crow.record, Row.inserted, crow.when, crow.by, crow.revision);
                delta ~= DeltaOperation(Operation.ConfirmIns, crow.keys, crow.record, crow.by, crow.when, crow.revision);
            }
            sst.revision = max(sst.revision, crow.revision);
        }
        return delta;
    }

    DatabaseTable db;
    string by;
    Row[Bytes] rows;
    ulong revision;
}