1 /**
2  * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 
3  * 
4  * Source file ddbc/drivers/pgsqlddbc.d.
5  *
6  * DDBC library attempts to provide implementation independent interface to different databases.
7  * 
8  * Set of supported RDBMSs can be extended by writing Drivers for particular DBs.
9  * Drivers are provided as separate modules; this one is the PostgreSQL driver.
10  * 
11  * JDBC documentation can be found here:
12  * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR)
13  *
14  * This module contains implementation of PostgreSQL Driver
15  * 
16  *
17  * You can find usage examples in unittest{} sections.
18  *
19  * Copyright: Copyright 2013
20  * License:   $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0).
21  * Author:   Vadim Lopatin
22  */
23 module ddbc.drivers.pgsqlddbc;
24 
25 
26 version(USE_PGSQL) {
27 
28     import std.algorithm;
29     import std.conv;
30     import std.datetime;
31     import std.exception;
32     import std.stdio;
33     import std.string;
34     import std.variant;
35     import core.sync.mutex;
36     
37     import ddbc.common;
38     import ddbc.core;
39     import ddbc.drivers.pgsql;
40     import ddbc.drivers.utils;
41 
version(unittest) {
    /*
     * Settings for the unit tests that talk to a real PostgreSQL server.
     * The values below are handed to the driver unchanged.
     */

    /// Set to false to compile out the tests that need a real PostgreSQL server.
    immutable bool PGSQL_TESTS_ENABLED = true;

    /// Connection settings used by those tests; adjust them to the local server.
    const string PGSQL_UNITTEST_HOST = "localhost";
    const int    PGSQL_UNITTEST_PORT = 5432;
    const string PGSQL_UNITTEST_USER = "testuser";
    const string PGSQL_UNITTEST_PASSWORD = "testpassword";
    const string PGSQL_UNITTEST_DB = "testdb";

    static if (PGSQL_TESTS_ENABLED) {
        /// Builds the DataSource used by the tests from the PGSQL_UNITTEST_* settings.
        DataSource createUnitTestPGSQLDataSource() {
            auto testDriver = new PGSQLDriver();
            string testUrl = PGSQLDriver.generateUrl(
                PGSQL_UNITTEST_HOST, PGSQL_UNITTEST_PORT, PGSQL_UNITTEST_DB);
            string[string] credentials = PGSQLDriver.setUserAndPassword(
                PGSQL_UNITTEST_USER, PGSQL_UNITTEST_PASSWORD);
            return new ConnectionPoolDataSourceImpl(testDriver, testUrl, credentials);
        }
    }
}
65 
66 
/**
 * DDBC Connection implementation backed by a libpq PGconn handle.
 *
 * The constructor parses the connection URL, opens the libpq connection and
 * runs the initial session setup statements. Public operations are serialized
 * on a per-connection mutex. Several methods take the lock while it is already
 * held by the caller (for example commit() calls createStatement()), so the
 * mutex has to be recursive.
 */
class PGSQLConnection : ddbc.core.Connection {
private:
    string url;
    string[string] params;
    string dbName;
    string username;
    string password;
    string hostname;
    int port = 5432;
    PGconn * conn;
    bool closed;
    bool autocommit;
    Mutex mutex;

    /// Statements created through this connection that have not been removed yet.
    PGSQLStatement [] activeStatements;

    /// Closes every tracked statement that is still open and forgets all of them.
    void closeUnclosedStatements() {
        // Iterate over a copy: closing a statement may modify activeStatements.
        PGSQLStatement [] list = activeStatements.dup;
        foreach(stmt; list) {
            // A statement the caller already closed would throw from close(), so skip it.
            if (!stmt.closed)
                stmt.close();
        }
        activeStatements = null;
    }

    /// Throws SQLException if this connection has been closed.
    void checkClosed() {
        if (closed)
            throw new SQLException("Connection is already closed");
    }

public:

    /// Acquires the connection mutex.
    void lock() {
        mutex.lock();
    }

    /// Releases the connection mutex.
    void unlock() {
        mutex.unlock();
    }

    /// Returns the underlying libpq handle (null once the connection is closed).
    PGconn * getConnection() { return conn; }

    /// Removes a statement from the list of tracked statements, if it is present.
    void onStatementClosed(PGSQLStatement stmt) {
        foreach(index, item; activeStatements) {
            if (item is stmt) {
                // remove() returns the shortened range; it has to be stored back.
                activeStatements = remove(activeStatements, index);
                return;
            }
        }
    }

    /**
     * Opens a connection.
     *
     * Params:
     *   url    = connection URL of the shape `[scheme://]host[:port]/dbname[?query]`;
     *            the query part is currently ignored
     *   params = connection properties; "user" and "password" are read,
     *            a missing entry is passed to libpq as an empty string
     *
     * Throws: SQLException if libpq cannot establish the connection or the
     *         initial session setup fails.
     */
    this(string url, string[string] params) {
        mutex = new Mutex();
        this.url = url;
        this.params = params;

        ptrdiff_t qmIndex = std.string.indexOf(url, '?');
        if (qmIndex >= 0) {
            // TODO: parse the parameters that follow '?'
            url = url[0 .. qmIndex];
        }

        ptrdiff_t firstSlashes = std.string.indexOf(url, "//");
        ptrdiff_t lastSlash = std.string.lastIndexOf(url, '/');
        ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0;
        ptrdiff_t hostNameEnd = (lastSlash >= 0 && lastSlash > firstSlashes + 1)
            ? lastSlash : cast(ptrdiff_t)url.length;

        // Store the database name in the member (not in a shadowing local) so
        // getCatalog() can report it. The comparison is written without
        // `url.length - 1`, which would wrap around for an empty URL.
        if (hostNameEnd + 1 < cast(ptrdiff_t)url.length)
            dbName = url[hostNameEnd + 1 .. $];
        else
            dbName = "";

        hostname = url[hostNameStart .. hostNameEnd];
        if (hostname.length == 0)
            hostname = "localhost";
        ptrdiff_t portDelimiter = std.string.indexOf(hostname, ":");
        if (portDelimiter >= 0) {
            string portString = hostname[portDelimiter + 1 .. $];
            hostname = hostname[0 .. portDelimiter];
            if (portString.length > 0)
                port = to!int(portString);
            // Fall back to the default port for values outside the TCP port range.
            if (port < 1 || port > 65535)
                port = 5432;
        }

        // A missing key must not raise RangeError; pass an empty value instead.
        username = params.get("user", "");
        password = params.get("password", "");

        const char ** keywords = [std.string.toStringz("host"), std.string.toStringz("port"), std.string.toStringz("dbname"), std.string.toStringz("user"), std.string.toStringz("password"), null].ptr;
        const char ** values = [std.string.toStringz(hostname), std.string.toStringz(to!string(port)), std.string.toStringz(dbName), std.string.toStringz(username), std.string.toStringz(password), null].ptr;

        conn = PQconnectdbParams(keywords, values, 0);
        if (conn is null)
            throw new SQLException("Cannot get Postgres connection");
        if (PQstatus(conn) != CONNECTION_OK) {
            // Copy the message first: it is owned by the PGconn, which must be
            // released with PQfinish even when the connection attempt failed.
            string message = copyCString(PQerrorMessage(conn));
            PQfinish(conn);
            conn = null;
            closed = true;
            throw new SQLException(message);
        }
        closed = false;

        // Do not leak the libpq handle if the session setup below throws.
        scope(failure) {
            PQfinish(conn);
            conn = null;
            closed = true;
        }
        setAutoCommit(true);
        updateConnectionParams();
    }

    /// Sends the session settings every new connection starts with.
    void updateConnectionParams() {
        Statement stmt = createStatement();
        scope(exit) stmt.close();
        stmt.executeUpdate("SET NAMES 'utf8'");
    }

    /// Closes all open statements and releases the libpq connection.
    override void close() {
        checkClosed();

        lock();
        scope(exit) unlock();

        closeUnclosedStatements();

        PQfinish(conn);
        conn = null; // the handle is invalid after PQfinish
        closed = true;
    }

    /// Executes COMMIT on this connection.
    override void commit() {
        checkClosed();

        lock();
        scope(exit) unlock();

        Statement stmt = createStatement();
        scope(exit) stmt.close();
        stmt.executeUpdate("COMMIT");
    }

    /// Creates a new statement that is tracked by this connection.
    override Statement createStatement() {
        checkClosed();

        lock();
        scope(exit) unlock();

        PGSQLStatement stmt = new PGSQLStatement(this);
        activeStatements ~= stmt;
        return stmt;
    }

    /// Creates a new parameterized statement for `sql` that is tracked by this connection.
    PreparedStatement prepareStatement(string sql) {
        checkClosed();

        lock();
        scope(exit) unlock();

        PGSQLPreparedStatement stmt = new PGSQLPreparedStatement(this, sql);
        activeStatements ~= stmt;
        return stmt;
    }

    /// Returns the database name taken from the connection URL.
    override string getCatalog() {
        return dbName;
    }

    /**
     * Sets the given catalog name in order to select a subspace of this
     * Connection object's database in which to work.
     *
     * Switching to a different database is not implemented: the call only
     * succeeds when `catalog` equals the current database name.
     *
     * Throws: SQLException("Not implemented") for any other catalog.
     */
    override void setCatalog(string catalog) {
        checkClosed();
        if (dbName == catalog)
            return;

        // TODO: implement. dbName is left untouched because nothing was switched.
        throw new SQLException("Not implemented");
    }

    /// Returns true once close() has completed.
    override bool isClosed() {
        return closed;
    }

    /// Executes ROLLBACK on this connection.
    override void rollback() {
        checkClosed();

        lock();
        scope(exit) unlock();

        Statement stmt = createStatement();
        scope(exit) stmt.close();
        stmt.executeUpdate("ROLLBACK");
    }

    /// Returns the auto-commit flag last set through setAutoCommit().
    override bool getAutoCommit() {
        return autocommit;
    }

    /**
     * Changes the auto-commit mode by sending `SET autocommit = ON|OFF`.
     * Does nothing when the requested mode is already active.
     *
     * NOTE(review): this relies on the server accepting the `autocommit`
     * setting; confirm that the targeted PostgreSQL versions still do.
     */
    override void setAutoCommit(bool autoCommit) {
        checkClosed();
        if (this.autocommit == autoCommit)
            return;
        lock();
        scope(exit) unlock();

        Statement stmt = createStatement();
        scope(exit) stmt.close();
        stmt.executeUpdate("SET autocommit = " ~ (autoCommit ? "ON" : "OFF"));
        this.autocommit = autoCommit;
    }
}
269 
/**
 * Plain (non-parameterized) statement executed through libpq's PQexec.
 *
 * Result rows are copied into Variant arrays before the libpq result is
 * released, so a returned result set does not reference libpq memory.
 */
class PGSQLStatement : Statement {
private:
    PGSQLConnection conn;
    PGSQLResultSet resultSet;

    bool closed;

public:
    /// Throws SQLException if this statement has been closed.
    void checkClosed() {
        enforceEx!SQLException(!closed, "Statement is already closed");
    }

    /// Acquires the lock of the owning connection.
    void lock() {
        conn.lock();
    }

    /// Releases the lock of the owning connection.
    void unlock() {
        conn.unlock();
    }

    /// Creates a statement bound to `conn`.
    this(PGSQLConnection conn) {
        this.conn = conn;
    }

    /// Builds result set metadata (column name and label only) from a libpq result.
    ResultSetMetaData createMetadata(PGresult * res) {
        int fieldCount = PQnfields(res);
        ColumnMetadataItem[] list = new ColumnMetadataItem[fieldCount];
        for(int i = 0; i < fieldCount; i++) {
            ColumnMetadataItem item = new ColumnMetadataItem();
            item.name = copyCString(PQfname(res, i));
            item.label = copyCString(PQfname(res, i));
            // TODO: fill table name, type, precision, scale and nullability
            list[i] = item;
        }
        return new ResultSetMetaDataImpl(list);
    }

    /// Builds placeholder metadata for `paramCount` parameters; every one is reported as a nullable VARCHAR.
    ParameterMetaData createParameterMetadata(int paramCount) {
        ParameterMetaDataItem[] res = new ParameterMetaDataItem[paramCount];
        for(int i = 0; i < paramCount; i++) {
            ParameterMetaDataItem item = new ParameterMetaDataItem();
            item.precision = 0;
            item.scale = 0;
            item.isNullable = true;
            item.isSigned = true;
            item.type = SqlType.VARCHAR;
            res[i] = item;
        }
        return new ParameterMetaDataImpl(res);
    }

public:
    /// Returns the owning connection. Throws SQLException if the statement is closed.
    PGSQLConnection getConnection() {
        checkClosed();
        return conn;
    }

    /**
     * Converts one text-format cell to a Variant according to the column type OID.
     *
     * Throws: SQLException for a type OID that is not handled here.
     */
    private Variant textValueToVariant(string s, int t) {
        switch(t) {
            case INT4OID:
                return Variant(parse!int(s));
            case BOOLOID:
                // PostgreSQL prints booleans as "t"/"f" in text format; without
                // these cases the value falls through to parse!int and throws.
                if (s == "t" || s == "true")
                    return Variant(true);
                if (s == "f" || s == "false")
                    return Variant(false);
                return Variant(parse!int(s) != 0);
            case CHAROID:
                return Variant(cast(char)(s.length > 0 ? s[0] : 0));
            case INT8OID:
                return Variant(parse!long(s));
            case INT2OID:
                return Variant(parse!short(s));
            case FLOAT4OID:
                return Variant(parse!float(s));
            case FLOAT8OID:
                return Variant(parse!double(s));
            case VARCHAROID:
            case TEXTOID:
            case NAMEOID:
                return Variant(s);
            case BYTEAOID:
                return Variant(byteaToUbytes(s));
            default:
                throw new SQLException("Unsupported column type " ~ to!string(t));
        }
    }

    /**
     * Appends every row of `res` to `data`, one Variant per column.
     * SQL NULL becomes a Variant holding null, text-format values are parsed
     * by column type, binary-format values are copied as byte[].
     */
    private void fillData(PGresult * res, ref Variant[][] data) {
        int rows = PQntuples(res);
        int fieldCount = PQnfields(res);
        int[] fmts = new int[fieldCount];
        int[] types = new int[fieldCount];
        for (int col = 0; col < fieldCount; col++) {
            fmts[col] = PQfformat(res, col);
            types[col] = cast(int)PQftype(res, col);
        }
        for (int row = 0; row < rows; row++) {
            Variant[] v = new Variant[fieldCount];
            for (int col = 0; col < fieldCount; col++) {
                if (PQgetisnull(res, row, col) != 0) {
                    v[col] = null;
                    continue;
                }
                int len = PQgetlength(res, row, col);
                const char * value = PQgetvalue(res, row, col);
                if (fmts[col] == 0) {
                    // text format
                    v[col] = textValueToVariant(copyCString(value, len), types[col]);
                } else {
                    // binary format: hand the raw bytes to the caller
                    byte[] b = new byte[len];
                    for (int i = 0; i < len; i++)
                        b[i] = cast(byte)value[i];
                    v[col] = b;
                }
            }
            data ~= v;
        }
    }

    /**
     * Executes `query` and returns its rows.
     *
     * Throws: SQLException if the statement is closed, libpq returns no
     *         result, or the result status is not PGRES_TUPLES_OK.
     */
    override ddbc.core.ResultSet executeQuery(string query) {
        checkClosed();
        lock();
        scope(exit) unlock();

        PGresult * res = PQexec(conn.getConnection(), std.string.toStringz(query));
        enforceEx!SQLException(res !is null, "Failed to execute statement " ~ query);
        // Register the cleanup before the status check so a failed query does not leak the result.
        scope(exit) PQclear(res);
        enforceEx!SQLException(PQresultStatus(res) == PGRES_TUPLES_OK, getError());

        auto metadata = createMetadata(res);
        Variant[][] data;
        fillData(res, data);
        resultSet = new PGSQLResultSet(this, data, metadata);
        return resultSet;
    }

    /// Returns the most recent error message reported by libpq for the connection.
    string getError() {
        return copyCString(PQerrorMessage(conn.getConnection()));
    }

    /// Executes `query` and returns the number of affected rows.
    override int executeUpdate(string query) {
        Variant dummy;
        return executeUpdate(query, dummy);
    }

    /**
     * Stores the single value of a one-row, one-column result in `insertId`,
     * parsed as long. Any other result shape, or a NULL value, leaves
     * `insertId` unchanged.
     */
    void readInsertId(PGresult * res, ref Variant insertId) {
        int rows = PQntuples(res);
        int fieldCount = PQnfields(res);
        if (rows == 1 && fieldCount == 1 && PQgetisnull(res, 0, 0) == 0) {
            int len = PQgetlength(res, 0, 0);
            const char * value = PQgetvalue(res, 0, 0);
            string s = copyCString(value, len);
            insertId = parse!long(s);
        }
    }

    /**
     * Executes `query` and returns the number of affected rows as reported by
     * PQcmdTuples (0 when libpq reports none). When the statement returns
     * exactly one row with one column, that value is stored in `insertId`.
     *
     * Throws: SQLException if the statement is closed, libpq returns no
     *         result, or the result status is neither PGRES_COMMAND_OK nor
     *         PGRES_TUPLES_OK.
     */
    override int executeUpdate(string query, out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        PGresult * res = PQexec(conn.getConnection(), std.string.toStringz(query));
        enforceEx!SQLException(res !is null, "Failed to execute statement " ~ query);
        // Register the cleanup before the status check so a failed statement does not leak the result.
        scope(exit) PQclear(res);
        auto status = PQresultStatus(res);
        enforceEx!SQLException(status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK, getError());

        string rowsAffected = copyCString(PQcmdTuples(res));
        readInsertId(res, insertId);
        return rowsAffected.length > 0 ? to!int(rowsAffected) : 0;
    }

    /**
     * Closes the statement and removes it from the connection's list of
     * tracked statements. Closing an already closed statement has no effect.
     */
    override void close() {
        if (closed)
            return;
        lock();
        scope(exit) unlock();
        closeResultSet();
        closed = true;
        conn.onStatementClosed(this);
    }

    /**
     * Hook invoked when the statement or one of its result sets is closed.
     * Nothing has to be released: rows are copied out of the libpq result
     * before executeQuery() returns.
     */
    void closeResultSet() {
    }
}
494 
/// Counter used to give each PGSQLPreparedStatement a distinct name.
ulong preparedStatementIndex = 1;

/**
 * Parameterized statement.
 *
 * JDBC-style `?` placeholders are rewritten to PostgreSQL's `$1`, `$2`, ...
 * Parameter values are kept as text and sent with the rewritten SQL through
 * PQexecParams on every execution.
 */
class PGSQLPreparedStatement : PGSQLStatement, PreparedStatement {
    string query;
    int paramCount;
    ResultSetMetaData metadata;
    ParameterMetaData paramMetadata;
    string stmtName;
    bool[] paramIsSet;
    string[] paramValue;

    /**
     * Replaces every `?` outside of single-quoted literals with `$n`
     * (n counting from 1) and stores the number of placeholders in paramCount.
     */
    string convertParams(string query) {
        string res;
        int count = 0;
        bool insideString = false;
        char lastChar = 0;
        foreach(ch; query) {
            if (ch == '\'') {
                if (insideString) {
                    // a quote preceded by a backslash does not end the literal
                    if (lastChar != '\\')
                        insideString = false;
                } else {
                    insideString = true;
                }
                res ~= ch;
            } else if (ch == '?') {
                if (!insideString) {
                    count++;
                    res ~= "$" ~ to!string(count);
                } else {
                    res ~= ch;
                }
            } else {
                res ~= ch;
            }
            lastChar = ch;
        }
        paramCount = count;
        return res;
    }

    /// Creates a statement for `query` on `conn`; all parameters start out unset.
    this(PGSQLConnection conn, string query) {
        super(conn);
        query = convertParams(query);
        this.query = query;
        paramMetadata = createParameterMetadata(paramCount);
        // Advance the counter so that every statement gets its own name.
        stmtName = "ddbcstmt" ~ to!string(preparedStatementIndex++);
        paramIsSet = new bool[paramCount];
        paramValue = new string[paramCount];
    }

    /// Returns the error message attached to a libpq result.
    string getError(PGresult * res) {
        return copyCString(PQresultErrorMessage(res));
    }

    /// Throws SQLException unless `index` is a valid 1-based parameter index.
    void checkIndex(int index) {
        if (index < 1 || index > paramCount)
            throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range");
    }

    /// Throws SQLException naming the first parameter (1-based) that has no value yet.
    void checkParams() {
        foreach(i, b; paramIsSet)
            enforceEx!SQLException(b, "Parameter " ~ to!string(i + 1) ~ " is not set");
    }

    /// Stores the text form of a parameter; a null `value` is sent as SQL NULL.
    void setParam(int index, string value) {
        checkIndex(index);
        paramValue[index - 1] = value;
        paramIsSet[index - 1] = true;
    }

    /**
     * Sends the statement with the current parameter values and refreshes
     * `metadata` from the result. The caller owns the returned result and has
     * to release it with PQclear.
     *
     * Throws: SQLException if a parameter is unset or libpq returns no result.
     */
    PGresult * exec() {
        checkParams();
        const (char) * [] values = new const(char)*[paramCount];
        int[] lengths = new int[paramCount];
        int[] formats = new int[paramCount]; // zero-initialized: every parameter is sent as text
        for (int i = 0; i < paramCount; i++) {
            if (paramValue[i] is null)
                values[i] = null;
            else
                values[i] = toStringz(paramValue[i]);
            lengths[i] = cast(int)paramValue[i].length;
        }
        PGresult * res = PQexecParams(conn.getConnection(),
                             toStringz(query),
                             paramCount,
                             null,
                             cast(const char * *)values.ptr,
                             cast(const int *)lengths.ptr,
                             cast(const int *)formats.ptr,
                             0);
        enforceEx!SQLException(res !is null, "Error while executing prepared statement " ~ query);
        // The caller only takes ownership on success.
        scope(failure) PQclear(res);
        metadata = createMetadata(res);
        return res;
    }

public:

    /**
     * Closes the statement and removes it from the connection's list of
     * tracked statements. Closing an already closed statement has no effect.
     */
    override void close() {
        if (closed)
            return;
        lock();
        scope(exit) unlock();
        closeResultSet();
        closed = true;
        conn.onStatementClosed(this);
    }

    /// Returns the column metadata of the most recent execution (null before the first one).
    override ResultSetMetaData getMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        return metadata;
    }

    /// Retrieves the number, types and properties of this PreparedStatement object's parameters.
    override ParameterMetaData getParameterMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        return paramMetadata;
    }

    /// Executes the statement and returns the number of affected rows.
    override int executeUpdate() {
        Variant dummy;
        return executeUpdate(dummy);
    }

    /**
     * Executes the statement and returns the number of affected rows as
     * reported by PQcmdTuples (0 when libpq reports none). When the statement
     * returns exactly one row with one column, that value is stored in `insertId`.
     *
     * Throws: SQLException if the statement is closed, a parameter is unset,
     *         or the result status is neither PGRES_COMMAND_OK nor PGRES_TUPLES_OK.
     */
    override int executeUpdate(out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        PGresult * res = exec();
        scope(exit) PQclear(res);
        auto status = PQresultStatus(res);
        enforceEx!SQLException(status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK, getError(res));

        string rowsAffected = copyCString(PQcmdTuples(res));
        readInsertId(res, insertId);
        return rowsAffected.length > 0 ? to!int(rowsAffected) : 0;
    }

    /**
     * Executes the statement and returns its rows.
     *
     * Throws: SQLException if the statement is closed, a parameter is unset,
     *         or the result status is not PGRES_TUPLES_OK.
     */
    override ddbc.core.ResultSet executeQuery() {
        checkClosed();
        lock();
        scope(exit) unlock();
        PGresult * res = exec();
        scope(exit) PQclear(res);
        // Report a failed query instead of returning it as an empty result set.
        enforceEx!SQLException(PQresultStatus(res) == PGRES_TUPLES_OK, getError(res));
        Variant[][] data;
        fillData(res, data);
        resultSet = new PGSQLResultSet(this, data, metadata);
        return resultSet;
    }

    /// Discards all parameter values; each one has to be set again before the next execution.
    override void clearParameters() {
        checkClosed();
        lock();
        scope(exit) unlock();
        paramIsSet = new bool[paramCount];
        paramValue = new string[paramCount];
    }

    /// Sets a parameter (1-based index) from a float, sent in its text form.
    override void setFloat(int parameterIndex, float x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a double, sent in its text form.
    override void setDouble(int parameterIndex, double x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) to the text "true" or "false".
    override void setBoolean(int parameterIndex, bool x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, x ? "true" : "false");
    }

    /// Sets a parameter (1-based index) from a long, sent in its text form.
    override void setLong(int parameterIndex, long x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a ulong, sent in its text form.
    override void setUlong(int parameterIndex, ulong x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from an int, sent in its text form.
    override void setInt(int parameterIndex, int x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a uint, sent in its text form.
    override void setUint(int parameterIndex, uint x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a short, sent in its text form.
    override void setShort(int parameterIndex, short x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a ushort, sent in its text form.
    override void setUshort(int parameterIndex, ushort x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a byte, sent in its text form.
    override void setByte(int parameterIndex, byte x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a ubyte, sent in its text form.
    override void setUbyte(int parameterIndex, ubyte x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        // setParam validates the index itself
        setParam(parameterIndex, to!string(x));
    }

    /// Sets a parameter (1-based index) from a byte array, encoded with bytesToBytea.
    override void setBytes(int parameterIndex, byte[] x) {
        setString(parameterIndex, bytesToBytea(x));
    }

    /// Sets a parameter (1-based index) from a ubyte array, encoded with ubytesToBytea.
    override void setUbytes(int parameterIndex, ubyte[] x) {
        setString(parameterIndex, ubytesToBytea(x));
    }

    /// Sets a parameter (1-based index) from a string; a null string is sent as SQL NULL.
    override void setString(int parameterIndex, string x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, x);
    }

    /// Sets a parameter (1-based index) from a DateTime, formatted with toISOString().
    override void setDateTime(int parameterIndex, DateTime x) {
        setString(parameterIndex, x.toISOString());
    }

    /// Sets a parameter (1-based index) from a Date, formatted with toISOString().
    override void setDate(int parameterIndex, Date x) {
        setString(parameterIndex, x.toISOString());
    }

    /// Sets a parameter (1-based index) from a TimeOfDay, formatted with toISOString().
    override void setTime(int parameterIndex, TimeOfDay x) {
        setString(parameterIndex, x.toISOString());
    }

    /**
     * Sets a parameter (1-based index) from a Variant. Date/time and byte
     * array payloads go through the matching typed setter; any other payload
     * is sent as the Variant's toString() text.
     */
    override void setVariant(int parameterIndex, Variant x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        if (x.convertsTo!DateTime)
            setDateTime(parameterIndex, x.get!DateTime);
        else if (x.convertsTo!Date)
            setDate(parameterIndex, x.get!Date);
        else if (x.convertsTo!TimeOfDay)
            setTime(parameterIndex, x.get!TimeOfDay);
        else if (x.convertsTo!(byte[]))
            setBytes(parameterIndex, x.get!(byte[]));
        else if (x.convertsTo!(ubyte[]))
            setUbytes(parameterIndex, x.get!(ubyte[]));
        else
            setParam(parameterIndex, x.toString());
    }

    /// Sets a parameter (1-based index) to SQL NULL.
    override void setNull(int parameterIndex) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, null);
    }

    /// Sets a parameter (1-based index) to SQL NULL; `sqlType` is not used.
    override void setNull(int parameterIndex, int sqlType) {
        checkClosed();
        lock();
        scope(exit) unlock();
        setParam(parameterIndex, null);
    }
}
809 
810     class PGSQLResultSet : ResultSetImpl {
811     	private PGSQLStatement stmt;
812         private Variant[][] data;
813     	ResultSetMetaData metadata;
814     	private bool closed;
815     	private int currentRowIndex;
816     	private int rowCount;
817     	private int[string] columnMap;
818     	private bool lastIsNull;
819     	private int columnCount;
820     	
/// Returns the value at the 1-based `columnIndex` of the current row and
/// updates lastIsNull from a comparison of that value with null.
Variant getValue(int columnIndex) {
    checkClosed();
    immutable bool columnInRange = columnIndex >= 1 && columnIndex <= columnCount;
    enforceEx!SQLException(columnInRange, "Column index out of bounds: " ~ to!string(columnIndex));
    immutable bool positionedOnRow = currentRowIndex >= 0 && currentRowIndex < rowCount;
    enforceEx!SQLException(positionedOnRow, "No current row in result set");
    Variant cell = data[currentRowIndex][columnIndex - 1];
    lastIsNull = (cell == null);
    return cell;
}
829     	
/// Throws SQLException once this result set has been closed.
void checkClosed() {
    if (!closed)
        return;
    throw new SQLException("Result set is already closed");
}
834     	
835     public:
836     	
/// Acquires the lock through the owning statement.
void lock() { stmt.lock(); }
840     	
/// Releases the lock through the owning statement.
void unlock() { stmt.unlock(); }
844     	
/**
 * Wraps already fetched rows.
 *
 * Params:
 *   stmt     = statement that produced the rows
 *   data     = one Variant array per row
 *   metadata = column descriptions; its column names feed the name lookup map
 *
 * The cursor starts before the first row.
 */
this(PGSQLStatement stmt, Variant[][] data, ResultSetMetaData metadata) {
    this.stmt = stmt;
    this.data = data;
    this.metadata = metadata;
    closed = false;
    rowCount = cast(int)data.length;
    currentRowIndex = -1;
    columnCount = metadata.getColumnCount();
    // metadata uses 1-based column numbers, the map stores 0-based positions
    foreach (zeroBased; 0 .. columnCount)
        columnMap[metadata.getColumnName(zeroBased + 1)] = zeroBased;
}
858     	
859     	void onStatementClosed() {
860     		closed = true;
861     	}
862 
863         // ResultSet interface implementation
864     	
865     	//Retrieves the number, types and properties of this ResultSet object's columns
866     	override ResultSetMetaData getMetaData() {
867     		checkClosed();
868     		lock();
869     		scope(exit) unlock();
870     		return metadata;
871     	}
872     	
873     	override void close() {
874     		checkClosed();
875     		lock();
876     		scope(exit) unlock();
877     		stmt.closeResultSet();
878     		closed = true;
879     	}
880     	override bool first() {
881     		checkClosed();
882     		lock();
883     		scope(exit) unlock();
884     		currentRowIndex = 0;
885     		return currentRowIndex >= 0 && currentRowIndex < rowCount;
886     	}
887     	override bool isFirst() {
888     		checkClosed();
889     		lock();
890     		scope(exit) unlock();
891     		return rowCount > 0 && currentRowIndex == 0;
892     	}
893     	override bool isLast() {
894     		checkClosed();
895     		lock();
896     		scope(exit) unlock();
897     		return rowCount > 0 && currentRowIndex == rowCount - 1;
898     	}
899     	override bool next() {
900     		checkClosed();
901     		lock();
902     		scope(exit) unlock();
903     		if (currentRowIndex + 1 >= rowCount)
904     			return false;
905     		currentRowIndex++;
906     		return true;
907     	}
908     	
909     	override int findColumn(string columnName) {
910     		checkClosed();
911     		lock();
912     		scope(exit) unlock();
913     		int * p = (columnName in columnMap);
914     		if (!p)
915     			throw new SQLException("Column " ~ columnName ~ " not found");
916     		return *p + 1;
917     	}
918     	
919     	override bool getBoolean(int columnIndex) {
920     		checkClosed();
921     		lock();
922     		scope(exit) unlock();
923     		Variant v = getValue(columnIndex);
924     		if (lastIsNull)
925     			return false;
926     		if (v.convertsTo!(bool))
927     			return v.get!(bool);
928     		if (v.convertsTo!(int))
929     			return v.get!(int) != 0;
930     		if (v.convertsTo!(long))
931     			return v.get!(long) != 0;
932     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean");
933     	}
934     	override ubyte getUbyte(int columnIndex) {
935     		checkClosed();
936     		lock();
937     		scope(exit) unlock();
938     		Variant v = getValue(columnIndex);
939     		if (lastIsNull)
940     			return 0;
941     		if (v.convertsTo!(ubyte))
942     			return v.get!(ubyte);
943     		if (v.convertsTo!(long))
944     			return to!ubyte(v.get!(long));
945     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte");
946     	}
947     	override byte getByte(int columnIndex) {
948     		checkClosed();
949     		lock();
950     		scope(exit) unlock();
951     		Variant v = getValue(columnIndex);
952     		if (lastIsNull)
953     			return 0;
954     		if (v.convertsTo!(byte))
955     			return v.get!(byte);
956     		if (v.convertsTo!(long))
957     			return to!byte(v.get!(long));
958     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte");
959     	}
960     	override short getShort(int columnIndex) {
961     		checkClosed();
962     		lock();
963     		scope(exit) unlock();
964     		Variant v = getValue(columnIndex);
965     		if (lastIsNull)
966     			return 0;
967     		if (v.convertsTo!(short))
968     			return v.get!(short);
969     		if (v.convertsTo!(long))
970     			return to!short(v.get!(long));
971     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short");
972     	}
973     	override ushort getUshort(int columnIndex) {
974     		checkClosed();
975     		lock();
976     		scope(exit) unlock();
977     		Variant v = getValue(columnIndex);
978     		if (lastIsNull)
979     			return 0;
980     		if (v.convertsTo!(ushort))
981     			return v.get!(ushort);
982     		if (v.convertsTo!(long))
983     			return to!ushort(v.get!(long));
984     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort");
985     	}
986     	override int getInt(int columnIndex) {
987     		checkClosed();
988     		lock();
989     		scope(exit) unlock();
990     		Variant v = getValue(columnIndex);
991     		if (lastIsNull)
992     			return 0;
993     		if (v.convertsTo!(int))
994     			return v.get!(int);
995     		if (v.convertsTo!(long))
996     			return to!int(v.get!(long));
997     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int");
998     	}
999     	override uint getUint(int columnIndex) {
1000     		checkClosed();
1001     		lock();
1002     		scope(exit) unlock();
1003     		Variant v = getValue(columnIndex);
1004     		if (lastIsNull)
1005     			return 0;
1006     		if (v.convertsTo!(uint))
1007     			return v.get!(uint);
1008     		if (v.convertsTo!(ulong))
1009     			return to!uint(v.get!(ulong));
1010     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint");
1011     	}
1012     	override long getLong(int columnIndex) {
1013     		checkClosed();
1014     		lock();
1015     		scope(exit) unlock();
1016     		Variant v = getValue(columnIndex);
1017     		if (lastIsNull)
1018     			return 0;
1019     		if (v.convertsTo!(long))
1020     			return v.get!(long);
1021     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long");
1022     	}
1023     	override ulong getUlong(int columnIndex) {
1024     		checkClosed();
1025     		lock();
1026     		scope(exit) unlock();
1027     		Variant v = getValue(columnIndex);
1028     		if (lastIsNull)
1029     			return 0;
1030     		if (v.convertsTo!(ulong))
1031     			return v.get!(ulong);
1032     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong");
1033     	}
1034     	override double getDouble(int columnIndex) {
1035     		checkClosed();
1036     		lock();
1037     		scope(exit) unlock();
1038     		Variant v = getValue(columnIndex);
1039     		if (lastIsNull)
1040     			return 0;
1041     		if (v.convertsTo!(double))
1042     			return v.get!(double);
1043     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double");
1044     	}
1045     	override float getFloat(int columnIndex) {
1046     		checkClosed();
1047     		lock();
1048     		scope(exit) unlock();
1049     		Variant v = getValue(columnIndex);
1050     		if (lastIsNull)
1051     			return 0;
1052     		if (v.convertsTo!(float))
1053     			return v.get!(float);
1054     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float");
1055     	}
1056     	override byte[] getBytes(int columnIndex) {
1057     		checkClosed();
1058     		lock();
1059     		scope(exit) unlock();
1060     		Variant v = getValue(columnIndex);
1061     		if (lastIsNull)
1062     			return null;
1063     		if (v.convertsTo!(byte[])) {
1064     			return v.get!(byte[]);
1065     		}
1066             return byteaToBytes(v.toString());
1067     	}
1068     	override ubyte[] getUbytes(int columnIndex) {
1069     		checkClosed();
1070     		lock();
1071     		scope(exit) unlock();
1072     		Variant v = getValue(columnIndex);
1073     		if (lastIsNull)
1074     			return null;
1075     		if (v.convertsTo!(ubyte[])) {
1076     			return v.get!(ubyte[]);
1077     		}
1078             return byteaToUbytes(v.toString());
1079         }
1080     	override string getString(int columnIndex) {
1081     		checkClosed();
1082     		lock();
1083     		scope(exit) unlock();
1084     		Variant v = getValue(columnIndex);
1085     		if (lastIsNull)
1086     			return null;
1087 //    		if (v.convertsTo!(ubyte[])) {
1088 //    			// assume blob encoding is utf-8
1089 //    			// TODO: check field encoding
1090 //    			return decodeTextBlob(v.get!(ubyte[]));
1091 //    		}
1092     		return v.toString();
1093     	}
1094     	override std.datetime.DateTime getDateTime(int columnIndex) {
1095     		checkClosed();
1096     		lock();
1097     		scope(exit) unlock();
1098     		Variant v = getValue(columnIndex);
1099     		if (lastIsNull)
1100     			return DateTime();
1101     		if (v.convertsTo!(DateTime)) {
1102     			return v.get!DateTime();
1103     		}
1104     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime");
1105     	}
1106     	override std.datetime.Date getDate(int columnIndex) {
1107     		checkClosed();
1108     		lock();
1109     		scope(exit) unlock();
1110     		Variant v = getValue(columnIndex);
1111     		if (lastIsNull)
1112     			return Date();
1113     		if (v.convertsTo!(Date)) {
1114     			return v.get!Date();
1115     		}
1116     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date");
1117     	}
1118     	override std.datetime.TimeOfDay getTime(int columnIndex) {
1119     		checkClosed();
1120     		lock();
1121     		scope(exit) unlock();
1122     		Variant v = getValue(columnIndex);
1123     		if (lastIsNull)
1124     			return TimeOfDay();
1125     		if (v.convertsTo!(TimeOfDay)) {
1126     			return v.get!TimeOfDay();
1127     		}
1128     		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay");
1129     	}
1130     	
1131     	override Variant getVariant(int columnIndex) {
1132     		checkClosed();
1133     		lock();
1134     		scope(exit) unlock();
1135     		Variant v = getValue(columnIndex);
1136     		if (lastIsNull) {
1137     			Variant vnull = null;
1138     			return vnull;
1139     		}
1140     		return v;
1141     	}
1142     	override bool wasNull() {
1143     		checkClosed();
1144     		lock();
1145     		scope(exit) unlock();
1146     		return lastIsNull;
1147     	}
1148     	override bool isNull(int columnIndex) {
1149     		checkClosed();
1150     		lock();
1151     		scope(exit) unlock();
1152     		enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
1153     		enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
1154     		return data[currentRowIndex][columnIndex - 1] == null;
1155     	}
1156     	
1157     	//Retrieves the Statement object that produced this ResultSet object.
1158     	override Statement getStatement() {
1159     		checkClosed();
1160     		lock();
1161     		scope(exit) unlock();
1162     		return stmt;
1163     	}
1164     	
1165     	//Retrieves the current row number
1166     	override int getRow() {
1167     		checkClosed();
1168     		lock();
1169     		scope(exit) unlock();
1170     		if (currentRowIndex <0 || currentRowIndex >= rowCount)
1171     			return 0;
1172     		return currentRowIndex + 1;
1173     	}
1174     	
1175     	//Retrieves the fetch size for this ResultSet object.
1176     	override int getFetchSize() {
1177     		checkClosed();
1178     		lock();
1179     		scope(exit) unlock();
1180     		return rowCount;
1181     	}
1182     }
1183 
1184 
    // sample URL:
    // postgresql://localhost:5432/DatabaseName
1187 
1188     //String url = "jdbc:postgresql://localhost/test";
1189     //Properties props = new Properties();
1190     //props.setProperty("user","fred");
1191     //props.setProperty("password","secret");
1192     //props.setProperty("ssl","true");
1193     //Connection conn = DriverManager.getConnection(url, props);
1194     class PGSQLDriver : Driver {
1195     	// helper function
1196     	public static string generateUrl(string host, ushort port, string dbname) {
1197     		return "postgresql://" ~ host ~ ":" ~ to!string(port) ~ "/" ~ dbname;
1198     	}
1199     	public static string[string] setUserAndPassword(string username, string password) {
1200     		string[string] params;
1201     		params["user"] = username;
1202     		params["password"] = password;
1203     		params["ssl"] = "true";
1204     		return params;
1205     	}
1206     	override ddbc.core.Connection connect(string url, string[string] params) {
1207     		//writeln("PGSQLDriver.connect " ~ url);
1208     		return new PGSQLConnection(url, params);
1209     	}
1210     }
1211 
    // Integration test against a live PostgreSQL server: creates table t1,
    // inserts two rows and reads them back through both Statement and
    // PreparedStatement. Compiled only when PGSQL_TESTS_ENABLED is true.
    unittest {
    	static if (PGSQL_TESTS_ENABLED) {
    		
    		import std.conv;
    		DataSource ds = createUnitTestPGSQLDataSource();
    	
    		auto conn = ds.getConnection();
    		assert(conn !is null);
    		scope(exit) conn.close();
            // Step 1: start from a clean slate.
            {
                //writeln("dropping table");
                Statement stmt = conn.createStatement();
                scope(exit) stmt.close();
                stmt.executeUpdate("DROP TABLE IF EXISTS t1");
            }
            // Step 2: create the table and insert the first row through a plain Statement.
            {
                //writeln("creating table");
                Statement stmt = conn.createStatement();
                scope(exit) stmt.close();
                stmt.executeUpdate("CREATE TABLE IF NOT EXISTS t1 (id SERIAL, name VARCHAR(255) NOT NULL, flags int null)");
                //writeln("populating table");
                // The Variant is passed to executeUpdate alongside a "returning id"
                // query; the test expects it to hold a positive long afterwards.
                Variant id = 0;
                assert(stmt.executeUpdate("INSERT INTO t1 (name) VALUES ('test1') returning id", id) == 1);
                assert(id.get!long > 0);
            }
            // Step 3: insert the second row through a PreparedStatement.
            {
                PreparedStatement stmt = conn.prepareStatement("INSERT INTO t1 (name) VALUES ('test2') returning id");
                scope(exit) stmt.close();
                Variant id = 0;
                assert(stmt.executeUpdate(id) == 1);
                assert(id.get!long > 0);
            }
            // Step 4: read all rows back and check the metadata. The inserts
            // never set flags, so column 3 must be NULL in every row.
            {
                //writeln("reading table");
                Statement stmt = conn.createStatement();
                scope(exit) stmt.close();
                ResultSet rs = stmt.executeQuery("SELECT id, name, flags FROM t1");
                assert(rs.getMetaData().getColumnCount() == 3);
                assert(rs.getMetaData().getColumnName(1) == "id");
                assert(rs.getMetaData().getColumnName(2) == "name");
                assert(rs.getMetaData().getColumnName(3) == "flags");
                scope(exit) rs.close();
                //writeln("id" ~ "\t" ~ "name");
                while (rs.next()) {
                    long id = rs.getLong(1);
                    string name = rs.getString(2);
                    assert(rs.isNull(3));
                    //writeln("" ~ to!string(id) ~ "\t" ~ name);
                }
            }
            // Step 5: the same query and checks again on a fresh Statement.
            {
                //writeln("reading table");
                Statement stmt = conn.createStatement();
                scope(exit) stmt.close();
                ResultSet rs = stmt.executeQuery("SELECT id, name, flags FROM t1");
                assert(rs.getMetaData().getColumnCount() == 3);
                assert(rs.getMetaData().getColumnName(1) == "id");
                assert(rs.getMetaData().getColumnName(2) == "name");
                assert(rs.getMetaData().getColumnName(3) == "flags");
                scope(exit) rs.close();
                //writeln("id" ~ "\t" ~ "name");
                while (rs.next()) {
                    //writeln("calling getLong");
                    long id = rs.getLong(1);
                    //writeln("done getLong");
                    string name = rs.getString(2);
                    assert(rs.isNull(3));
                    //writeln("" ~ to!string(id) ~ "\t" ~ name);
                }
            }
            // Step 6: parameterised SELECT; the same PreparedStatement is
            // executed twice with different values bound to parameter 1.
            {
                //writeln("reading table with parameter id=1");
                PreparedStatement stmt = conn.prepareStatement("SELECT id, name, flags FROM t1 WHERE id = ?");
                scope(exit) stmt.close();
//                assert(stmt.getMetaData().getColumnCount() == 3);
//                assert(stmt.getMetaData().getColumnName(1) == "id");
//                assert(stmt.getMetaData().getColumnName(2) == "name");
//                assert(stmt.getMetaData().getColumnName(3) == "flags");
                //writeln("calling setLong");
                stmt.setLong(1, 1);
                //writeln("done setLong");
                // First execution: id = 1.
                {
                    ResultSet rs = stmt.executeQuery();
                    scope(exit) rs.close();
                    //writeln("id" ~ "\t" ~ "name");
                    while (rs.next()) {
                        long id = rs.getLong(1);
                        string name = rs.getString(2);
                        assert(rs.isNull(3));
                        //writeln("" ~ to!string(id) ~ "\t" ~ name);
                    }
                }
                //writeln("changing parameter id=2");
                //writeln("calling setLong");
                stmt.setLong(1, 2);
                //writeln("done setLong");
                // Second execution: id = 2 (values are read but not asserted).
                {
                    ResultSet rs = stmt.executeQuery();
                    scope(exit) rs.close();
                    //writeln("id" ~ "\t" ~ "name");
                    while (rs.next()) {
                        long id = rs.getLong(1);
                        string name = rs.getString(2);
                        //writeln("" ~ to!string(id) ~ "\t" ~ name);
                    }
                }
            }
        }
    }
1321 
1322 } else { // version(USE_PGSQL)
    // USE_PGSQL is not set, so the PostgreSQL driver is compiled out and the
    // live-server tests are reported as disabled.
    immutable bool PGSQL_TESTS_ENABLED = false;
1324 }