module mars.client;

import std.datetime,
       std.format,
       std.variant,
       std.experimental.logger;

import vibe.core.log;
import vibe.data.json;
import vibe.http.websockets;

import mars.msg;
import mars.server;
import mars.websocket;
import mars.protomars : MarsProxyStoC; // XXX needed to declare the socket member... should this be an interface instead?
        // When the client is instantiated the MarsProxy does not exist yet: it is
        // instantiated in protoMars. Should the client be instantiated in protoMars too?
        // Maybe yes, since that is where the socket is instantiated once the peer has
        // been recognised as a mars client.

import mars.sync;
import mars.pgsql;

/**
 * Server-side state kept for one client: its connection history, the
 * 'server to client' socket proxy, the authorised username and the database handle.
 */
struct MarsClient
{
    /**
     * Record that the client has connected.
     *
     * Precondition: there is no previous event, or the last recorded event is a disconnection.
     */
    void connected() in {
        assert( connectionEvents.length ==0 || connectionEvents[$-1].type == ConnessionEvent.disconnected );
    } body {
        connectionEvents ~= ConnessionEvent(ConnessionEvent.connected, Clock.currTime);
    }

    /**
     * Record that the client has disconnected.
     *
     * Precondition: the last recorded event is a connection. The check goes through
     * `isConnected`, which tests the length first, so an empty history fails the
     * assertion (with its diagnostic message) instead of indexing `[$-1]` out of bounds.
     */
    void disconnected() in {
        assert( isConnected, "clientId:%s, events:%s".format(id, connectionEvents) );
    } body {
        connectionEvents ~= ConnessionEvent(ConnessionEvent.disconnected, Clock.currTime);
    }

    /// True when at least one event was recorded and the most recent one is a connection.
    bool isConnected(){ return connectionEvents.length >0 && connectionEvents[$-1].type == ConnessionEvent.connected; }

    /// Timestamps of every recorded `connected` event, in recording order.
    SysTime[] reconnections() {
        import std.algorithm : filter, map;
        import std.array : array;

        return connectionEvents
            .filter!( (e) => e.type == ConnessionEvent.connected )
            .map!"a.when"
            .array;
    }

    private {
        /// One entry of the connection history: what happened and when.
        struct ConnessionEvent {
            enum { connected, disconnected }
            int type;
            SysTime when;
        }
        ConnessionEvent[] connectionEvents;
    }

    /**
     * Params:
     *   id = identifier of this client, exposed through `id()`.
     *   databaseService = service later used by `authoriseUser` to open the database connection.
     */
    this(string id, const DatabaseService databaseService){
        this.id_ = id;
        this.databaseService = databaseService;
    }

    /**
     * Push a forward-only message to the client, a reply is not expected.
     *
     * The message is sent with request id 0.
     * Precondition: the client is connected.
     */
    void sendBroadcast(M)(M msg) in { assert(isConnected); } body
    {
        // NOTE(review): the result of socket.sendRequest is ignored here, while
        // sendRequest() below records a disconnection when the send fails — confirm
        // that this difference is intentional.
        socket.sendRequest(0, msg); // ... this is really a proxy to the websocket
    }

    /**
     * Push a new message, from the server to the client. Used by the server to inform clients about events.
     *
     * Each call consumes the next request id. When the socket reports that the message
     * was not sent, the client is recorded as disconnected.
     * Precondition: the client is connected.
     */
    void sendRequest(M)(M msg) in { assert(isConnected); } body
    {
        bool sent = socket.sendRequest(nextId++, msg);
        if( ! sent ){
            disconnected();
        }
    }
    private int nextId = 1;

    /**
     * Receive a message of type `M` from the socket.
     *
     * When the message status is `channelDropped` the client is recorded as disconnected;
     * the message is returned to the caller in every case.
     * Precondition: the client is connected.
     */
    auto receiveReply(M)() in { assert(isConnected); } body
    {
        auto msg = socket.receiveMsg!M();
        if( msg.status == msg.channelDropped ) disconnected();
        return msg;
    }

    /**
     * The Helo protocol will wire the active socket here, and will set this to null when disconnecting.
     */
    void wireSocket(MarsProxyStoC!WebSocket socket)
    {
        this.socket = socket;
    }

    /**
     * Returns true if the 'server to client' socket was opened and wired to us.
     */
    bool socketWired() { return this.socket != this.socket.init; }

    /**
     * Called by the authentication protocol.
     *
     * Stores `username`, then:
     * $(UL
     *   $(LI if the database host is empty, logs a warning and returns
     *        `AuthoriseError.authorised` (offline mode) without connecting;)
     *   $(LI otherwise connects through the database service and returns the error it
     *        reported; when that is not `AuthoriseError.authorised` the stored
     *        username is cleared again.)
     * )
     *
     * Returns: `AuthoriseError.authorised` on success, another `AuthoriseError` value otherwise.
     */
    AuthoriseError authoriseUser(string username, string pgpassword) in {
        assert(username && pgpassword);
    } body {
        this.username = username;

        AuthoriseError err;
        if( databaseService.host == "" ){
            logWarn("S --- C | the database host is not specified, we are operating in offline mode");
            err = AuthoriseError.authorised;
        }
        else {
            db = databaseService.connect(username, pgpassword, err );
            if(err != AuthoriseError.authorised) this.username = "";
        }
        return err;
    }

    /// Forget the stored username, so that `authorised()` returns false again.
    void discardAuthorisation() {
        logWarn("S --- C | discarding previous authorisation");
        this.username = "";
    }

    /// True while a non-empty username is stored.
    bool authorised() { return this.username != ""; }

    /// Identifier given to the constructor.
    string id() { return id_; }

    /**
     * Forward a method call to the `serverSideMethods` handler and return its result.
     *
     * Fails with `assert(false)` when no handler has been set.
     */
    string callServerMethod(string method, Json parameters){
        if( serverSideMethods !is null ){
            return serverSideMethods(this, method, parameters);
        }
        assert(false); // catch on server side;
    }

    /// Delegates to `insertRecord` of the server table at `statementIndex`, passing this client's db, username and id.
    immutable(ubyte)[][2] vueInsertRecord(int statementIndex, immutable(ubyte)[] record, ref InsertError err){
        immutable(ubyte)[][2] inserted = marsServer.tables[statementIndex].insertRecord(db, record, err, username, id);
        return inserted;
    }

    /// Delegates to `deleteRecord` of the server table at `tableIndex`, passing this client's db, username and id.
    immutable(ubyte)[] vueDeleteRecord(int tableIndex, immutable(ubyte)[] record, ref DeleteError err){
        immutable(ubyte)[] deleted = marsServer.tables[tableIndex].deleteRecord(db, record, err, username, id);
        return deleted;
    }

    /// Delegates to `updateRecord` of the server table at `tableIndex`, passing this client's db.
    void vueUpdateRecord(ulong tableIndex, immutable(ubyte)[] keys, immutable(ubyte)[] record, ref RequestState state){
        marsServer.tables[tableIndex].updateRecord(db, keys, record, state);
    }

    /**
     * Run `select` with `parameters` on this client's db and return the result as Json.
     *
     * When the client is not authorised, a warning is logged, `state` is set to
     * `RequestState.rejectedAsNotAuthorised` and an empty Json object is returned.
     * `state` is not written on the other path.
     */
    auto vueSubscribe(string select, Variant[string] parameters, ref RequestState state){
        import std.typecons : WhiteHole;
        import mars.defs : Table;
        // ... sanity check: the client has requested a subscription, but has not completed the login.. (for bugs, for example)
        if( ! authorised ){
            logWarn("mars - rejecting a non authorised subscribe from client %s, client bug?",id);
            state = RequestState.rejectedAsNotAuthorised;
            return Json.emptyObject;
        }
        // A throw-away table object, used only to reach selectAsJson.
        auto table = new WhiteHole!(BaseServerSideTable!MarsClient)(Table());
        auto json = table.selectAsJson(db, select, parameters);
        return json;
    }

    private {
        string id_;

        string username = "";
        //string password;
        string seed;

        MarsProxyStoC!WebSocket socket;

        public typeof(MarsServer.serverSideMethods) serverSideMethods;
        DatabaseService databaseService;
        public Database db;
    }
}