module mars.pgsql;

import std.algorithm;
import std.conv;
import std.format;
import std.string;
import std.range;
import std.typecons;
import std.variant;

import mars.defs;
import mars.msg : AuthoriseError, InsertError, DeleteError, RequestState;
version(unittest) import mars.starwars;

import ddb.postgres;
import ddb.db;
import vibe.core.log;
import vibe.data.json;

/**
 * Builds the parameterised `insert ... returning *` statement for `table`.
 *
 * Auto-generated columns (`Type.serial` and `Type.smallserial`) are emitted as
 * `default`; every other column gets the next positional placeholder, starting
 * from `$1`. This mirrors the columns that `addParameters` skips when it is
 * instantiated with `noSerials = true`.
 */
string insertIntoReturningParameter(const(Table) table)
{
    string[] values;
    int placeholder = 1;
    foreach(col; table.columns){
        if( col.type == Type.serial || col.type == Type.smallserial ){
            values ~= "default";
        }
        else {
            values ~= "$" ~ (placeholder++).to!string;
        }
    }
    return "insert into %s values (%s) returning *".format(table.name, values.join(", "));
}
unittest {
    auto sql = Table("bar", [Col("foo", Type.text, false), Col("baz", Type.text, false)],[],[]).insertIntoReturningParameter();
    assert( sql == "insert into bar values ($1, $2) returning *", sql );
    auto sql2 = Table("bar", [Col("w_id", Type.serial), Col("w", Type.text)], [0], []).insertIntoReturningParameter();
    assert( sql2 == "insert into bar values (default, $1) returning *", sql2);
    auto sql3 = Table("bar", [Col("w_id", Type.smallserial), Col("w", Type.text)], [0], []).insertIntoReturningParameter();
    assert( sql3 == "insert into bar values (default, $1) returning *", sql3);
}

/// Builds `pk1 = $1 and pk2 = $2 ...` over the primary key columns of `table`.
private string pkWhereClause(const(Table) table)
{
    string[] conditions;
    foreach(i, col; table.pkCols){
        conditions ~= col.name ~ " = $" ~ (i + 1).to!string;
    }
    return conditions.join(" and ");
}

/**
 * Builds the parameterised `delete` statement that removes the row of `table`
 * identified by its primary key columns (`$1`, `$2`, ... in pk order).
 */
string deleteFromParameter(const(Table) table)
{
    return "delete from %s where %s".format(table.name, pkWhereClause(table));
}
unittest {
    auto sql = Table("bar", [Col("foo", Type.text, false), Col("bar", Type.text, false), Col("baz", Type.text, false)], [0, 1], []).deleteFromParameter();
    assert( sql == "delete from bar where foo = $1 and bar = $2", sql);
}

/**
 * Builds the parameterised `update` statement for `table`.
 *
 * Every column is assigned in the `set` clause (`$1` .. `$n`); the `where`
 * clause continues the numbering over the primary key columns or, when the
 * table has no primary key, over all the columns.
 */
string updateFromParameters(const(Table) table)
{
    immutable(Col)[] whereCols = table.pkCols.length > 0 ? table.pkCols : table.columns;
    int dollarIndex = 1;

    string[] assignments;
    foreach(col; table.columns){
        assignments ~= col.name ~ " = $" ~ (dollarIndex++).to!string;
    }

    string[] conditions;
    foreach(col; whereCols){
        conditions ~= col.name ~ " = $" ~ (dollarIndex++).to!string;
    }

    return "update %s set %s where %s".format(
            table.name,
            assignments.join(", "),
            conditions.join(" and "));
}
unittest {
    auto sql = Table("bar", [Col("foo", Type.text, false), Col("bar", Type.text, false), Col("baz", Type.text, false)], [0], []).updateFromParameters();
    assert( sql == "update bar set foo = $1, bar = $2, baz = $3 where foo = $4", sql );
}

/**
 * Builds the parameterised `select *` statement that fetches the row of
 * `table` identified by its primary key columns (`$1`, `$2`, ... in pk order).
 */
string selectFromParameters(const(Table) table)
{
    return "select * from %s where %s".format(table.name, pkWhereClause(table));
}
unittest {
    // the original test exercised deleteFromParameter by mistake and could never pass
    auto sql = Table("bar", [Col("foo", Type.text, false), Col("bar", Type.text, false), Col("baz", Type.text, false)], [0, 1], []).selectFromParameters();
    assert( sql == "select * from bar where foo = $1 and bar = $2", sql);
}


/// Coordinates of a PostgreSQL database, and factory of authenticated `Database` handles.
struct DatabaseService {
    string host;
    ushort port; // NOTE(review): not forwarded to `Database` by `connect` below — confirm whether that is intended
    string database;

    /**
     * Opens a connection on behalf of `user`.
     *
     * Returns: an instance of `Database`, or null if can't connect or authenticate.
     *          Error details are reported in `err`.
     */
    Database connect(string user, string password, ref AuthoriseError err) in {
        assert(user && password);
    } body {
        Database db;
        try {
            db = new Database(host, database, user, password);
            err = AuthoriseError.authorised;
        }
        catch(ServerErrorException e){
            switch(e.code){
                case "28000": // role "user" does not exist
                    logInfo("PostgreSQL role does not exist");
                    err = AuthoriseError.wrongUsernameOrPassword;
                    break;
                case "28P01": // password authentication failed for user "user"
                    logInfo("PostgreSQL password authentication failed for user");
                    err = AuthoriseError.wrongUsernameOrPassword;
                    break;
                default:
                    logWarn("S --- C | Unhandled PostgreSQL server error during connection!");
                    logInfo("S --- C | PostgreSQL server error: %s", e.toString);
                    err = AuthoriseError.unknownError;
            }
        }
        catch(Exception e){
            logWarn("S --- C | exception connecting to the PostgreSQL!");
            logWarn("S --- C | %s", e);
            err = AuthoriseError.unknownError;
        }
        // every path above must have assigned a real outcome
        assert( err != AuthoriseError.assertCheck);
        return db;
    }
}

/// A locked PostgreSQL connection bound to one user, with helpers to run and journal statements.
class Database
{
    private this(string host, string database, string user, string password){
        this.username_ = user;
        if( db is null ){
            db = new PostgresDB(["host": host, "database": database, "user": user, "password": password]);
        }
        conn = db.lockConnection();
    }

    /// Runs a select over the first column of the first table of `select`, discarding the rows.
    void execute(const Select select)
    {
        string s = `select %s from %s`.format(select.cols[0].name, select.tables[0].name);
        auto q = conn.executeQuery(s);
        // release the result set, as done everywhere else in this file, so the
        // connection can be used for the next statement
        scope(exit) q.close();
    }

    /// Runs raw `sql` and prints every returned row to stdout.
    void executeUnsafe(string sql){
        auto q = conn.executeQuery(sql);
        scope(exit) q.close();
        foreach(v; q){
            import std.stdio; writeln("-->", v);
        }
    }

    /// Runs raw `sql` and returns its scalar result converted to `T`.
    T executeScalarUnsafe(T)(string sql){
        return conn.executeScalar!T(sql);
    }

    /// Used by sync for the subscription of complex queries.
    /// The caller is responsible for closing the returned result set.
    auto executeQueryUnsafe(string sql){
        return conn.executeQuery(sql);
    }

    /**
     * Used by sync for the subscription of complex queries, with parameters.
     *
     * `sql` refers to its parameters by name (`$name`); names are mapped to
     * positional placeholders by `xxx`. The PostgreSQL type of each parameter is
     * guessed from the type held by the `Variant`.
     * The caller is responsible for closing the returned result set.
     */
    auto executeQueryUnsafe(string sql, Variant[string] parameters){
        // ... sort param names, transform names into a sequence of $1, $2
        auto pgargs = xxx(sql, parameters);
        // ... prepare the statement
        auto cmd = new PGCommand(conn, pgargs[0]);
        foreach(j, param; pgargs[1]){
            // ... try to guess the PGType from the Variant typeinfo ...
            auto pgType = toPGType(param.type);
            auto index = (j+1).to!short;
            // every type toPGType(TypeInfo) can return is handled here
            switch(pgType) with (PGType){
                case TEXT:
                    cmd.parameters.add(index, pgType).value = param.get!string;
                    break;
                case INT2:
                    cmd.parameters.add(index, pgType).value = param.get!short;
                    break;
                case INT4:
                    cmd.parameters.add(index, pgType).value = param.get!int;
                    break;
                case INT8:
                    cmd.parameters.add(index, pgType).value = param.get!long;
                    break;
                case BOOLEAN:
                    cmd.parameters.add(index, pgType).value = param.get!bool;
                    break;
                case FLOAT4:
                    cmd.parameters.add(index, pgType).value = param.get!float;
                    break;
                case FLOAT8:
                    cmd.parameters.add(index, pgType).value = param.get!double;
                    break;
                case BYTEA:
                    cmd.parameters.add(index, pgType).value = param.get!(ubyte[]);
                    break;
                default:
                    assert(false, pgType.to!string);
            }
        }
        return cmd.executeQuery();
    }
    version(unittest_starwars){ unittest {
        auto db = new Database("127.0.0.1", "starwars", "jedi", "force");
        auto recordSet = db.executeQueryUnsafe("select * from planets where name = $name", ["name": Variant("Tatooine")]);
        scope(exit) recordSet.close();
        assert(recordSet.front[1].get!long == 120_000);
    }}

    /// Runs raw `sql`, returning rows typed as `Row`. The caller must close the result set.
    auto executeQueryUnsafe(Row)(string sql){
        return conn.executeQuery!Row(sql);
    }

    /**
     * Inserts `record` into `table` and returns the row as stored by the server
     * (`returning *`). Serial columns of `record` are not sent.
     *
     * The outcome is reported in `err`. When the table is journaled and the
     * insertion succeeded, a `record_inserted` entry is written to the journal;
     * a failure while journaling is only logged.
     */
    auto executeInsert(immutable(Table) table, Row)(Row record, ref InsertError err){
        enum sql = insertIntoReturningParameter(table);
        auto cmd = new PGCommand(conn, sql);
        addParameters!(table, Row, true)(cmd, record); // skip serial parameters
        Row result;
        try {
            auto querySet = cmd.executeQuery!Row();
            scope(exit) querySet.close();
            result = querySet.front;
            err = InsertError.inserted;
        }
        catch(ServerErrorException e){
            switch(e.code){
                case "23505": //  duplicate key value violates unique constraint "<constraintname>" (for example in primary keys)
                    err = InsertError.duplicateKeyViolations;
                    break;
                default:
                    logWarn("S --- C | Unhandled PostgreSQL server error during insertion!");
                    logInfo("S --- C | PostgreSQL server error: %s", e.toString);
                    err = InsertError.unknownError;
            }
        }
        if( table.journal && err == InsertError.inserted ){
            logWarn("S --- C | Journaling");
            try {
                cmd = new PGCommand(conn, `insert into journal (username, operation, tablename, post) values ($1, 'record_inserted', $2, $3)`);
                cmd.parameters.add(1, PGType.TEXT).value = this.username;
                cmd.parameters.add(2, PGType.TEXT).value = table.name;
                cmd.parameters.add(3, PGType.JSON).value = result.serializeToJson();
                cmd.executeNonQuery();
            }
            catch(Exception e){
                logWarn("Catch Exception during journaling:%s", e.toString());
            }
            logWarn("S --- C | Journaling done");
        }
        return result;
    }

    /**
     * Deletes from `table` the row identified by `pk`; the outcome is reported in `err`.
     *
     * When the table is journaled, the row is read before being deleted and,
     * on success, stored in a `record_deleted` journal entry; a failure while
     * journaling is only logged.
     */
    void executeDelete(immutable(Table) table, Pk)(Pk pk, ref DeleteError err){
        asStruct!table pre;
        if( table.journal ){
            pre = selectFromPk!(table, Pk)(pk);
        }

        enum sql = deleteFromParameter(table);
        auto cmd = new PGCommand(conn, sql);

        addParameters!table(cmd, pk);
        try {
            cmd.executeNonQuery();
            err = DeleteError.deleted;
        }
        catch(ServerErrorException e){
            switch(e.code){
                case "23503": // update or delete on table "<table>" violates foreign key constraint "<constraint>" on table "<othertable>"
                    err = DeleteError.rejectedAsForeignKeyViolation;
                    break;
                default:
                    logWarn("S --- C | Unhandled PostgreSQL server error during deletion!");
                    logInfo("S --- C | PostgreSQL server error: %s", e.toString);
                    err = DeleteError.unknownError;
            }
        }
        if( table.journal && err == DeleteError.deleted ){
            logWarn("S --- C | Journaling");
            try {
                cmd = new PGCommand(conn, `insert into journal (username, operation, tablename, post) values ($1, 'record_deleted', $2, $3)`);
                cmd.parameters.add(1, PGType.TEXT).value = this.username;
                cmd.parameters.add(2, PGType.TEXT).value = table.name;
                cmd.parameters.add(3, PGType.JSON).value = pre.serializeToJson();
                cmd.executeNonQuery();
            }
            catch(Exception e){
                logWarn("Catch Exception during journaling:%s", e.toString());
            }
            logWarn("S --- C | Journaling done");
        }
    }

    /**
     * Updates the row of `table` identified by `pk` with the values of `record`;
     * the outcome is reported in `state`.
     *
     * When the table is journaled, the row is read before the update and, on
     * success, a `record_updated` journal entry holding both the old and the
     * new values is written; a failure while journaling is only logged.
     */
    void executeUpdate(immutable(Table) table, Pk, Row)(Pk pk, Row record, ref RequestState state)
    {

        asStruct!table pre;
        if( table.journal ){
            pre = selectFromPk!(table, Pk)(pk);
        }

        enum sql = updateFromParameters(table);
        auto cmd = new PGCommand(conn, sql);
        addParameters!(table)(cmd, record);
        // the pk placeholders follow the ones of the `set` clause
        short i = record.tupleof.length +1;
        addParameters!table(cmd, pk, i);
        try {
            cmd.executeNonQuery();
            // report success explicitly, as insert and delete do, instead of
            // relying on the value the caller happened to pass in
            state = RequestState.executed;
        }
        catch(ServerErrorException e){
            switch(e.code){
                case "23503":
                    logInfo("S --- C | PostgreSQL can't update the primary key as still referenced (maybe add an update cascade?).");
                    state = RequestState.rejectedAsForeignKeyViolation;
                    break;
                default:
                    logWarn("S --- C | Unhandled PostgreSQL server error during update!");
                    logInfo("S --- C | PostgreSQL server error: %s", e.toString);
                    state = RequestState.rejectedAsPGSqlError;
            }
        }
        if( table.journal && state == RequestState.executed ){
            logWarn("S --- C | Journaling");
            try {
                cmd = new PGCommand(conn, `insert into journal (username, operation, tablename, post, pre) values ($1, 'record_updated', $2, $3, $4)`);
                cmd.parameters.add(1, PGType.TEXT).value = this.username;
                cmd.parameters.add(2, PGType.TEXT).value = table.name;
                cmd.parameters.add(3, PGType.JSON).value = record.serializeToJson();
                cmd.parameters.add(4, PGType.JSON).value = pre.serializeToJson();
                cmd.executeNonQuery();
            }
            catch(Exception e){
                logWarn("Catch Exception during journaling:%s", e.toString());
            }
            logWarn("S --- C | Journaling done");
        }
    }

    PGConnection conn;
    private {

        /// Reads the row of `table` identified by `pk`, to be stored in the journal.
        /// NOTE(review): the row is taken from `front` without checking that the query returned one.
        auto selectFromPk(immutable(Table) table, Pk)(Pk pk)
        {
            logWarn("S --- C | Selecting the record to be updated for the journal");
            enum sql = selectFromParameters(table);
            auto cmd = new PGCommand(conn, sql);
            addParameters!table(cmd, pk);
            //try {
                auto querySet = cmd.executeQuery!(asStruct!table)();
                scope(exit) querySet.close();
                auto pre = querySet.front;
            //}
            //catch(ServerErrorException e){
            //}
            logWarn("S --- C | Select done, record is %s", pre.serializeToJson());
            return pre;
        }

        private PostgresDB db;
        private string username_;
        public @property username() { return username_; }
    }
}


private {
    import mars.lexer;
    import mars.sqldb;

    /// Guesses the PostgreSQL type matching a D runtime type; asserts on unsupported types.
    PGType toPGType(TypeInfo t){
        if(t == typeid(bool)) return PGType.BOOLEAN;
        if(t == typeid(int)) return PGType.INT4;
        if(t == typeid(short)) return PGType.INT2;
        if(t == typeid(long)) return PGType.INT8;
        if(t == typeid(string)) return PGType.TEXT;
        if(t == typeid(float)) return PGType.FLOAT4;
        if(t == typeid(double)) return PGType.FLOAT8;
        if(t == typeid(ubyte[])) return PGType.BYTEA;
        assert(false, t.to!string);
    }

    /// Maps a mars column type to the PostgreSQL type used to bind its values.
    PGType toPGType(Type t){
        final switch(t) with(Type) {
            case boolean: return PGType.BOOLEAN;
            case integer: return PGType.INT4; // XXX check
            case bigint: return PGType.INT8;
            case smallint: return PGType.INT2; // XXX check
            case text: return PGType.TEXT;
            case real_: return PGType.FLOAT4;
            case doublePrecision: return PGType.FLOAT8;
            case bytea: return PGType.BYTEA;
            case smallserial: return PGType.INT2; // XXX check
            case serial: return PGType.INT4; // there's not really a serial type in postgres
            case date: return PGType.DATE; // XXX tentative

            case unknown:
            case varchar: // varchar(n), tbd as column
                assert(false, t.to!string); // not implemented right now, catch at CT
        }
    }

    /**
     * Binds the fields of `s` to `cmd`, one positional parameter per field,
     * starting at `paramIndex`.
     *
     * `s` is treated as a full row of `table` when it converts to (or has as many
     * fields as) `asStruct!table`, otherwise as its primary key. With `noSerials`,
     * serial and smallserial columns are not bound and do not consume a
     * placeholder, matching the statement built by `insertIntoReturningParameter`.
     * `tupleofIndex` is the field being processed: the template recurses over
     * the remaining fields.
     */
    void addParameters(immutable(Table) table, Struct, bool noSerials = false, short tupleofIndex =0)(PGCommand cmd, Struct s, short paramIndex =1){
        static if( is(Struct : asStruct!table) || Struct.tupleof.length == asStruct!(table).tupleof.length )
        {
            auto type = table.columns[tupleofIndex].type;
        }
        else static if( is(Struct : asPkStruct!table) || Struct.tupleof.length == asPkStruct!(table).tupleof.length )
        {
            // the field belongs to the primary key: look its type up among the pk columns
            auto type = table.pkCols[tupleofIndex].type;
        }
        else static assert(false);

        static if( noSerials ) auto mustAdd = type != Type.serial && type != Type.smallserial;
        else bool mustAdd = true;

        if( mustAdd ){
            cmd.parameters.add(paramIndex, type.toPGType).value = s.tupleof[tupleofIndex];
            // only a bound field consumes a placeholder
            ++paramIndex;
        }

        static if( s.tupleof.length > tupleofIndex+1 ) addParameters!(table, Struct, noSerials, tupleofIndex +1)(cmd, s, paramIndex);
    }


    /// Renders `stat` as `select <cols> from <tables>`, using unqualified names.
    string select(const(Select) stat){
        return `select %s from %s`.format(
            stat.cols.map!((c) => c.name).join(", "),
            stat.tables.map!( (t) => t.name ).join(", "), /// XXX I need the schema name HERE... refactoring needed
            );
    }
    unittest {
        auto s = starwarsSchema();
        const sql = cast(Select)Parser([s], scan("select name from sw.people")).parse();
        assert(select(sql) == "select name from people", select(sql));
    }
    version(unittest_starwars){
        unittest {
            enum pub = starwarsSchema();
            enum tokens = scan("select name from sw.people");
            static const stat = Parser([pub], tokens).parse();
            auto db = new Database("127.0.0.1", "starwars", "jedi", "force");
            db.execute(cast(Select)stat);
        }
        unittest {
            // check bigint select
            enum pub = starwarsSchema();
            enum tokens = scan("select population from sw.planets");
            static const stat = Parser([pub], tokens).parse();
            auto db = new Database("127.0.0.1", "starwars", "jedi", "force");
            db.execute(cast(Select)stat);
        }
    }
    else {
        version(unittest){
            pragma(msg, "compile with version 'unittest_starwars' to activate postgresql starwars tests.");
        }
    }

    /**
     * Turns named placeholders into positional ones.
     *
     * Parameter names are sorted, and the n-th name (1-based) becomes `$n`.
     * Returns: a tuple of the rewritten sql and of the parameter values, in
     *          placeholder order.
     */
    auto xxx(string sql, Variant[string] parameters){
        auto names = parameters.keys;
        sort(names);

        Variant[] pgparam;
        size_t[string] positions;
        foreach(name; names){
            pgparam ~= parameters[name];
            positions[name] = pgparam.length; // they are starting from $1, and not from $0
        }

        // substitute the longest names first, so that `$id` can't clobber the head of `$id2`
        auto byLength = names.dup;
        sort!((a, b) => a.length > b.length)(byLength);
        foreach(name; byLength){
            sql = sql.replace("$"~name, "$"~positions[name].to!string);
        }
        return tuple(sql, pgparam);
    }
    unittest {
        auto r = xxx("select * from planets where name=$name", ["name": Variant("Tatooine")]);
        assert(r[0] == "select * from planets where name=$1", r[0]);
        assert(r[1].length == 1);

        auto r2 = xxx("select * from t where a = $id and b = $id2", ["id": Variant(1), "id2": Variant(2)]);
        assert(r2[0] == "select * from t where a = $1 and b = $2", r2[0]);
        assert(r2[1].length == 2);
    }
}