1 2 module mars.client; 3 4 import std.datetime, 5 std.format, 6 std.variant, 7 std.experimental.logger; 8 9 import vibe.core.core; 10 import vibe.core.log; 11 import vibe.data.json; 12 import vibe.http.websockets; 13 14 import mars.msg; 15 import mars.server; 16 import mars.websocket; 17 import mars.protomars : MarsProxyStoC; // XXX per instanziare la variabile per il socket... dovrei fare un interfaccia? 18 // nel momento in cui instanzio il client, non ho ancora il MarsProxy, che invece 19 // instanzio nel protoMars. Dovrei instanziare il client nel protoMars? Maybe yes 20 // visto che è li che, dopo aver stabilito che è un client mars, instanzio il socket. 21 22 import mars.sync; 23 import mars.pgsql; 24 25 struct MarsClient 26 { 27 void connected() in { 28 assert( connectionEvents.length ==0 || connectionEvents[$-1].type == ConnessionEvent.disconnected ); 29 } body { 30 connectionEvents ~= ConnessionEvent(ConnessionEvent.connected, Clock.currTime); 31 } 32 void disconnected() in { 33 assert( connectionEvents[$-1].type == ConnessionEvent.connected, "clientId:%s, events:%s".format(id, connectionEvents) ); 34 } body { 35 connectionEvents ~= ConnessionEvent(ConnessionEvent.disconnected, Clock.currTime); 36 } 37 38 bool isConnected(){ return connectionEvents.length >0 && connectionEvents[$-1].type == ConnessionEvent.connected; } 39 40 SysTime[] reconnections() { 41 import std.algorithm : filter, map; 42 import std.array : array; 43 44 return connectionEvents 45 .filter!( (e) => e.type == ConnessionEvent.connected ) 46 .map!"a.when" 47 .array; 48 } 49 50 private { 51 struct ConnessionEvent { 52 enum { connected, disconnected } 53 int type; 54 SysTime when; 55 } 56 ConnessionEvent[] connectionEvents; 57 } 58 59 this(string id, const DatabaseService databaseService){ 60 this.id_ = id; 61 this.databaseService = databaseService; 62 } 63 64 /** 65 * Push a forward-only message to the client, a reply is not expected. */ 66 void sendBroadcast(M)(M msg) in { assert(isConnected); } body 67 { 68 socket.sendRequest(0, msg); // ... this is really a proxy to the websocket 69 } 70 71 /** 72 * Push a new message, from the server to the client. Used by the server to inform clients about events. */ 73 void sendRequest(M)(M msg) in { assert(isConnected); } body 74 { 75 bool sent = socket.sendRequest(nextId++, msg); 76 if( ! sent ){ 77 disconnected(); 78 } 79 } 80 private int nextId = 1; 81 82 auto receiveReply(M)() in { assert(isConnected); } body 83 { 84 auto msg = socket.receiveMsg!M(); 85 if( msg.status == msg.channelDropped ) disconnected(); 86 return msg; 87 } 88 89 /** 90 * The Helo protocol will wire the active socket here, and will set this to null when disconnecting. */ 91 void wireSocket(ref MarsProxyStoC!ResilientWebSocket socket, Task task) 92 { 93 this.socket = &socket; 94 this.stocTask = task; 95 } 96 97 /** 98 * Returns true if the 'server to client' socket was opened and wired to us. */ 99 bool socketWired() { return this.socket != this.socket.init; } 100 101 /** 102 * Called by the authentication protocol. 103 * 104 * Returns: false if PostgreSQL is offline or user in not authorised, or true. */ 105 AuthoriseError authoriseUser(string username, string pgpassword) in { 106 assert(username && pgpassword); 107 } body { 108 this.username = username; 109 110 AuthoriseError err; 111 if( databaseService.host == "" ){ 112 logWarn("S --- C | the database host is not specified, we are operating in offline mode"); 113 err = AuthoriseError.authorised; 114 } 115 else { 116 db = databaseService.connect(username, pgpassword, err ); 117 if(err != AuthoriseError.authorised) this.username = ""; 118 } 119 return err; 120 } 121 void discardAuthorisation() { 122 logWarn("S --- C | discarding previous authorisation"); 123 this.username = ""; 124 } 125 126 bool authorised() { return this.username != ""; } 127 string id() { return id_; } 128 129 bool pingWebClient(){ 130 return socket.sendRequest(0, PingReq()); 131 } 132 133 string callServerMethod(string method, Json parameters){ 134 if( serverSideMethods !is null ){ 135 return serverSideMethods(this, method, parameters); 136 } 137 assert(false); // catch on server side; 138 } 139 140 immutable(ubyte)[][2] vueInsertRecord(int statementIndex, immutable(ubyte)[] record, ref InsertError err){ 141 immutable(ubyte)[][2] inserted = marsServer.tables[statementIndex].insertRecord(db, record, err, username, id); 142 return inserted; 143 } 144 145 immutable(ubyte)[] vueDeleteRecord(int tableIndex, immutable(ubyte)[] record, ref DeleteError err){ 146 immutable(ubyte)[] deleted = marsServer.tables[tableIndex].deleteRecord(db, record, err, username, id); 147 return deleted; 148 } 149 150 void vueUpdateRecord(ulong tableIndex, immutable(ubyte)[] keys, immutable(ubyte)[] record, ref RequestState state){ 151 marsServer.tables[tableIndex].updateRecord(db, keys, record, state, id); 152 } 153 154 auto vueSubscribe(string select, Variant[string] parameters, ref RequestState state){ 155 import std.typecons : WhiteHole; 156 import mars.defs : Table; 157 // ... sanity check: the client has requested a subscription, but has not completed the login.. (for bugs, for example) 158 if( ! authorised ){ 159 logWarn("mars - rejecting a non authorised subscribe from client %s, client bug?",id); 160 state = RequestState.rejectedAsNotAuthorised; 161 return Json.emptyObject; 162 } 163 auto table = new WhiteHole!(BaseServerSideTable!MarsClient)(Table()); 164 auto json = table.selectAsJson(db, select, parameters, state); 165 return json; 166 } 167 168 private { 169 string id_; 170 171 string username = ""; 172 //string password; 173 string seed; 174 175 MarsProxyStoC!(ResilientWebSocket)* socket; 176 public Task stocTask; 177 178 public typeof(MarsServer.serverSideMethods) serverSideMethods; 179 DatabaseService databaseService; 180 public Database db; 181 } 182 }