1 2 module mars.client; 3 4 import std.datetime, 5 std.format, 6 std.variant, 7 std.experimental.logger; 8 9 import vibe.core.core; 10 import vibe.core.log; 11 import vibe.data.json; 12 import vibe.http.websockets; 13 14 import mars.msg; 15 import mars.server; 16 import mars.websocket; 17 import mars.protomars : MarsProxyStoC; // XXX per instanziare la variabile per il socket... dovrei fare un interfaccia? 18 // nel momento in cui instanzio il client, non ho ancora il MarsProxy, che invece 19 // instanzio nel protoMars. Dovrei instanziare il client nel protoMars? Maybe yes 20 // visto che è li che, dopo aver stabilito che è un client mars, instanzio il socket. 21 22 import mars.sync; 23 import mars.pgsql; 24 25 struct MarsClient 26 { 27 void connected() in { 28 assert( connectionEvents.length ==0 || connectionEvents[$-1].type == ConnessionEvent.disconnected ); 29 } body { 30 connectionEvents ~= ConnessionEvent(ConnessionEvent.connected, Clock.currTime); 31 } 32 void disconnected() in { 33 assert( connectionEvents[$-1].type == ConnessionEvent.connected, "clientId:%s, events:%s".format(id, connectionEvents) ); 34 } body { 35 connectionEvents ~= ConnessionEvent(ConnessionEvent.disconnected, Clock.currTime); 36 } 37 38 bool isConnected(){ return connectionEvents.length >0 && connectionEvents[$-1].type == ConnessionEvent.connected; } 39 40 SysTime[] reconnections() { 41 import std.algorithm : filter, map; 42 import std.array : array; 43 44 return connectionEvents 45 .filter!( (e) => e.type == ConnessionEvent.connected ) 46 .map!"a.when" 47 .array; 48 } 49 50 private { 51 struct ConnessionEvent { 52 enum { connected, disconnected } 53 int type; 54 SysTime when; 55 } 56 ConnessionEvent[] connectionEvents; 57 } 58 59 this(string id, const DatabaseService databaseService){ 60 this.id_ = id; 61 this.databaseService = databaseService; 62 } 63 64 /** 65 * Push a forward-only message to the client, a reply is not expected. */ 66 void sendBroadcast(M)(M msg) in { assert(isConnected); } body 67 { 68 socket.sendRequest(0, msg); // ... this is really a proxy to the websocket 69 } 70 71 /** 72 * Push a new message, from the server to the client. Used by the server to inform clients about events. */ 73 void sendRequest(M)(M msg) in { assert(isConnected); } body 74 { 75 bool sent = socket.sendRequest(nextId++, msg); 76 if( ! sent ){ 77 disconnected(); 78 } 79 } 80 private int nextId = 1; 81 82 auto receiveReply(M)() in { assert(isConnected); } body 83 { 84 auto msg = socket.receiveMsg!M(); 85 if( msg.status == msg.channelDropped ) disconnected(); 86 return msg; 87 } 88 89 /** 90 * The Helo protocol will wire the active socket here, and will set this to null when disconnecting. */ 91 void wireSocket(MarsProxyStoC!WebSocket socket, Task task) 92 { 93 this.socket = socket; 94 this.stocTask = task; 95 } 96 97 /** 98 * Returns true if the 'server to client' socket was opened and wired to us. */ 99 bool socketWired() { return this.socket != this.socket.init; } 100 101 /** 102 * Called by the authentication protocol. 103 * 104 * Returns: false if PostgreSQL is offline or user in not authorised, or true. */ 105 AuthoriseError authoriseUser(string username, string pgpassword) in { 106 assert(username && pgpassword); 107 } body { 108 this.username = username; 109 110 AuthoriseError err; 111 if( databaseService.host == "" ){ 112 logWarn("S --- C | the database host is not specified, we are operating in offline mode"); 113 err = AuthoriseError.authorised; 114 } 115 else { 116 db = databaseService.connect(username, pgpassword, err ); 117 if(err != AuthoriseError.authorised) this.username = ""; 118 } 119 return err; 120 } 121 void discardAuthorisation() { 122 logWarn("S --- C | discarding previous authorisation"); 123 this.username = ""; 124 } 125 126 bool authorised() { return this.username != ""; } 127 string id() { return id_; } 128 129 string callServerMethod(string method, Json parameters){ 130 if( serverSideMethods !is null ){ 131 return serverSideMethods(this, method, parameters); 132 } 133 assert(false); // catch on server side; 134 } 135 136 immutable(ubyte)[][2] vueInsertRecord(int statementIndex, immutable(ubyte)[] record, ref InsertError err){ 137 immutable(ubyte)[][2] inserted = marsServer.tables[statementIndex].insertRecord(db, record, err, username, id); 138 return inserted; 139 } 140 141 immutable(ubyte)[] vueDeleteRecord(int tableIndex, immutable(ubyte)[] record, ref DeleteError err){ 142 immutable(ubyte)[] deleted = marsServer.tables[tableIndex].deleteRecord(db, record, err, username, id); 143 return deleted; 144 } 145 146 void vueUpdateRecord(ulong tableIndex, immutable(ubyte)[] keys, immutable(ubyte)[] record, ref RequestState state){ 147 marsServer.tables[tableIndex].updateRecord(db, keys, record, state, id); 148 } 149 150 auto vueSubscribe(string select, Variant[string] parameters, ref RequestState state){ 151 import std.typecons : WhiteHole; 152 import mars.defs : Table; 153 // ... sanity check: the client has requested a subscription, but has not completed the login.. (for bugs, for example) 154 if( ! authorised ){ 155 logWarn("mars - rejecting a non authorised subscribe from client %s, client bug?",id); 156 state = RequestState.rejectedAsNotAuthorised; 157 return Json.emptyObject; 158 } 159 auto table = new WhiteHole!(BaseServerSideTable!MarsClient)(Table()); 160 auto json = table.selectAsJson(db, select, parameters); 161 return json; 162 } 163 164 private { 165 string id_; 166 167 string username = ""; 168 //string password; 169 string seed; 170 171 MarsProxyStoC!WebSocket socket; 172 public Task stocTask; 173 174 public typeof(MarsServer.serverSideMethods) serverSideMethods; 175 DatabaseService databaseService; 176 public Database db; 177 } 178 }