1 
2 module mars.pgsql;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.format;
7 import std..string;
8 import std.range;
9 import std.typecons;
10 import std.variant;
11 
12 import mars.defs;
13 import mars.msg : AuthoriseError, InsertError, DeleteError, RequestState;
14 version(unittest) import mars.starwars;
15 
16 import ddb.postgres;
17 import ddb.db;
18 import vibe.core.log;
19 import vibe.data.json;
20 
21 string insertIntoReturningParameter(const(Table) table)
22 {
23     int i = 1;
24     return "insert into %s values (%s) returning *".format(
25         table.name,
26         table.columns.map!( (c) => c.type == Type.serial? "default" : "$" ~ (i++).to!string).join(", "));
27 }
28 unittest {
29     auto sql = Table("bar", [Col("foo", Type.text, false), Col("baz", Type.text, false)],[],[]).insertIntoReturningParameter();
30     assert( sql == "insert into bar values ($1, $2) returning *", sql );
31     auto sql2 = Table("bar", [Col("w_id", Type.serial), Col("w", Type.text)], [0], []).insertIntoReturningParameter();
32     assert( sql2 == "insert into bar values (default, $1) returning *", sql2);
33 }
34 
35 string deleteFromParameter(const(Table) table)
36 {
37     return "delete from %s where %s".format(
38             table.name, 
39             zip(iota(0, table.pkCols.length), table.pkCols)
40                 .map!( (t) => t[1].name ~ " = $" ~ (t[0]+1).to!string)
41                 .join(" and "));
42 }
43 unittest {
44     auto sql = Table("bar", [Col("foo", Type.text, false), Col("bar", Type.text, false), Col("baz", Type.text, false)], [0, 1], []).deleteFromParameter();
45     assert( sql == "delete from bar where foo = $1 and bar = $2", sql);
46 }
47 
48 string updateFromParameters(const(Table) table)
49 {
50     immutable(Col)[] whereCols = table.pkCols.length >0? table.pkCols : table.columns;
51     int dollarIndex =1;
52     return "update %s set %s where %s".format(
53         table.name,
54         table.columns.map!( (t) => t.name ~ " = $" ~ (dollarIndex++).to!string).join(", "),
55         whereCols.map!( (t) => t.name ~ " = $" ~ (dollarIndex++).to!string).join(" and "));
56 }
57 unittest {
58     auto sql = Table("bar", [Col("foo", Type.text, false), Col("bar", Type.text, false), Col("baz", Type.text, false)], [0], []).updateFromParameters();
59     assert( sql == "update bar set foo = $1, bar = $2, baz = $3 where foo = $4", sql );
60 }
61 
62 string selectFromParameters(const(Table) table)
63 {
64     return "select * from %s where %s".format(
65             table.name, 
66             zip(iota(0, table.pkCols.length), table.pkCols)
67                 .map!( (t) => t[1].name ~ " = $" ~ (t[0]+1).to!string)
68                 .join(" and "));
69 }
70 unittest {
71     auto sql = Table("bar", [Col("foo", Type.text, false), Col("bar", Type.text, false), Col("baz", Type.text, false)], [0, 1], []).deleteFromParameter();
72     assert( sql == "select * from bar where foo = $1 and bar = $2", sql);
73 }
74 
75 
76 struct DatabaseService {
77     string host;
78     ushort port;
79     string database;
80  
81     /**
82      * Returns: an instance of `Database` of null if can't connect or authenticate. Errors details in 'err' */
83     Database connect(string user, string password, ref AuthoriseError err) in {
84         assert(user && password);
85     } body {
86         Database db;
87         try {
88             db = new Database(host, database, user, password);
89             err = AuthoriseError.authorised;
90         }
91         catch(ServerErrorException e){
92             switch(e.code){
93                 case "28000": // role "user" does not exist
94                     logInfo("PostgreSQL role does not exist");
95                     err = AuthoriseError.wrongUsernameOrPassword;
96                     break;
97                 case "28P01": // password authentication failed for user "user"
98                     logInfo("PostgreSQL password authentication failed for user");
99                     err = AuthoriseError.wrongUsernameOrPassword;
100                     break;
101                 default:
102                     logWarn("S --- C | Unhandled PostgreSQL server error during connection!");
103                     logInfo("S --- C | PostgreSQL server error: %s", e.toString);
104                     err = AuthoriseError.unknownError;
105             }
106         }
107         catch(Exception e){
108             logWarn("S --- C | exception connecting to the PostgreSQL!");
109             logWarn("S --- C | %s", e);
110             err = AuthoriseError.unknownError;
111         }
112         assert( err != AuthoriseError.assertCheck);
113         return db;
114     }
115 }
116 
117 class Database
118 {
119     private this(string host, string database, string user, string password){
120         this.username_ = user;
121         if( db is null ){
122             db = new PostgresDB(["host": host, "database": database, "user": user, "password": password]);
123         }
124         conn = db.lockConnection();
125     }
126 
127     void execute(const Select select)
128     {
129         string s = `select %s from %s`.format(select.cols[0].name, select.tables[0].name);
130         auto q = conn.executeQuery(s); 
131     }
132 
133     void executeUnsafe(string sql){
134         auto q = conn.executeQuery(sql);
135         foreach(v; q){
136             import std.stdio; writeln("-->", v);
137         }
138     }
139     T executeScalarUnsafe(T)(string sql){
140         return conn.executeScalar!T(sql);
141     }
142 
143     // usato da sync per la sottoscrizione di query complesse
144     auto executeQueryUnsafe(string sql){
145         return conn.executeQuery(sql);
146     }
147 
148     // usato da sync per la sottoscrizione di query complesse, con parametri
149     auto executeQueryUnsafe(string sql, Variant[string] parameters){
150         // ... sort param names, transform names into a sequence of $1, $2
151         auto pgargs = xxx(sql, parameters);
152         // ... prepare the statement
153         auto cmd = new PGCommand(conn, pgargs[0]);
154         foreach(j, param; pgargs[1]){
155             // ... try to guess the PGType from the Variant typeinfo ...
156             auto pgType = toPGType(param.type);
157             switch(pgType) with (PGType){
158                 case TEXT:
159                     cmd.parameters.add((j+1).to!short, pgType).value = param.get!string;
160                     break;
161                 case INT2:
162                     cmd.parameters.add((j+1).to!short, pgType).value = param.get!short;
163                     break;
164                 case INT4:
165                     cmd.parameters.add((j+1).to!short, pgType).value = param.get!int;
166                     break;
167                 default:
168                     assert(false, pgType.to!string);
169             }
170         }
171         return cmd.executeQuery();
172     }
173     version(unittest_starwars){ unittest {
174         auto db = new Database("127.0.0.1", "starwars", "jedi", "force");
175         auto recordSet = db.executeQueryUnsafe("select * from planets where name = $name", ["name": Variant("Tatooine")]);
176         scope(exit) recordSet.close();
177         assert(recordSet.front[1].get!long == 120_000);
178     }}
179 
180     auto executeQueryUnsafe(Row)(string sql){
181         return conn.executeQuery!Row(sql);
182     }
183 
184     auto executeInsert(immutable(Table) table, Row, )(Row record, ref InsertError err){
185         enum sql = insertIntoReturningParameter(table);
186         auto cmd = new PGCommand(conn, sql);
187         addParameters!(table, Row, true)(cmd, record); // skip serial parameters
188         Row result;
189         try {
190             auto querySet = cmd.executeQuery!Row();
191             scope(exit) querySet.close();
192             result = querySet.front;
193             err = InsertError.inserted;
194         }
195         catch(ServerErrorException e){
196             switch(e.code){
197                 case "23505": //  duplicate key value violates unique constraint "<constraintname>" (for example in primary keys)
198                     err = InsertError.duplicateKeyViolations;
199                     break;
200                 default:
201                     logWarn("S --- C | Unhandled PostgreSQL server error during insertion!");
202                     logInfo("S --- C | PostgreSQL server error: %s", e.toString);
203                     err = InsertError.unknownError;
204             }
205         }
206         if( table.journal && err == InsertError.inserted ){
207             logWarn("S --- C | Journaling");
208             try {
209                 cmd = new PGCommand(conn, `insert into journal (username, operation, tablename, post) values ($1, 'record_inserted', $2, $3)`);
210                 cmd.parameters.add(1, PGType.TEXT).value = this.username;
211                 cmd.parameters.add(2, PGType.TEXT).value = table.name;
212                 cmd.parameters.add(3, PGType.JSON).value = result.serializeToJson();
213                 cmd.executeNonQuery();
214             }
215             catch(Exception e){
216                 logWarn("Catch Exception during journaling:%s", e.toString());
217             }
218             logWarn("S --- C | Journaling done");
219         }
220         return result;
221     }
222 
223     void executeDelete(immutable(Table) table, Pk)(Pk pk, ref DeleteError err){
224         asStruct!table pre;
225         if( table.journal ){
226             pre = selectFromPk!(table, Pk)(pk);
227         }
228 
229         enum sql = deleteFromParameter(table);
230         auto cmd = new PGCommand(conn, sql);
231 
232         addParameters!table(cmd, pk);
233         try {
234             cmd.executeNonQuery();
235             err = DeleteError.deleted;
236         }
237         catch(ServerErrorException e){
238             switch(e.code){
239                 case "23503": // update or delete on table "<table>" violates foreign key constraint "<constraint>" on table "<othertable>"
240                     err = DeleteError.rejectedAsForeignKeyViolation;
241                     break;
242                 default:
243                     logWarn("S --- C | Unhandled PostgreSQL server error during deletion!");
244                     logInfo("S --- C | PostgreSQL server error: %s", e.toString);
245                     err = DeleteError.unknownError;
246             }
247         }
248         if( table.journal && err == DeleteError.deleted ){
249             logWarn("S --- C | Journaling");
250             try {
251                 cmd = new PGCommand(conn, `insert into journal (username, operation, tablename, post) values ($1, 'record_deleted', $2, $3)`);
252                 cmd.parameters.add(1, PGType.TEXT).value = this.username;
253                 cmd.parameters.add(2, PGType.TEXT).value = table.name;
254                 cmd.parameters.add(3, PGType.JSON).value = pre.serializeToJson();
255                 cmd.executeNonQuery();
256             }
257             catch(Exception e){
258                 logWarn("Catch Exception during journaling:%s", e.toString());
259             }
260             logWarn("S --- C | Journaling done");
261         }
262     }
263 
264     void executeUpdate(immutable(Table) table, Pk, Row)(Pk pk, Row record, ref RequestState state)
265     {
266         
267         asStruct!table pre;
268         if( table.journal ){
269             pre = selectFromPk!(table, Pk)(pk);
270         }
271 
272         enum sql = updateFromParameters(table);
273         auto cmd = new PGCommand(conn, sql);
274         addParameters!(table)(cmd, record);
275         short i = record.tupleof.length +1;
276         addParameters!table(cmd, pk, i);
277         try {
278             cmd.executeNonQuery();
279         }
280         catch(ServerErrorException e){
281             switch(e.code){
282                 case "23503":
283                     logInfo("S --- C | PostgreSQL can't update the primary key as still referenced (maybe add an update cascade?).");
284                     state = RequestState.rejectedAsForeignKeyViolation;
285                     break;
286                 default:
287                     logWarn("S --- C | Unhandled PostgreSQL server error during update!");
288                     logInfo("S --- C | PostgreSQL server error: %s", e.toString);
289                     state = RequestState.rejectedAsPGSqlError;
290             }
291         }
292         if( table.journal && state == RequestState.executed ){
293             logWarn("S --- C | Journaling");
294             try {
295                 cmd = new PGCommand(conn, `insert into journal (username, operation, tablename, post, pre) values ($1, 'record_updated', $2, $3, $4)`);
296                 cmd.parameters.add(1, PGType.TEXT).value = this.username;
297                 cmd.parameters.add(2, PGType.TEXT).value = table.name;
298                 cmd.parameters.add(3, PGType.JSON).value = record.serializeToJson();
299                 cmd.parameters.add(4, PGType.JSON).value = pre.serializeToJson();
300                 cmd.executeNonQuery();
301             }
302             catch(Exception e){
303                 logWarn("Catch Exception during journaling:%s", e.toString());
304             }
305             logWarn("S --- C | Journaling done");
306         }
307     }
308 
309     PGConnection conn;
310     private {
311 
312         auto selectFromPk(immutable(Table) table, Pk)(Pk pk)
313         {
314             logWarn("S --- C | Selecting the record to be updated for the journal");
315             enum sql = selectFromParameters(table);
316             auto cmd = new PGCommand(conn, sql);
317             addParameters!table(cmd, pk);
318             //try {
319                 auto querySet = cmd.executeQuery!(asStruct!table)();
320                 scope(exit) querySet.close();
321                 auto pre = querySet.front;
322             //}
323             //catch(ServerErrorException e){
324             //}
325             logWarn("S --- C | Select done, record is %s", pre.serializeToJson());
326             return pre;
327         }
328 
329         private PostgresDB db;
330         private string username_;
331         public @property username() { return username_; }
332     }
333 }
334 
335 
336 private {
337     import mars.lexer;
338     import mars.sqldb;
339 
340     PGType toPGType(TypeInfo t){
341         if(t == typeid(bool)) return PGType.BOOLEAN;
342         if(t == typeid(int)) return PGType.INT4;
343         if(t == typeid(short)) return PGType.INT2;
344         if(t == typeid(string)) return PGType.TEXT;
345         if(t == typeid(float)) return PGType.FLOAT4;
346         if(t == typeid(double)) return PGType.FLOAT8;
347         if(t == typeid(ubyte[])) return PGType.BYTEA;
348         assert(false, t.to!string);
349     }
350 
351     PGType toPGType(Type t){
352         final switch(t) with(Type) {
353             case boolean: return PGType.BOOLEAN;
354             case integer: return PGType.INT4; // XXX check
355             case bigint: return PGType.INT8;
356             case smallint: return PGType.INT2; // XXX check 
357             case text: return PGType.TEXT;
358             case real_: return PGType.FLOAT4;
359             case doublePrecision: return PGType.FLOAT8;
360             case bytea: return PGType.BYTEA;
361             case smallserial: return PGType.INT2; // XXX check
362             case serial: return PGType.INT4; // there's not really a serial type in postgres
363             case date: return PGType.DATE; // XXX temptative
364 
365             case unknown:
366             case varchar: // varchar(n), tbd as column
367                 assert(false, t.to!string); // not implemented right now, catch at CT
368         }
369     }
370 
371     void addParameters(immutable(Table) table, Struct, bool noSerials = false, short tupleofIndex =0)(PGCommand cmd, Struct s, short paramIndex =1){
372         static if( is(Struct : asStruct!table) || Struct.tupleof.length == asStruct!(table).tupleof.length )
373         {
374             auto type =  table.columns[tupleofIndex].type;
375             static if( noSerials ) auto mustAdd = type != Type.serial && type != Type.smallserial;
376             else bool mustAdd = true;
377             if( mustAdd ) cmd.parameters.add(paramIndex, table.columns[tupleofIndex].type.toPGType).value = s.tupleof[tupleofIndex];
378         }
379         else static if( is(Struct : asPkStruct!table) || Struct.tupleof.length == asPkStruct!(table).tupleof.length )
380         {
381             auto type =  table.columns[tupleofIndex].type;
382             static if( noSerials ) auto mustAdd = type != Type.serial && type != Type.smallserial;
383             else bool mustAdd = true;
384             if( mustAdd ) cmd.parameters.add(paramIndex, table.pkCols[tupleofIndex].type.toPGType).value = s.tupleof[tupleofIndex];
385         }
386         else static assert(false);
387 
388         static if( s.tupleof.length > tupleofIndex+1 ) addParameters!(table, Struct, noSerials, tupleofIndex +1)(cmd, s, ++paramIndex);
389     }
390 
391 
392     string select(const(Select) stat){
393         return `select %s from %s`.format(
394             stat.cols.map!((c) => c.name).join(", "),
395             stat.tables.map!( (t) => t.name ).join(", "), /// XXX ho bisogno del nome dello schema QUA... refactory necessario
396             );
397     }
398     unittest {
399         auto s = starwarsSchema();
400         const sql = cast(Select)Parser([s], scan("select name from sw.people")).parse();
401         assert(select(sql) == "select name from people", select(sql));
402     }
403     version(unittest_starwars){
404         unittest {
405             enum pub = starwarsSchema();
406             enum tokens = scan("select name from sw.people");
407             static const stat = Parser([pub], tokens).parse();
408             auto db = new Database("127.0.0.1", "starwars", "jedi", "force");
409             db.execute(cast(Select)stat);
410         }
411         unittest {
412             // check bigint select
413             enum pub = starwarsSchema();
414             enum tokens = scan("select population from sw.planets");
415             static const stat = Parser([pub], tokens).parse();
416             auto db = new Database("127.0.0.1", "starwars", "jedi", "force");
417             db.execute(cast(Select)stat);
418         }
419     }
420     else {
421         version(unittest){
422             pragma(msg, "compile with version 'unittest_starwars' to activate postgresql starwars tests.");
423         }
424     }
425 
426     auto xxx(string sql, Variant[string] parameters){
427         auto names = sort(parameters.keys);
428         Variant[] pgparam;
429         foreach(name; names){
430             pgparam ~= parameters[name];
431             sql = sql.replace("$"~name, "$"~(pgparam.length).to!string); // they are starting from $1, and not from $0
432         }
433         return tuple(sql, pgparam);
434     }
435     unittest {
436         auto r = xxx("select * from planets where name=$name", ["name": Variant("Tatooine")]);
437     }
438 }
439