1 /** 2 * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 3 * 4 * Source file ddbc/drivers/pgsqlddbc.d. 5 * 6 * DDBC library attempts to provide implementation independent interface to different databases. 7 * 8 * Set of supported RDBMSs can be extended by writing Drivers for particular DBs. 9 * Currently it only includes MySQL driver. 10 * 11 * JDBC documentation can be found here: 12 * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR) 13 * 14 * This module contains implementation of PostgreSQL Driver 15 * 16 * 17 * You can find usage examples in unittest{} sections. 18 * 19 * Copyright: Copyright 2013 20 * License: $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0). 21 * Author: Vadim Lopatin 22 */ 23 module ddbc.drivers.pgsqlddbc; 24 25 26 version(USE_PGSQL) { 27 28 import std.algorithm; 29 import std.conv; 30 import std.datetime; 31 import std.exception; 32 import std.stdio; 33 import std.string; 34 import std.variant; 35 import core.sync.mutex; 36 37 import ddbc.common; 38 import ddbc.core; 39 import ddbc.drivers.pgsql; 40 import ddbc.drivers.utils; 41 42 version(unittest) { 43 /* 44 To allow unit tests using PostgreSQL server, 45 */ 46 /// change to false to disable tests on real PostgreSQL server 47 immutable bool PGSQL_TESTS_ENABLED = true; 48 /// change parameters if necessary 49 const string PGSQL_UNITTEST_HOST = "localhost"; 50 const int PGSQL_UNITTEST_PORT = 5432; 51 const string PGSQL_UNITTEST_USER = "testuser"; 52 const string PGSQL_UNITTEST_PASSWORD = "testpassword"; 53 const string PGSQL_UNITTEST_DB = "testdb"; 54 55 static if (PGSQL_TESTS_ENABLED) { 56 /// use this data source for tests 57 DataSource createUnitTestPGSQLDataSource() { 58 PGSQLDriver driver = new PGSQLDriver(); 59 string url = PGSQLDriver.generateUrl(PGSQL_UNITTEST_HOST, PGSQL_UNITTEST_PORT, PGSQL_UNITTEST_DB); 60 string[string] params = PGSQLDriver.setUserAndPassword(PGSQL_UNITTEST_USER, PGSQL_UNITTEST_PASSWORD); 61 return new ConnectionPoolDataSourceImpl(driver, url, params); 62 } 63 } 64 } 65 66 67 class PGSQLConnection : ddbc.core.Connection { 68 private: 69 string url; 70 string[string] params; 71 string dbName; 72 string username; 73 string password; 74 string hostname; 75 int port = 5432; 76 PGconn * conn; 77 bool closed; 78 bool autocommit; 79 Mutex mutex; 80 81 82 PGSQLStatement [] activeStatements; 83 84 void closeUnclosedStatements() { 85 PGSQLStatement [] list = activeStatements.dup; 86 foreach(stmt; list) { 87 stmt.close(); 88 } 89 } 90 91 void checkClosed() { 92 if (closed) 93 throw new SQLException("Connection is already closed"); 94 } 95 96 public: 97 98 void lock() { 99 mutex.lock(); 100 } 101 102 void unlock() { 103 mutex.unlock(); 104 } 105 106 PGconn * getConnection() { return conn; } 107 108 109 void onStatementClosed(PGSQLStatement stmt) { 110 foreach(index, item; activeStatements) { 111 if (item == stmt) { 112 remove(activeStatements, index); 113 return; 114 } 115 } 116 } 117 118 this(string url, string[string] params) { 119 mutex = new Mutex(); 120 this.url = url; 121 this.params = params; 122 //writeln("parsing url " ~ url); 123 string urlParams; 124 ptrdiff_t qmIndex = std..string.indexOf(url, '?'); 125 if (qmIndex >=0 ) { 126 urlParams = url[qmIndex + 1 .. $]; 127 url = url[0 .. qmIndex]; 128 // TODO: parse params 129 } 130 string dbName = ""; 131 ptrdiff_t firstSlashes = std..string.indexOf(url, "//"); 132 ptrdiff_t lastSlash = std..string.lastIndexOf(url, '/'); 133 ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0; 134 ptrdiff_t hostNameEnd = lastSlash >=0 && lastSlash > firstSlashes + 1 ? lastSlash : url.length; 135 if (hostNameEnd < url.length - 1) { 136 dbName = url[hostNameEnd + 1 .. $]; 137 } 138 hostname = url[hostNameStart..hostNameEnd]; 139 if (hostname.length == 0) 140 hostname = "localhost"; 141 ptrdiff_t portDelimiter = std..string.indexOf(hostname, ":"); 142 if (portDelimiter >= 0) { 143 string portString = hostname[portDelimiter + 1 .. $]; 144 hostname = hostname[0 .. portDelimiter]; 145 if (portString.length > 0) 146 port = to!int(portString); 147 if (port < 1 || port > 65535) 148 port = 5432; 149 } 150 username = params["user"]; 151 password = params["password"]; 152 153 //writeln("host " ~ hostname ~ " : " ~ to!string(port) ~ " db=" ~ dbName ~ " user=" ~ username ~ " pass=" ~ password); 154 155 const char ** keywords = [std..string.toStringz("host"), std..string.toStringz("port"), std..string.toStringz("dbname"), std..string.toStringz("user"), std..string.toStringz("password"), null].ptr; 156 const char ** values = [std..string.toStringz(hostname), std..string.toStringz(to!string(port)), std..string.toStringz(dbName), std..string.toStringz(username), std..string.toStringz(password), null].ptr; 157 //writeln("trying to connect"); 158 conn = PQconnectdbParams(keywords, values, 0); 159 if(conn is null) 160 throw new SQLException("Cannot get Postgres connection"); 161 if(PQstatus(conn) != CONNECTION_OK) 162 throw new SQLException(copyCString(PQerrorMessage(conn))); 163 closed = false; 164 setAutoCommit(true); 165 updateConnectionParams(); 166 } 167 168 void updateConnectionParams() { 169 Statement stmt = createStatement(); 170 scope(exit) stmt.close(); 171 stmt.executeUpdate("SET NAMES 'utf8'"); 172 } 173 174 override void close() { 175 checkClosed(); 176 177 lock(); 178 scope(exit) unlock(); 179 180 closeUnclosedStatements(); 181 182 PQfinish(conn); 183 closed = true; 184 } 185 186 override void commit() { 187 checkClosed(); 188 189 lock(); 190 scope(exit) unlock(); 191 192 Statement stmt = createStatement(); 193 scope(exit) stmt.close(); 194 stmt.executeUpdate("COMMIT"); 195 } 196 197 override Statement createStatement() { 198 checkClosed(); 199 200 lock(); 201 scope(exit) unlock(); 202 203 PGSQLStatement stmt = new PGSQLStatement(this); 204 activeStatements ~= stmt; 205 return stmt; 206 } 207 208 PreparedStatement prepareStatement(string sql) { 209 checkClosed(); 210 211 lock(); 212 scope(exit) unlock(); 213 214 PGSQLPreparedStatement stmt = new PGSQLPreparedStatement(this, sql); 215 activeStatements ~= stmt; 216 return stmt; 217 } 218 219 override string getCatalog() { 220 return dbName; 221 } 222 223 /// Sets the given catalog name in order to select a subspace of this Connection object's database in which to work. 224 override void setCatalog(string catalog) { 225 checkClosed(); 226 if (dbName == catalog) 227 return; 228 229 lock(); 230 scope(exit) unlock(); 231 232 //conn.selectDB(catalog); 233 dbName = catalog; 234 // TODO: 235 236 throw new SQLException("Not implemented"); 237 } 238 239 override bool isClosed() { 240 return closed; 241 } 242 243 override void rollback() { 244 checkClosed(); 245 246 lock(); 247 scope(exit) unlock(); 248 249 Statement stmt = createStatement(); 250 scope(exit) stmt.close(); 251 stmt.executeUpdate("ROLLBACK"); 252 } 253 override bool getAutoCommit() { 254 return autocommit; 255 } 256 override void setAutoCommit(bool autoCommit) { 257 checkClosed(); 258 if (this.autocommit == autoCommit) 259 return; 260 lock(); 261 scope(exit) unlock(); 262 263 Statement stmt = createStatement(); 264 scope(exit) stmt.close(); 265 stmt.executeUpdate("SET autocommit = " ~ (autoCommit ? "ON" : "OFF")); 266 this.autocommit = autoCommit; 267 } 268 } 269 270 class PGSQLStatement : Statement { 271 private: 272 PGSQLConnection conn; 273 // Command * cmd; 274 // ddbc.drivers.mysql.ResultSet rs; 275 PGSQLResultSet resultSet; 276 277 bool closed; 278 279 public: 280 void checkClosed() { 281 enforceEx!SQLException(!closed, "Statement is already closed"); 282 } 283 284 void lock() { 285 conn.lock(); 286 } 287 288 void unlock() { 289 conn.unlock(); 290 } 291 292 this(PGSQLConnection conn) { 293 this.conn = conn; 294 } 295 296 ResultSetMetaData createMetadata(PGresult * res) { 297 int rows = PQntuples(res); 298 int fieldCount = PQnfields(res); 299 ColumnMetadataItem[] list = new ColumnMetadataItem[fieldCount]; 300 for(int i = 0; i < fieldCount; i++) { 301 ColumnMetadataItem item = new ColumnMetadataItem(); 302 //item.schemaName = field.db; 303 item.name = copyCString(PQfname(res, i)); 304 //item.tableName = copyCString(PQftable(res, i)); 305 int fmt = PQfformat(res, i); 306 ulong t = PQftype(res, i); 307 item.label = copyCString(PQfname(res, i)); 308 //item.precision = field.length; 309 //item.scale = field.scale; 310 //item.isNullable = !field.notNull; 311 //item.isSigned = !field.unsigned; 312 //item.type = fromPGSQLType(field.type); 313 // // TODO: fill more params 314 list[i] = item; 315 } 316 return new ResultSetMetaDataImpl(list); 317 } 318 ParameterMetaData createParameterMetadata(int paramCount) { 319 ParameterMetaDataItem[] res = new ParameterMetaDataItem[paramCount]; 320 for(int i = 0; i < paramCount; i++) { 321 ParameterMetaDataItem item = new ParameterMetaDataItem(); 322 item.precision = 0; 323 item.scale = 0; 324 item.isNullable = true; 325 item.isSigned = true; 326 item.type = SqlType.VARCHAR; 327 res[i] = item; 328 } 329 return new ParameterMetaDataImpl(res); 330 } 331 public: 332 PGSQLConnection getConnection() { 333 checkClosed(); 334 return conn; 335 } 336 337 private void fillData(PGresult * res, ref Variant[][] data) { 338 int rows = PQntuples(res); 339 int fieldCount = PQnfields(res); 340 int[] fmts = new int[fieldCount]; 341 int[] types = new int[fieldCount]; 342 for (int col = 0; col < fieldCount; col++) { 343 fmts[col] = PQfformat(res, col); 344 types[col] = cast(int)PQftype(res, col); 345 } 346 for (int row = 0; row < rows; row++) { 347 Variant[] v = new Variant[fieldCount]; 348 for (int col = 0; col < fieldCount; col++) { 349 int n = PQgetisnull(res, row, col); 350 if (n != 0) { 351 v[col] = null; 352 } else { 353 int len = PQgetlength(res, row, col); 354 const char * value = PQgetvalue(res, row, col); 355 int t = types[col]; 356 //writeln("[" ~ to!string(row) ~ "][" ~ to!string(col) ~ "] type = " ~ to!string(t) ~ " len = " ~ to!string(len)); 357 if (fmts[col] == 0) { 358 // text 359 string s = copyCString(value, len); 360 //writeln("text: " ~ s); 361 switch(t) { 362 case INT4OID: 363 v[col] = parse!int(s); 364 break; 365 case BOOLOID: 366 v[col] = s == "true" ? true : (s == "false" ? false : parse!int(s) != 0); 367 break; 368 case CHAROID: 369 v[col] = cast(char)(s.length > 0 ? s[0] : 0); 370 break; 371 case INT8OID: 372 v[col] = parse!long(s); 373 break; 374 case INT2OID: 375 v[col] = parse!short(s); 376 break; 377 case FLOAT4OID: 378 v[col] = parse!float(s); 379 break; 380 case FLOAT8OID: 381 v[col] = parse!double(s); 382 break; 383 case VARCHAROID: 384 case TEXTOID: 385 case NAMEOID: 386 v[col] = s; 387 break; 388 case BYTEAOID: 389 v[col] = byteaToUbytes(s); 390 break; 391 default: 392 throw new SQLException("Unsupported column type " ~ to!string(t)); 393 } 394 } else { 395 // binary 396 //writeln("binary:"); 397 byte[] b = new byte[len]; 398 for (int i=0; i<len; i++) 399 b[i] = value[i]; 400 v[col] = b; 401 } 402 } 403 } 404 data ~= v; 405 } 406 } 407 408 override ddbc.core.ResultSet executeQuery(string query) { 409 //throw new SQLException("Not implemented"); 410 checkClosed(); 411 lock(); 412 scope(exit) unlock(); 413 414 PGresult * res = PQexec(conn.getConnection(), std..string.toStringz(query)); 415 enforceEx!SQLException(res !is null, "Failed to execute statement " ~ query); 416 auto status = PQresultStatus(res); 417 enforceEx!SQLException(status == PGRES_TUPLES_OK, getError()); 418 scope(exit) PQclear(res); 419 420 // cmd = new Command(conn.getConnection(), query); 421 // rs = cmd.execSQLResult(); 422 auto metadata = createMetadata(res); 423 int rows = PQntuples(res); 424 int fieldCount = PQnfields(res); 425 Variant[][] data; 426 fillData(res, data); 427 resultSet = new PGSQLResultSet(this, data, metadata); 428 return resultSet; 429 } 430 431 string getError() { 432 return copyCString(PQerrorMessage(conn.getConnection())); 433 } 434 435 override int executeUpdate(string query) { 436 Variant dummy; 437 return executeUpdate(query, dummy); 438 } 439 440 void readInsertId(PGresult * res, ref Variant insertId) { 441 int rows = PQntuples(res); 442 int fieldCount = PQnfields(res); 443 //writeln("readInsertId - rows " ~ to!string(rows) ~ " " ~ to!string(fieldCount)); 444 if (rows == 1 && fieldCount == 1) { 445 int len = PQgetlength(res, 0, 0); 446 const char * value = PQgetvalue(res, 0, 0); 447 string s = copyCString(value, len); 448 insertId = parse!long(s); 449 } 450 } 451 452 override int executeUpdate(string query, out Variant insertId) { 453 checkClosed(); 454 lock(); 455 scope(exit) unlock(); 456 PGresult * res = PQexec(conn.getConnection(), std..string.toStringz(query)); 457 enforceEx!SQLException(res !is null, "Failed to execute statement " ~ query); 458 auto status = PQresultStatus(res); 459 enforceEx!SQLException(status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK, getError()); 460 scope(exit) PQclear(res); 461 462 string rowsAffected = copyCString(PQcmdTuples(res)); 463 464 readInsertId(res, insertId); 465 // auto lastid = PQoidValue(res); 466 // writeln("lastId = " ~ to!string(lastid)); 467 int affected = rowsAffected.length > 0 ? to!int(rowsAffected) : 0; 468 // insertId = Variant(cast(long)lastid); 469 return affected; 470 } 471 472 override void close() { 473 checkClosed(); 474 lock(); 475 scope(exit) unlock(); 476 closeResultSet(); 477 closed = true; 478 } 479 480 void closeResultSet() { 481 //throw new SQLException("Not implemented"); 482 // if (cmd == null) { 483 // return; 484 // } 485 // cmd.releaseStatement(); 486 // delete cmd; 487 // cmd = null; 488 // if (resultSet !is null) { 489 // resultSet.onStatementClosed(); 490 // resultSet = null; 491 // } 492 } 493 } 494 495 ulong preparedStatementIndex = 1; 496 497 class PGSQLPreparedStatement : PGSQLStatement, PreparedStatement { 498 string query; 499 int paramCount; 500 ResultSetMetaData metadata; 501 ParameterMetaData paramMetadata; 502 string stmtName; 503 bool[] paramIsSet; 504 string[] paramValue; 505 //PGresult * rs; 506 507 string convertParams(string query) { 508 string res; 509 int count = 0; 510 bool insideString = false; 511 char lastChar = 0; 512 foreach(ch; query) { 513 if (ch == '\'') { 514 if (insideString) { 515 if (lastChar != '\\') 516 insideString = false; 517 } else { 518 insideString = true; 519 } 520 res ~= ch; 521 } else if (ch == '?') { 522 if (!insideString) { 523 count++; 524 res ~= "$" ~ to!string(count); 525 } else { 526 res ~= ch; 527 } 528 } else { 529 res ~= ch; 530 } 531 lastChar = ch; 532 } 533 paramCount = count; 534 return res; 535 } 536 537 this(PGSQLConnection conn, string query) { 538 super(conn); 539 query = convertParams(query); 540 this.query = query; 541 paramMetadata = createParameterMetadata(paramCount); 542 stmtName = "ddbcstmt" ~ to!string(preparedStatementIndex); 543 paramIsSet = new bool[paramCount]; 544 paramValue = new string[paramCount]; 545 // rs = PQprepare(conn.getConnection(), 546 // toStringz(stmtName), 547 // toStringz(query), 548 // paramCount, 549 // null); 550 // enforceEx!SQLException(rs !is null, "Error while preparing statement " ~ query); 551 // auto status = PQresultStatus(rs); 552 //writeln("prepare paramCount = " ~ to!string(paramCount)); 553 // enforceEx!SQLException(status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK, "Error while preparing statement " ~ query ~ " : " ~ getError(rs)); 554 // metadata = createMetadata(rs); 555 //scope(exit) PQclear(rs); 556 } 557 string getError(PGresult * res) { 558 return copyCString(PQresultErrorMessage(res)); 559 } 560 void checkIndex(int index) { 561 if (index < 1 || index > paramCount) 562 throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range"); 563 } 564 void checkParams() { 565 foreach(i, b; paramIsSet) 566 enforceEx!SQLException(b, "Parameter " ~ to!string(i) ~ " is not set"); 567 } 568 void setParam(int index, string value) { 569 checkIndex(index); 570 paramValue[index - 1] = value; 571 paramIsSet[index - 1] = true; 572 } 573 574 PGresult * exec() { 575 checkParams(); 576 const (char) * [] values = new const(char)*[paramCount]; 577 int[] lengths = new int[paramCount]; 578 int[] formats = new int[paramCount]; 579 for (int i=0; i<paramCount; i++) { 580 if (paramValue[i] is null) 581 values[i] = null; 582 else 583 values[i] = toStringz(paramValue[i]); 584 lengths[i] = cast(int)paramValue[i].length; 585 } 586 // PGresult * res = PQexecPrepared(conn.getConnection(), 587 // toStringz(stmtName), 588 // paramCount, 589 // cast(const char * *)values.ptr, 590 // cast(const int *)lengths.ptr, 591 // cast(const int *)formats.ptr, 592 // 0); 593 PGresult * res = PQexecParams(conn.getConnection(), 594 toStringz(query), 595 paramCount, 596 null, 597 cast(const char * *)values.ptr, 598 cast(const int *)lengths.ptr, 599 cast(const int *)formats.ptr, 600 0); 601 enforceEx!SQLException(res !is null, "Error while executing prepared statement " ~ query); 602 metadata = createMetadata(res); 603 return res; 604 } 605 606 public: 607 608 override void close() { 609 checkClosed(); 610 lock(); 611 scope(exit) unlock(); 612 //PQclear(rs); 613 closeResultSet(); 614 closed = true; 615 } 616 617 /// Retrieves a ResultSetMetaData object that contains information about the columns of the ResultSet object that will be returned when this PreparedStatement object is executed. 618 override ResultSetMetaData getMetaData() { 619 checkClosed(); 620 lock(); 621 scope(exit) unlock(); 622 return metadata; 623 } 624 625 /// Retrieves the number, types and properties of this PreparedStatement object's parameters. 626 override ParameterMetaData getParameterMetaData() { 627 //throw new SQLException("Not implemented"); 628 checkClosed(); 629 lock(); 630 scope(exit) unlock(); 631 return paramMetadata; 632 } 633 634 override int executeUpdate() { 635 Variant dummy; 636 return executeUpdate(dummy); 637 } 638 639 override int executeUpdate(out Variant insertId) { 640 checkClosed(); 641 lock(); 642 scope(exit) unlock(); 643 PGresult * res = exec(); 644 scope(exit) PQclear(res); 645 auto status = PQresultStatus(res); 646 enforceEx!SQLException(status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK, getError(res)); 647 648 string rowsAffected = copyCString(PQcmdTuples(res)); 649 //auto lastid = PQoidValue(res); 650 readInsertId(res, insertId); 651 //writeln("lastId = " ~ to!string(lastid)); 652 int affected = rowsAffected.length > 0 ? to!int(rowsAffected) : 0; 653 //insertId = Variant(cast(long)lastid); 654 return affected; 655 } 656 657 override ddbc.core.ResultSet executeQuery() { 658 checkClosed(); 659 lock(); 660 scope(exit) unlock(); 661 PGresult * res = exec(); 662 scope(exit) PQclear(res); 663 int rows = PQntuples(res); 664 int fieldCount = PQnfields(res); 665 Variant[][] data; 666 fillData(res, data); 667 resultSet = new PGSQLResultSet(this, data, metadata); 668 return resultSet; 669 } 670 671 override void clearParameters() { 672 throw new SQLException("Not implemented"); 673 // checkClosed(); 674 // lock(); 675 // scope(exit) unlock(); 676 // for (int i = 1; i <= paramCount; i++) 677 // setNull(i); 678 } 679 680 override void setFloat(int parameterIndex, float x) { 681 checkClosed(); 682 lock(); 683 scope(exit) unlock(); 684 setParam(parameterIndex, to!string(x)); 685 } 686 override void setDouble(int parameterIndex, double x){ 687 checkClosed(); 688 lock(); 689 scope(exit) unlock(); 690 setParam(parameterIndex, to!string(x)); 691 } 692 override void setBoolean(int parameterIndex, bool x) { 693 checkClosed(); 694 lock(); 695 scope(exit) unlock(); 696 setParam(parameterIndex, x ? "true" : "false"); 697 } 698 override void setLong(int parameterIndex, long x) { 699 checkClosed(); 700 lock(); 701 scope(exit) unlock(); 702 setParam(parameterIndex, to!string(x)); 703 } 704 705 override void setUlong(int parameterIndex, ulong x) { 706 checkClosed(); 707 lock(); 708 scope(exit) unlock(); 709 setParam(parameterIndex, to!string(x)); 710 } 711 712 override void setInt(int parameterIndex, int x) { 713 checkClosed(); 714 lock(); 715 scope(exit) unlock(); 716 setParam(parameterIndex, to!string(x)); 717 } 718 719 override void setUint(int parameterIndex, uint x) { 720 checkClosed(); 721 lock(); 722 scope(exit) unlock(); 723 setParam(parameterIndex, to!string(x)); 724 } 725 726 override void setShort(int parameterIndex, short x) { 727 checkClosed(); 728 lock(); 729 scope(exit) unlock(); 730 setParam(parameterIndex, to!string(x)); 731 } 732 733 override void setUshort(int parameterIndex, ushort x) { 734 checkClosed(); 735 lock(); 736 scope(exit) unlock(); 737 setParam(parameterIndex, to!string(x)); 738 } 739 740 override void setByte(int parameterIndex, byte x) { 741 checkClosed(); 742 lock(); 743 scope(exit) unlock(); 744 setParam(parameterIndex, to!string(x)); 745 } 746 747 override void setUbyte(int parameterIndex, ubyte x) { 748 checkClosed(); 749 lock(); 750 scope(exit) unlock(); 751 checkIndex(parameterIndex); 752 setParam(parameterIndex, to!string(x)); 753 } 754 755 override void setBytes(int parameterIndex, byte[] x) { 756 setString(parameterIndex, bytesToBytea(x)); 757 } 758 override void setUbytes(int parameterIndex, ubyte[] x) { 759 setString(parameterIndex, ubytesToBytea(x)); 760 } 761 override void setString(int parameterIndex, string x) { 762 checkClosed(); 763 lock(); 764 scope(exit) unlock(); 765 setParam(parameterIndex, x); 766 } 767 override void setDateTime(int parameterIndex, DateTime x) { 768 setString(parameterIndex, x.toISOString()); 769 } 770 override void setDate(int parameterIndex, Date x) { 771 setString(parameterIndex, x.toISOString()); 772 } 773 override void setTime(int parameterIndex, TimeOfDay x) { 774 setString(parameterIndex, x.toISOString()); 775 } 776 777 override void setVariant(int parameterIndex, Variant x) { 778 checkClosed(); 779 lock(); 780 scope(exit) unlock(); 781 if (x.convertsTo!DateTime) 782 setDateTime(parameterIndex, x.get!DateTime); 783 else if (x.convertsTo!Date) 784 setDate(parameterIndex, x.get!Date); 785 else if (x.convertsTo!TimeOfDay) 786 setTime(parameterIndex, x.get!TimeOfDay); 787 else if (x.convertsTo!(byte[])) 788 setBytes(parameterIndex, x.get!(byte[])); 789 else if (x.convertsTo!(ubyte[])) 790 setUbytes(parameterIndex, x.get!(ubyte[])); 791 else 792 setParam(parameterIndex, x.toString()); 793 } 794 795 override void setNull(int parameterIndex) { 796 checkClosed(); 797 lock(); 798 scope(exit) unlock(); 799 setParam(parameterIndex, null); 800 } 801 802 override void setNull(int parameterIndex, int sqlType) { 803 checkClosed(); 804 lock(); 805 scope(exit) unlock(); 806 setParam(parameterIndex, null); 807 } 808 } 809 810 class PGSQLResultSet : ResultSetImpl { 811 private PGSQLStatement stmt; 812 private Variant[][] data; 813 ResultSetMetaData metadata; 814 private bool closed; 815 private int currentRowIndex; 816 private int rowCount; 817 private int[string] columnMap; 818 private bool lastIsNull; 819 private int columnCount; 820 821 Variant getValue(int columnIndex) { 822 checkClosed(); 823 enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex)); 824 enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set"); 825 Variant res = data[currentRowIndex][columnIndex - 1]; 826 lastIsNull = (res == null); 827 return res; 828 } 829 830 void checkClosed() { 831 if (closed) 832 throw new SQLException("Result set is already closed"); 833 } 834 835 public: 836 837 void lock() { 838 stmt.lock(); 839 } 840 841 void unlock() { 842 stmt.unlock(); 843 } 844 845 this(PGSQLStatement stmt, Variant[][] data, ResultSetMetaData metadata) { 846 this.stmt = stmt; 847 this.data = data; 848 this.metadata = metadata; 849 closed = false; 850 rowCount = cast(int)data.length; 851 currentRowIndex = -1; 852 columnCount = metadata.getColumnCount(); 853 for (int i=0; i<columnCount; i++) { 854 columnMap[metadata.getColumnName(i + 1)] = i; 855 } 856 //writeln("created result set: " ~ to!string(rowCount) ~ " rows, " ~ to!string(columnCount) ~ " cols"); 857 } 858 859 void onStatementClosed() { 860 closed = true; 861 } 862 863 // ResultSet interface implementation 864 865 //Retrieves the number, types and properties of this ResultSet object's columns 866 override ResultSetMetaData getMetaData() { 867 checkClosed(); 868 lock(); 869 scope(exit) unlock(); 870 return metadata; 871 } 872 873 override void close() { 874 checkClosed(); 875 lock(); 876 scope(exit) unlock(); 877 stmt.closeResultSet(); 878 closed = true; 879 } 880 override bool first() { 881 checkClosed(); 882 lock(); 883 scope(exit) unlock(); 884 currentRowIndex = 0; 885 return currentRowIndex >= 0 && currentRowIndex < rowCount; 886 } 887 override bool isFirst() { 888 checkClosed(); 889 lock(); 890 scope(exit) unlock(); 891 return rowCount > 0 && currentRowIndex == 0; 892 } 893 override bool isLast() { 894 checkClosed(); 895 lock(); 896 scope(exit) unlock(); 897 return rowCount > 0 && currentRowIndex == rowCount - 1; 898 } 899 override bool next() { 900 checkClosed(); 901 lock(); 902 scope(exit) unlock(); 903 if (currentRowIndex + 1 >= rowCount) 904 return false; 905 currentRowIndex++; 906 return true; 907 } 908 909 override int findColumn(string columnName) { 910 checkClosed(); 911 lock(); 912 scope(exit) unlock(); 913 int * p = (columnName in columnMap); 914 if (!p) 915 throw new SQLException("Column " ~ columnName ~ " not found"); 916 return *p + 1; 917 } 918 919 override bool getBoolean(int columnIndex) { 920 checkClosed(); 921 lock(); 922 scope(exit) unlock(); 923 Variant v = getValue(columnIndex); 924 if (lastIsNull) 925 return false; 926 if (v.convertsTo!(bool)) 927 return v.get!(bool); 928 if (v.convertsTo!(int)) 929 return v.get!(int) != 0; 930 if (v.convertsTo!(long)) 931 return v.get!(long) != 0; 932 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean"); 933 } 934 override ubyte getUbyte(int columnIndex) { 935 checkClosed(); 936 lock(); 937 scope(exit) unlock(); 938 Variant v = getValue(columnIndex); 939 if (lastIsNull) 940 return 0; 941 if (v.convertsTo!(ubyte)) 942 return v.get!(ubyte); 943 if (v.convertsTo!(long)) 944 return to!ubyte(v.get!(long)); 945 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte"); 946 } 947 override byte getByte(int columnIndex) { 948 checkClosed(); 949 lock(); 950 scope(exit) unlock(); 951 Variant v = getValue(columnIndex); 952 if (lastIsNull) 953 return 0; 954 if (v.convertsTo!(byte)) 955 return v.get!(byte); 956 if (v.convertsTo!(long)) 957 return to!byte(v.get!(long)); 958 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte"); 959 } 960 override short getShort(int columnIndex) { 961 checkClosed(); 962 lock(); 963 scope(exit) unlock(); 964 Variant v = getValue(columnIndex); 965 if (lastIsNull) 966 return 0; 967 if (v.convertsTo!(short)) 968 return v.get!(short); 969 if (v.convertsTo!(long)) 970 return to!short(v.get!(long)); 971 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short"); 972 } 973 override ushort getUshort(int columnIndex) { 974 checkClosed(); 975 lock(); 976 scope(exit) unlock(); 977 Variant v = getValue(columnIndex); 978 if (lastIsNull) 979 return 0; 980 if (v.convertsTo!(ushort)) 981 return v.get!(ushort); 982 if (v.convertsTo!(long)) 983 return to!ushort(v.get!(long)); 984 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort"); 985 } 986 override int getInt(int columnIndex) { 987 checkClosed(); 988 lock(); 989 scope(exit) unlock(); 990 Variant v = getValue(columnIndex); 991 if (lastIsNull) 992 return 0; 993 if (v.convertsTo!(int)) 994 return v.get!(int); 995 if (v.convertsTo!(long)) 996 return to!int(v.get!(long)); 997 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int"); 998 } 999 override uint getUint(int columnIndex) { 1000 checkClosed(); 1001 lock(); 1002 scope(exit) unlock(); 1003 Variant v = getValue(columnIndex); 1004 if (lastIsNull) 1005 return 0; 1006 if (v.convertsTo!(uint)) 1007 return v.get!(uint); 1008 if (v.convertsTo!(ulong)) 1009 return to!uint(v.get!(ulong)); 1010 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint"); 1011 } 1012 override long getLong(int columnIndex) { 1013 checkClosed(); 1014 lock(); 1015 scope(exit) unlock(); 1016 Variant v = getValue(columnIndex); 1017 if (lastIsNull) 1018 return 0; 1019 if (v.convertsTo!(long)) 1020 return v.get!(long); 1021 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long"); 1022 } 1023 override ulong getUlong(int columnIndex) { 1024 checkClosed(); 1025 lock(); 1026 scope(exit) unlock(); 1027 Variant v = getValue(columnIndex); 1028 if (lastIsNull) 1029 return 0; 1030 if (v.convertsTo!(ulong)) 1031 return v.get!(ulong); 1032 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong"); 1033 } 1034 override double getDouble(int columnIndex) { 1035 checkClosed(); 1036 lock(); 1037 scope(exit) unlock(); 1038 Variant v = getValue(columnIndex); 1039 if (lastIsNull) 1040 return 0; 1041 if (v.convertsTo!(double)) 1042 return v.get!(double); 1043 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double"); 1044 } 1045 override float getFloat(int columnIndex) { 1046 checkClosed(); 1047 lock(); 1048 scope(exit) unlock(); 1049 Variant v = getValue(columnIndex); 1050 if (lastIsNull) 1051 return 0; 1052 if (v.convertsTo!(float)) 1053 return v.get!(float); 1054 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float"); 1055 } 1056 override byte[] getBytes(int columnIndex) { 1057 checkClosed(); 1058 lock(); 1059 scope(exit) unlock(); 1060 Variant v = getValue(columnIndex); 1061 if (lastIsNull) 1062 return null; 1063 if (v.convertsTo!(byte[])) { 1064 return v.get!(byte[]); 1065 } 1066 return byteaToBytes(v.toString()); 1067 } 1068 override ubyte[] getUbytes(int columnIndex) { 1069 checkClosed(); 1070 lock(); 1071 scope(exit) unlock(); 1072 Variant v = getValue(columnIndex); 1073 if (lastIsNull) 1074 return null; 1075 if (v.convertsTo!(ubyte[])) { 1076 return v.get!(ubyte[]); 1077 } 1078 return byteaToUbytes(v.toString()); 1079 } 1080 override string getString(int columnIndex) { 1081 checkClosed(); 1082 lock(); 1083 scope(exit) unlock(); 1084 Variant v = getValue(columnIndex); 1085 if (lastIsNull) 1086 return null; 1087 // if (v.convertsTo!(ubyte[])) { 1088 // // assume blob encoding is utf-8 1089 // // TODO: check field encoding 1090 // return decodeTextBlob(v.get!(ubyte[])); 1091 // } 1092 return v.toString(); 1093 } 1094 override std.datetime.DateTime getDateTime(int columnIndex) { 1095 checkClosed(); 1096 lock(); 1097 scope(exit) unlock(); 1098 Variant v = getValue(columnIndex); 1099 if (lastIsNull) 1100 return DateTime(); 1101 if (v.convertsTo!(DateTime)) { 1102 return v.get!DateTime(); 1103 } 1104 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime"); 1105 } 1106 override std.datetime.Date getDate(int columnIndex) { 1107 checkClosed(); 1108 lock(); 1109 scope(exit) unlock(); 1110 Variant v = getValue(columnIndex); 1111 if (lastIsNull) 1112 return Date(); 1113 if (v.convertsTo!(Date)) { 1114 return v.get!Date(); 1115 } 1116 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date"); 1117 } 1118 override std.datetime.TimeOfDay getTime(int columnIndex) { 1119 checkClosed(); 1120 lock(); 1121 scope(exit) unlock(); 1122 Variant v = getValue(columnIndex); 1123 if (lastIsNull) 1124 return TimeOfDay(); 1125 if (v.convertsTo!(TimeOfDay)) { 1126 return v.get!TimeOfDay(); 1127 } 1128 throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay"); 1129 } 1130 1131 override Variant getVariant(int columnIndex) { 1132 checkClosed(); 1133 lock(); 1134 scope(exit) unlock(); 1135 Variant v = getValue(columnIndex); 1136 if (lastIsNull) { 1137 Variant vnull = null; 1138 return vnull; 1139 } 1140 return v; 1141 } 1142 override bool wasNull() { 1143 checkClosed(); 1144 lock(); 1145 scope(exit) unlock(); 1146 return lastIsNull; 1147 } 1148 override bool isNull(int columnIndex) { 1149 checkClosed(); 1150 lock(); 1151 scope(exit) unlock(); 1152 enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex)); 1153 enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set"); 1154 return data[currentRowIndex][columnIndex - 1] == null; 1155 } 1156 1157 //Retrieves the Statement object that produced this ResultSet object. 1158 override Statement getStatement() { 1159 checkClosed(); 1160 lock(); 1161 scope(exit) unlock(); 1162 return stmt; 1163 } 1164 1165 //Retrieves the current row number 1166 override int getRow() { 1167 checkClosed(); 1168 lock(); 1169 scope(exit) unlock(); 1170 if (currentRowIndex <0 || currentRowIndex >= rowCount) 1171 return 0; 1172 return currentRowIndex + 1; 1173 } 1174 1175 //Retrieves the fetch size for this ResultSet object. 1176 override int getFetchSize() { 1177 checkClosed(); 1178 lock(); 1179 scope(exit) unlock(); 1180 return rowCount; 1181 } 1182 } 1183 1184 1185 // sample URL: 1186 // mysql://localhost:3306/DatabaseName 1187 1188 //String url = "jdbc:postgresql://localhost/test"; 1189 //Properties props = new Properties(); 1190 //props.setProperty("user","fred"); 1191 //props.setProperty("password","secret"); 1192 //props.setProperty("ssl","true"); 1193 //Connection conn = DriverManager.getConnection(url, props); 1194 class PGSQLDriver : Driver { 1195 // helper function 1196 public static string generateUrl(string host, ushort port, string dbname) { 1197 return "postgresql://" ~ host ~ ":" ~ to!string(port) ~ "/" ~ dbname; 1198 } 1199 public static string[string] setUserAndPassword(string username, string password) { 1200 string[string] params; 1201 params["user"] = username; 1202 params["password"] = password; 1203 params["ssl"] = "true"; 1204 return params; 1205 } 1206 override ddbc.core.Connection connect(string url, string[string] params) { 1207 //writeln("PGSQLDriver.connect " ~ url); 1208 return new PGSQLConnection(url, params); 1209 } 1210 } 1211 1212 unittest { 1213 static if (PGSQL_TESTS_ENABLED) { 1214 1215 import std.conv; 1216 DataSource ds = createUnitTestPGSQLDataSource(); 1217 1218 auto conn = ds.getConnection(); 1219 assert(conn !is null); 1220 scope(exit) conn.close(); 1221 { 1222 //writeln("dropping table"); 1223 Statement stmt = conn.createStatement(); 1224 scope(exit) stmt.close(); 1225 stmt.executeUpdate("DROP TABLE IF EXISTS t1"); 1226 } 1227 { 1228 //writeln("creating table"); 1229 Statement stmt = conn.createStatement(); 1230 scope(exit) stmt.close(); 1231 stmt.executeUpdate("CREATE TABLE IF NOT EXISTS t1 (id SERIAL, name VARCHAR(255) NOT NULL, flags int null)"); 1232 //writeln("populating table"); 1233 Variant id = 0; 1234 assert(stmt.executeUpdate("INSERT INTO t1 (name) VALUES ('test1') returning id", id) == 1); 1235 assert(id.get!long > 0); 1236 } 1237 { 1238 PreparedStatement stmt = conn.prepareStatement("INSERT INTO t1 (name) VALUES ('test2') returning id"); 1239 scope(exit) stmt.close(); 1240 Variant id = 0; 1241 assert(stmt.executeUpdate(id) == 1); 1242 assert(id.get!long > 0); 1243 } 1244 { 1245 //writeln("reading table"); 1246 Statement stmt = conn.createStatement(); 1247 scope(exit) stmt.close(); 1248 ResultSet rs = stmt.executeQuery("SELECT id, name, flags FROM t1"); 1249 assert(rs.getMetaData().getColumnCount() == 3); 1250 assert(rs.getMetaData().getColumnName(1) == "id"); 1251 assert(rs.getMetaData().getColumnName(2) == "name"); 1252 assert(rs.getMetaData().getColumnName(3) == "flags"); 1253 scope(exit) rs.close(); 1254 //writeln("id" ~ "\t" ~ "name"); 1255 while (rs.next()) { 1256 long id = rs.getLong(1); 1257 string name = rs.getString(2); 1258 assert(rs.isNull(3)); 1259 //writeln("" ~ to!string(id) ~ "\t" ~ name); 1260 } 1261 } 1262 { 1263 //writeln("reading table"); 1264 Statement stmt = conn.createStatement(); 1265 scope(exit) stmt.close(); 1266 ResultSet rs = stmt.executeQuery("SELECT id, name, flags FROM t1"); 1267 assert(rs.getMetaData().getColumnCount() == 3); 1268 assert(rs.getMetaData().getColumnName(1) == "id"); 1269 assert(rs.getMetaData().getColumnName(2) == "name"); 1270 assert(rs.getMetaData().getColumnName(3) == "flags"); 1271 scope(exit) rs.close(); 1272 //writeln("id" ~ "\t" ~ "name"); 1273 while (rs.next()) { 1274 //writeln("calling getLong"); 1275 long id = rs.getLong(1); 1276 //writeln("done getLong"); 1277 string name = rs.getString(2); 1278 assert(rs.isNull(3)); 1279 //writeln("" ~ to!string(id) ~ "\t" ~ name); 1280 } 1281 } 1282 { 1283 //writeln("reading table with parameter id=1"); 1284 PreparedStatement stmt = conn.prepareStatement("SELECT id, name, flags FROM t1 WHERE id = ?"); 1285 scope(exit) stmt.close(); 1286 // assert(stmt.getMetaData().getColumnCount() == 3); 1287 // assert(stmt.getMetaData().getColumnName(1) == "id"); 1288 // assert(stmt.getMetaData().getColumnName(2) == "name"); 1289 // assert(stmt.getMetaData().getColumnName(3) == "flags"); 1290 //writeln("calling setLong"); 1291 stmt.setLong(1, 1); 1292 //writeln("done setLong"); 1293 { 1294 ResultSet rs = stmt.executeQuery(); 1295 scope(exit) rs.close(); 1296 //writeln("id" ~ "\t" ~ "name"); 1297 while (rs.next()) { 1298 long id = rs.getLong(1); 1299 string name = rs.getString(2); 1300 assert(rs.isNull(3)); 1301 //writeln("" ~ to!string(id) ~ "\t" ~ name); 1302 } 1303 } 1304 //writeln("changing parameter id=2"); 1305 //writeln("calling setLong"); 1306 stmt.setLong(1, 2); 1307 //writeln("done setLong"); 1308 { 1309 ResultSet rs = stmt.executeQuery(); 1310 scope(exit) rs.close(); 1311 //writeln("id" ~ "\t" ~ "name"); 1312 while (rs.next()) { 1313 long id = rs.getLong(1); 1314 string name = rs.getString(2); 1315 //writeln("" ~ to!string(id) ~ "\t" ~ name); 1316 } 1317 } 1318 } 1319 } 1320 } 1321 1322 } else { // version(USE_PGSQL) 1323 immutable bool PGSQL_TESTS_ENABLED = false; 1324 }