1 /**
2  * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 
3  * 
4  * Source file ddbc/drivers/mysqlddbc.d.
5  *
6  * DDBC library attempts to provide implementation independent interface to different databases.
7  * 
8  * Set of supported RDBMSs can be extended by writing Drivers for particular DBs.
9  * Currently it only includes MySQL driver.
10  * 
11  * JDBC documentation can be found here:
12  * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR)
13  *
14  * This module contains implementation of MySQL Driver which uses patched version of 
15  * MYSQLN (native D implementation of MySQL connector, written by Steve Teale)
16  * 
17  * Current version of driver implements only unidirectional readonly resultset, which with fetching full result to memory on creation. 
18  *
19  * You can find usage examples in unittest{} sections.
20  *
21  * Copyright: Copyright 2013
22  * License:   $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0).
23  * Author:   Vadim Lopatin
24  */
25 module ddbc.drivers.mysqlddbc;
26 
27 import std.algorithm;
28 import std.conv;
29 import std.datetime;
30 import std.exception;
31 import std.stdio;
32 import std.string;
33 import std.variant;
34 import core.sync.mutex;
35 import ddbc.common;
36 import ddbc.core;
37 
38 version(USE_MYSQL) {
39 
40 import mysql.connection;
41 import mysql.commands : Command;
42 import mysql.protocol.constants;
43 import mysql.protocol.packets : FieldDescription, ParamDescription;
44 
45 version(unittest) {
46     /*
47         To allow unit tests using MySQL server,
48         run mysql client using admin privileges, e.g. for MySQL server on localhost:
49         > mysql -uroot
50 
51         Create test user and test DB:
52         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'%' IDENTIFIED BY 'testpassword';
53         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'localhost' IDENTIFIED BY 'testpassword';
54         mysql> CREATE DATABASE testdb;
55      */
56     /// change to false to disable tests on real MySQL server
57     immutable bool MYSQL_TESTS_ENABLED = true;
58     /// change parameters if necessary
59     const string MYSQL_UNITTEST_HOST = "localhost";
60     const int    MYSQL_UNITTEST_PORT = 3306;
61     const string MYSQL_UNITTEST_USER = "testuser";
62     const string MYSQL_UNITTEST_PASSWORD = "testpassword";
63     const string MYSQL_UNITTEST_DB = "testdb";
64 
65     static if (MYSQL_TESTS_ENABLED) {
66         /// use this data source for tests
67         
68         DataSource createUnitTestMySQLDataSource() {
69             string url = makeDDBCUrl("mysql", MYSQL_UNITTEST_HOST, MYSQL_UNITTEST_PORT, MYSQL_UNITTEST_DB);
70             string[string] params;
71             setUserAndPassword(params, MYSQL_UNITTEST_USER, MYSQL_UNITTEST_PASSWORD);
72             return createConnectionPool(url, params);
73         }
74     }
75 }
76 
77 SqlType fromMySQLType(int t) {
78 	switch(t) {
79 	case SQLType.DECIMAL:
80 		case SQLType.TINY: return SqlType.TINYINT;
81 		case SQLType.SHORT: return SqlType.SMALLINT;
82 		case SQLType.INT: return SqlType.INTEGER;
83 		case SQLType.FLOAT: return SqlType.FLOAT;
84 		case SQLType.DOUBLE: return SqlType.DOUBLE;
85 		case SQLType.NULL: return SqlType.NULL;
86 		case SQLType.TIMESTAMP: return SqlType.DATETIME;
87 		case SQLType.LONGLONG: return SqlType.BIGINT;
88 		case SQLType.INT24: return SqlType.INTEGER;
89 		case SQLType.DATE: return SqlType.DATE;
90 		case SQLType.TIME: return SqlType.TIME;
91 		case SQLType.DATETIME: return SqlType.DATETIME;
92 		case SQLType.YEAR: return SqlType.SMALLINT;
93 		case SQLType.NEWDATE: return SqlType.DATE;
94 		case SQLType.VARCHAR: return SqlType.VARCHAR;
95 		case SQLType.BIT: return SqlType.BIT;
96 		case SQLType.NEWDECIMAL: return SqlType.DECIMAL;
97 		case SQLType.ENUM: return SqlType.OTHER;
98 		case SQLType.SET: return SqlType.OTHER;
99 		case SQLType.TINYBLOB: return SqlType.BLOB;
100 		case SQLType.MEDIUMBLOB: return SqlType.BLOB;
101 		case SQLType.LONGBLOB: return SqlType.BLOB;
102 		case SQLType.BLOB: return SqlType.BLOB;
103 		case SQLType.VARSTRING: return SqlType.VARCHAR;
104 		case SQLType.STRING: return SqlType.VARCHAR;
105 		case SQLType.GEOMETRY: return SqlType.OTHER;
106 		default: return SqlType.OTHER;
107 	}
108 }
109 
110 class MySQLConnection : ddbc.core.Connection {
111 private:
112     string url;
113     string[string] params;
114     string dbName;
115     string username;
116     string password;
117     string hostname;
118     int port = 3306;
119     mysql.connection.Connection conn;
120     bool closed;
121     bool autocommit;
122     Mutex mutex;
123 
124 
125 	MySQLStatement [] activeStatements;
126 
127 	void closeUnclosedStatements() {
128 		MySQLStatement [] list = activeStatements.dup;
129 		foreach(stmt; list) {
130 			stmt.close();
131 		}
132 	}
133 
134 	void checkClosed() {
135 		if (closed)
136 			throw new SQLException("Connection is already closed");
137 	}
138 
139 public:
140 
141     void lock() {
142         mutex.lock();
143     }
144 
145     void unlock() {
146         mutex.unlock();
147     }
148 
149     mysql.connection.Connection getConnection() { return conn; }
150 
151 
152 	void onStatementClosed(MySQLStatement stmt) {
153         myRemove(activeStatements, stmt);
154 	}
155 
156     this(string url, string[string] params) {
157         //writeln("MySQLConnection() creating connection");
158         mutex = new Mutex();
159         this.url = url;
160         this.params = params;
161         try {
162             //writeln("parsing url " ~ url);
163             extractParamsFromURL(url, this.params);
164             string dbName = "";
165     		ptrdiff_t firstSlashes = std..string.indexOf(url, "//");
166     		ptrdiff_t lastSlash = std..string.lastIndexOf(url, '/');
167     		ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0;
168     		ptrdiff_t hostNameEnd = lastSlash >=0 && lastSlash > firstSlashes + 1 ? lastSlash : url.length;
169             if (hostNameEnd < url.length - 1) {
170                 dbName = url[hostNameEnd + 1 .. $];
171             }
172             hostname = url[hostNameStart..hostNameEnd];
173             if (hostname.length == 0)
174                 hostname = "localhost";
175     		ptrdiff_t portDelimiter = std..string.indexOf(hostname, ":");
176             if (portDelimiter >= 0) {
177                 string portString = hostname[portDelimiter + 1 .. $];
178                 hostname = hostname[0 .. portDelimiter];
179                 if (portString.length > 0)
180                     port = to!int(portString);
181                 if (port < 1 || port > 65535)
182                     port = 3306;
183             }
184             if ("user" in this.params)
185                 username = this.params["user"];
186             if ("password" in this.params)
187                 password = this.params["password"];
188 
189             //writeln("host " ~ hostname ~ " : " ~ to!string(port) ~ " db=" ~ dbName ~ " user=" ~ username ~ " pass=" ~ password);
190 
191             conn = new mysql.connection.Connection(hostname, username, password, dbName, cast(ushort)port);
192             closed = false;
193             setAutoCommit(true);
194         } catch (Throwable e) {
195             //writeln(e.msg);
196             throw new SQLException(e);
197         }
198 
199         //writeln("MySQLConnection() connection created");
200     }
201     override void close() {
202 		checkClosed();
203 
204         lock();
205         scope(exit) unlock();
206         try {
207             closeUnclosedStatements();
208 
209             conn.close();
210             closed = true;
211         } catch (Throwable e) {
212             throw new SQLException(e);
213         }
214     }
215     override void commit() {
216         checkClosed();
217 
218         lock();
219         scope(exit) unlock();
220 
221         try {
222             Statement stmt = createStatement();
223             scope(exit) stmt.close();
224             stmt.executeUpdate("COMMIT");
225         } catch (Throwable e) {
226             throw new SQLException(e);
227         }
228     }
229     override Statement createStatement() {
230         checkClosed();
231 
232         lock();
233         scope(exit) unlock();
234 
235         try {
236             MySQLStatement stmt = new MySQLStatement(this);
237     		activeStatements ~= stmt;
238             return stmt;
239         } catch (Throwable e) {
240             throw new SQLException(e);
241         }
242     }
243 
244     PreparedStatement prepareStatement(string sql) {
245         checkClosed();
246 
247         lock();
248         scope(exit) unlock();
249 
250         try {
251             MySQLPreparedStatement stmt = new MySQLPreparedStatement(this, sql);
252             activeStatements ~= stmt;
253             return stmt;
254         } catch (Throwable e) {
255             throw new SQLException(e.msg ~ " while execution of query " ~ sql);
256         }
257     }
258 
259     override string getCatalog() {
260         return dbName;
261     }
262 
263     /// Sets the given catalog name in order to select a subspace of this Connection object's database in which to work.
264     override void setCatalog(string catalog) {
265         checkClosed();
266         if (dbName == catalog)
267             return;
268 
269         lock();
270         scope(exit) unlock();
271 
272         try {
273             conn.selectDB(catalog);
274             dbName = catalog;
275         } catch (Throwable e) {
276             throw new SQLException(e);
277         }
278     }
279 
280     override bool isClosed() {
281         return closed;
282     }
283 
284     override void rollback() {
285         checkClosed();
286 
287         lock();
288         scope(exit) unlock();
289 
290         try {
291             Statement stmt = createStatement();
292             scope(exit) stmt.close();
293             stmt.executeUpdate("ROLLBACK");
294         } catch (Throwable e) {
295             throw new SQLException(e);
296         }
297     }
298     override bool getAutoCommit() {
299         return autocommit;
300     }
301     override void setAutoCommit(bool autoCommit) {
302         checkClosed();
303         if (this.autocommit == autoCommit)
304             return;
305         lock();
306         scope(exit) unlock();
307 
308         try {
309             Statement stmt = createStatement();
310             scope(exit) stmt.close();
311             stmt.executeUpdate("SET autocommit=" ~ (autoCommit ? "1" : "0"));
312             this.autocommit = autoCommit;
313         } catch (Throwable e) {
314             throw new SQLException(e);
315         }
316     }
317 }
318 
319 class MySQLStatement : Statement {
320 private:
321     MySQLConnection conn;
322     Command * cmd;
323     mysql.connection.ResultSet rs;
324 	MySQLResultSet resultSet;
325 
326     bool closed;
327 
328 public:
329     void checkClosed() {
330         enforceEx!SQLException(!closed, "Statement is already closed");
331     }
332 
333     void lock() {
334         conn.lock();
335     }
336     
337     void unlock() {
338         conn.unlock();
339     }
340 
341     this(MySQLConnection conn) {
342         this.conn = conn;
343     }
344 
345     ResultSetMetaData createMetadata(FieldDescription[] fields) {
346         ColumnMetadataItem[] res = new ColumnMetadataItem[fields.length];
347         foreach(i, field; fields) {
348             ColumnMetadataItem item = new ColumnMetadataItem();
349             item.schemaName = field.db;
350             item.name = field.originalName;
351             item.label = field.name;
352             item.precision = field.length;
353             item.scale = field.scale;
354             item.isNullable = !field.notNull;
355             item.isSigned = !field.unsigned;
356 			item.type = fromMySQLType(field.type);
357             // TODO: fill more params
358             res[i] = item;
359         }
360         return new ResultSetMetaDataImpl(res);
361     }
362     ParameterMetaData createMetadata(ParamDescription[] fields) {
363         ParameterMetaDataItem[] res = new ParameterMetaDataItem[fields.length];
364         foreach(i, field; fields) {
365             ParameterMetaDataItem item = new ParameterMetaDataItem();
366             item.precision = field.length;
367             item.scale = field.scale;
368             item.isNullable = !field.notNull;
369             item.isSigned = !field.unsigned;
370 			item.type = fromMySQLType(field.type);
371 			// TODO: fill more params
372             res[i] = item;
373         }
374         return new ParameterMetaDataImpl(res);
375     }
376 public:
377     MySQLConnection getConnection() {
378         checkClosed();
379         return conn;
380     }
381     override ddbc.core.ResultSet executeQuery(string query) {
382         checkClosed();
383         lock();
384         scope(exit) unlock();
385 		try {
386 			cmd = new Command(conn.getConnection(), query);
387 	        rs = cmd.execSQLResult();
388     	    resultSet = new MySQLResultSet(this, rs, createMetadata(cmd.resultFieldDescriptions));
389         	return resultSet;
390 		} catch (Throwable e) {
391             throw new SQLException(e.msg ~ " - while execution of query " ~ query);
392         }
393 	}
394     override int executeUpdate(string query) {
395         checkClosed();
396         lock();
397         scope(exit) unlock();
398 		ulong rowsAffected = 0;
399 		try {
400 	        cmd = new Command(conn.getConnection(), query);
401 			cmd.execSQL(rowsAffected);
402 	        return cast(int)rowsAffected;
403 		} catch (Throwable e) {
404 			throw new SQLException(e.msg ~ " - while execution of query " ~ query);
405 		}
406     }
407 	override int executeUpdate(string query, out Variant insertId) {
408 		checkClosed();
409 		lock();
410 		scope(exit) unlock();
411         try {
412             cmd = new Command(conn.getConnection(), query);
413     		ulong rowsAffected = 0;
414     		cmd.execSQL(rowsAffected);
415     		insertId = Variant(cmd.lastInsertID);
416     		return cast(int)rowsAffected;
417         } catch (Throwable e) {
418             throw new SQLException(e.msg ~ " - while execution of query " ~ query);
419         }
420 	}
421 	override void close() {
422         checkClosed();
423         lock();
424         scope(exit) unlock();
425         try {
426             closeResultSet();
427             closed = true;
428             conn.onStatementClosed(this);
429         } catch (Throwable e) {
430             throw new SQLException(e);
431         }
432     }
433     void closeResultSet() {
434         if (cmd == null) {
435             return;
436         }
437         cmd.releaseStatement();
438         delete cmd;
439         cmd = null;
440 		if (resultSet !is null) {
441 			resultSet.onStatementClosed();
442 			resultSet = null;
443 		}
444     }
445 }
446 
447 class MySQLPreparedStatement : MySQLStatement, PreparedStatement {
448     string query;
449     int paramCount;
450     ResultSetMetaData metadata;
451     ParameterMetaData paramMetadata;
452     this(MySQLConnection conn, string query) {
453         super(conn);
454         this.query = query;
455         try {
456             cmd = new Command(conn.getConnection(), query);
457             cmd.prepare();
458             paramCount = cmd.numParams;
459         } catch (Throwable e) {
460             throw new SQLException(e);
461         }
462     }
463     void checkIndex(int index) {
464         if (index < 1 || index > paramCount)
465             throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range");
466     }
467     ref Variant getParam(int index) {
468         checkIndex(index);
469         return cmd.param(cast(ushort)(index - 1));
470     }
471 public:
472 
473     /// Retrieves a ResultSetMetaData object that contains information about the columns of the ResultSet object that will be returned when this PreparedStatement object is executed.
474     override ResultSetMetaData getMetaData() {
475         checkClosed();
476         lock();
477         scope(exit) unlock();
478         try {
479             if (metadata is null)
480                 metadata = createMetadata(cmd.preparedFieldDescriptions);
481             return metadata;
482         } catch (Throwable e) {
483             throw new SQLException(e);
484         }
485     }
486 
487     /// Retrieves the number, types and properties of this PreparedStatement object's parameters.
488     override ParameterMetaData getParameterMetaData() {
489         checkClosed();
490         lock();
491         scope(exit) unlock();
492         try {
493             if (paramMetadata is null)
494                 paramMetadata = createMetadata(cmd.preparedParamDescriptions);
495             return paramMetadata;
496         } catch (Throwable e) {
497             throw new SQLException(e);
498         }
499     }
500 
501     override int executeUpdate() {
502         checkClosed();
503         lock();
504         scope(exit) unlock();
505         try {
506             ulong rowsAffected = 0;
507             cmd.execPrepared(rowsAffected);
508             return cast(int)rowsAffected;
509         } catch (Throwable e) {
510             throw new SQLException(e);
511         }
512     }
513 
514 	override int executeUpdate(out Variant insertId) {
515 		checkClosed();
516 		lock();
517 		scope(exit) unlock();
518         try {
519     		ulong rowsAffected = 0;
520     		cmd.execPrepared(rowsAffected);
521     		insertId = cmd.lastInsertID;
522     		return cast(int)rowsAffected;
523         } catch (Throwable e) {
524             throw new SQLException(e);
525         }
526 	}
527 
528     override ddbc.core.ResultSet executeQuery() {
529         checkClosed();
530         lock();
531         scope(exit) unlock();
532         try {
533             rs = cmd.execPreparedResult();
534             resultSet = new MySQLResultSet(this, rs, getMetaData());
535             return resultSet;
536         } catch (Throwable e) {
537             throw new SQLException(e);
538         }
539     }
540     
541     override void clearParameters() {
542         checkClosed();
543         lock();
544         scope(exit) unlock();
545         try {
546             for (int i = 1; i <= paramCount; i++)
547                 setNull(i);
548         } catch (Throwable e) {
549             throw new SQLException(e);
550         }
551     }
552     
553 	override void setFloat(int parameterIndex, float x) {
554 		checkClosed();
555 		lock();
556 		scope(exit) unlock();
557         checkIndex(parameterIndex);
558         try {
559     		cmd.param(parameterIndex-1) = x;
560         } catch (Throwable e) {
561             throw new SQLException(e);
562         }
563 	}
564 	override void setDouble(int parameterIndex, double x){
565 		checkClosed();
566 		lock();
567 		scope(exit) unlock();
568         checkIndex(parameterIndex);
569         try {
570     		cmd.param(parameterIndex-1) = x;
571         } catch (Throwable e) {
572             throw new SQLException(e);
573         }
574 	}
575 	override void setBoolean(int parameterIndex, bool x) {
576         checkClosed();
577         lock();
578         scope(exit) unlock();
579         checkIndex(parameterIndex);
580         try {
581             cmd.param(parameterIndex-1) = x;
582         } catch (Throwable e) {
583             throw new SQLException(e);
584         }
585     }
586     override void setLong(int parameterIndex, long x) {
587         checkClosed();
588         lock();
589         scope(exit) unlock();
590         checkIndex(parameterIndex);
591         try {
592             cmd.param(parameterIndex-1) = x;
593         } catch (Throwable e) {
594             throw new SQLException(e);
595         }
596     }
597     override void setUlong(int parameterIndex, ulong x) {
598         checkClosed();
599         lock();
600         scope(exit) unlock();
601         checkIndex(parameterIndex);
602         try {
603             cmd.param(parameterIndex-1) = x;
604         } catch (Throwable e) {
605             throw new SQLException(e);
606         }
607     }
608     override void setInt(int parameterIndex, int x) {
609         checkClosed();
610         lock();
611         scope(exit) unlock();
612         checkIndex(parameterIndex);
613         try {
614             cmd.param(parameterIndex-1) = x;
615         } catch (Throwable e) {
616             throw new SQLException(e);
617         }
618     }
619     override void setUint(int parameterIndex, uint x) {
620         checkClosed();
621         lock();
622         scope(exit) unlock();
623         checkIndex(parameterIndex);
624         try {
625             cmd.param(parameterIndex-1) = x;
626         } catch (Throwable e) {
627             throw new SQLException(e);
628         }
629     }
630     override void setShort(int parameterIndex, short x) {
631         checkClosed();
632         lock();
633         scope(exit) unlock();
634         checkIndex(parameterIndex);
635         try {
636             cmd.param(parameterIndex-1) = x;
637         } catch (Throwable e) {
638             throw new SQLException(e);
639         }
640     }
641     override void setUshort(int parameterIndex, ushort x) {
642         checkClosed();
643         lock();
644         scope(exit) unlock();
645         checkIndex(parameterIndex);
646         try {
647             cmd.param(parameterIndex-1) = x;
648         } catch (Throwable e) {
649             throw new SQLException(e);
650         }
651     }
652     override void setByte(int parameterIndex, byte x) {
653         checkClosed();
654         lock();
655         scope(exit) unlock();
656         checkIndex(parameterIndex);
657         try {
658             cmd.param(parameterIndex-1) = x;
659         } catch (Throwable e) {
660             throw new SQLException(e);
661         }
662     }
663     override void setUbyte(int parameterIndex, ubyte x) {
664         checkClosed();
665         lock();
666         scope(exit) unlock();
667         checkIndex(parameterIndex);
668         try {
669             cmd.param(parameterIndex-1) = x;
670         } catch (Throwable e) {
671             throw new SQLException(e);
672         }
673     }
674     override void setBytes(int parameterIndex, byte[] x) {
675         checkClosed();
676         lock();
677         scope(exit) unlock();
678         checkIndex(parameterIndex);
679         try {
680             if (x.ptr is null)
681                 setNull(parameterIndex);
682             else
683                 cmd.param(parameterIndex-1) = x;
684         } catch (Throwable e) {
685             throw new SQLException(e);
686         }
687     }
688     override void setUbytes(int parameterIndex, ubyte[] x) {
689         checkClosed();
690         lock();
691         scope(exit) unlock();
692         checkIndex(parameterIndex);
693         try {
694             if (x.ptr is null)
695                 setNull(parameterIndex);
696             else
697                 cmd.param(parameterIndex-1) = x;
698         } catch (Throwable e) {
699             throw new SQLException(e);
700         }
701     }
702     override void setString(int parameterIndex, string x) {
703         checkClosed();
704         lock();
705         scope(exit) unlock();
706         checkIndex(parameterIndex);
707         try {
708             if (x.ptr is null)
709                 setNull(parameterIndex);
710             else
711                 cmd.param(parameterIndex-1) = x;
712         } catch (Throwable e) {
713             throw new SQLException(e);
714         }
715     }
716 	override void setDateTime(int parameterIndex, DateTime x) {
717 		checkClosed();
718 		lock();
719 		scope(exit) unlock();
720 		checkIndex(parameterIndex);
721         try {
722 		    cmd.param(parameterIndex-1) = x;
723         } catch (Throwable e) {
724             throw new SQLException(e);
725         }
726 	}
727 	override void setDate(int parameterIndex, Date x) {
728 		checkClosed();
729 		lock();
730 		scope(exit) unlock();
731 		checkIndex(parameterIndex);
732         try {
733     		cmd.param(parameterIndex-1) = x;
734         } catch (Throwable e) {
735             throw new SQLException(e);
736         }
737 	}
738 	override void setTime(int parameterIndex, TimeOfDay x) {
739 		checkClosed();
740 		lock();
741 		scope(exit) unlock();
742 		checkIndex(parameterIndex);
743         try {
744 		    cmd.param(parameterIndex-1) = x;
745         } catch (Throwable e) {
746             throw new SQLException(e);
747         }
748 	}
749 	override void setVariant(int parameterIndex, Variant x) {
750         checkClosed();
751         lock();
752         scope(exit) unlock();
753         checkIndex(parameterIndex);
754         try {
755             if (x == null)
756                 setNull(parameterIndex);
757             else
758                 cmd.param(parameterIndex-1) = x;
759         } catch (Throwable e) {
760             throw new SQLException(e);
761         }
762     }
763     override void setNull(int parameterIndex) {
764         checkClosed();
765         lock();
766         scope(exit) unlock();
767         checkIndex(parameterIndex);
768         try {
769             cmd.setNullParam(parameterIndex-1);
770         } catch (Throwable e) {
771             throw new SQLException(e);
772         }
773     }
774     override void setNull(int parameterIndex, int sqlType) {
775         checkClosed();
776         lock();
777         scope(exit) unlock();
778         try {
779             setNull(parameterIndex);
780         } catch (Throwable e) {
781             throw new SQLException(e);
782         }
783     }
784 }
785 
786 class MySQLResultSet : ResultSetImpl {
787     private MySQLStatement stmt;
788     private mysql.connection.ResultSet rs;
789     ResultSetMetaData metadata;
790     private bool closed;
791     private int currentRowIndex;
792     private int rowCount;
793     private int[string] columnMap;
794     private bool lastIsNull;
795     private int columnCount;
796 
797     Variant getValue(int columnIndex) {
798 		checkClosed();
799         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
800         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
801         lastIsNull = rs[currentRowIndex].isNull(columnIndex - 1);
802 		Variant res;
803 		if (!lastIsNull)
804 		    res = rs[currentRowIndex][columnIndex - 1];
805         return res;
806     }
807 
808 	void checkClosed() {
809 		if (closed)
810 			throw new SQLException("Result set is already closed");
811 	}
812 
813 public:
814 
815     void lock() {
816         stmt.lock();
817     }
818 
819     void unlock() {
820         stmt.unlock();
821     }
822 
823     this(MySQLStatement stmt, mysql.connection.ResultSet resultSet, ResultSetMetaData metadata) {
824         this.stmt = stmt;
825         this.rs = resultSet;
826         this.metadata = metadata;
827         try {
828             closed = false;
829             rowCount = cast(int)rs.length;
830             currentRowIndex = -1;
831 			foreach(key, val; rs.colNameIndicies)
832 				columnMap[key] = cast(int)val;
833     		columnCount = cast(int)rs.colNames.length;
834         } catch (Throwable e) {
835             throw new SQLException(e);
836         }
837     }
838 
839 	void onStatementClosed() {
840 		closed = true;
841 	}
842     string decodeTextBlob(ubyte[] data) {
843         char[] res = new char[data.length];
844         foreach (i, ch; data) {
845             res[i] = cast(char)ch;
846         }
847         return to!string(res);
848     }
849 
850     // ResultSet interface implementation
851 
852     //Retrieves the number, types and properties of this ResultSet object's columns
853     override ResultSetMetaData getMetaData() {
854         checkClosed();
855         lock();
856         scope(exit) unlock();
857         return metadata;
858     }
859 
860     override void close() {
861         checkClosed();
862         lock();
863         scope(exit) unlock();
864         stmt.closeResultSet();
865        	closed = true;
866     }
867     override bool first() {
868 		checkClosed();
869         lock();
870         scope(exit) unlock();
871         currentRowIndex = 0;
872         return currentRowIndex >= 0 && currentRowIndex < rowCount;
873     }
874     override bool isFirst() {
875 		checkClosed();
876         lock();
877         scope(exit) unlock();
878         return rowCount > 0 && currentRowIndex == 0;
879     }
880     override bool isLast() {
881 		checkClosed();
882         lock();
883         scope(exit) unlock();
884         return rowCount > 0 && currentRowIndex == rowCount - 1;
885     }
886     override bool next() {
887 		checkClosed();
888         lock();
889         scope(exit) unlock();
890         if (currentRowIndex + 1 >= rowCount)
891             return false;
892         currentRowIndex++;
893         return true;
894     }
895     
896     override int findColumn(string columnName) {
897 		checkClosed();
898         lock();
899         scope(exit) unlock();
900         int * p = (columnName in columnMap);
901         if (!p)
902             throw new SQLException("Column " ~ columnName ~ " not found");
903         return *p + 1;
904     }
905 
906     override bool getBoolean(int columnIndex) {
907         checkClosed();
908         lock();
909         scope(exit) unlock();
910         Variant v = getValue(columnIndex);
911         if (lastIsNull)
912             return false;
913         if (v.convertsTo!(bool))
914             return v.get!(bool);
915         if (v.convertsTo!(int))
916             return v.get!(int) != 0;
917         if (v.convertsTo!(long))
918             return v.get!(long) != 0;
919         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean");
920     }
921     override ubyte getUbyte(int columnIndex) {
922         checkClosed();
923         lock();
924         scope(exit) unlock();
925         Variant v = getValue(columnIndex);
926         if (lastIsNull)
927             return 0;
928         if (v.convertsTo!(ubyte))
929             return v.get!(ubyte);
930         if (v.convertsTo!(long))
931             return to!ubyte(v.get!(long));
932         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte");
933     }
934     override byte getByte(int columnIndex) {
935         checkClosed();
936         lock();
937         scope(exit) unlock();
938         Variant v = getValue(columnIndex);
939         if (lastIsNull)
940             return 0;
941         if (v.convertsTo!(byte))
942             return v.get!(byte);
943         if (v.convertsTo!(long))
944             return to!byte(v.get!(long));
945         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte");
946     }
947     override short getShort(int columnIndex) {
948         checkClosed();
949         lock();
950         scope(exit) unlock();
951         Variant v = getValue(columnIndex);
952         if (lastIsNull)
953             return 0;
954         if (v.convertsTo!(short))
955             return v.get!(short);
956         if (v.convertsTo!(long))
957             return to!short(v.get!(long));
958         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short");
959     }
960     override ushort getUshort(int columnIndex) {
961         checkClosed();
962         lock();
963         scope(exit) unlock();
964         Variant v = getValue(columnIndex);
965         if (lastIsNull)
966             return 0;
967         if (v.convertsTo!(ushort))
968             return v.get!(ushort);
969         if (v.convertsTo!(long))
970             return to!ushort(v.get!(long));
971         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort");
972     }
973     override int getInt(int columnIndex) {
974         checkClosed();
975         lock();
976         scope(exit) unlock();
977         Variant v = getValue(columnIndex);
978         if (lastIsNull)
979             return 0;
980         if (v.convertsTo!(int))
981             return v.get!(int);
982         if (v.convertsTo!(long))
983             return to!int(v.get!(long));
984         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int");
985     }
986     override uint getUint(int columnIndex) {
987         checkClosed();
988         lock();
989         scope(exit) unlock();
990         Variant v = getValue(columnIndex);
991         if (lastIsNull)
992             return 0;
993         if (v.convertsTo!(uint))
994             return v.get!(uint);
995         if (v.convertsTo!(ulong))
996             return to!int(v.get!(ulong));
997         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint");
998     }
999     override long getLong(int columnIndex) {
1000         checkClosed();
1001         lock();
1002         scope(exit) unlock();
1003         Variant v = getValue(columnIndex);
1004         if (lastIsNull)
1005             return 0;
1006         if (v.convertsTo!(long))
1007             return v.get!(long);
1008         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long");
1009     }
1010     override ulong getUlong(int columnIndex) {
1011         checkClosed();
1012         lock();
1013         scope(exit) unlock();
1014         Variant v = getValue(columnIndex);
1015         if (lastIsNull)
1016             return 0;
1017         if (v.convertsTo!(ulong))
1018             return v.get!(ulong);
1019         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong");
1020     }
1021     override double getDouble(int columnIndex) {
1022         checkClosed();
1023         lock();
1024         scope(exit) unlock();
1025         Variant v = getValue(columnIndex);
1026         if (lastIsNull)
1027             return 0;
1028         if (v.convertsTo!(double))
1029             return v.get!(double);
1030         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double");
1031     }
1032     override float getFloat(int columnIndex) {
1033         checkClosed();
1034         lock();
1035         scope(exit) unlock();
1036         Variant v = getValue(columnIndex);
1037         if (lastIsNull)
1038             return 0;
1039         if (v.convertsTo!(float))
1040             return v.get!(float);
1041         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float");
1042     }
1043     override byte[] getBytes(int columnIndex) {
1044         checkClosed();
1045         lock();
1046         scope(exit) unlock();
1047         Variant v = getValue(columnIndex);
1048         if (lastIsNull)
1049             return null;
1050         if (v.convertsTo!(byte[])) {
1051             return v.get!(byte[]);
1052         }
1053         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte[]");
1054     }
1055 	override ubyte[] getUbytes(int columnIndex) {
1056 		checkClosed();
1057 		lock();
1058 		scope(exit) unlock();
1059 		Variant v = getValue(columnIndex);
1060 		if (lastIsNull)
1061 			return null;
1062 		if (v.convertsTo!(ubyte[])) {
1063 			return v.get!(ubyte[]);
1064 		}
1065 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte[]");
1066 	}
1067 	override string getString(int columnIndex) {
1068         checkClosed();
1069         lock();
1070         scope(exit) unlock();
1071         Variant v = getValue(columnIndex);
1072         if (lastIsNull)
1073             return null;
1074 		if (v.convertsTo!(ubyte[])) {
1075 			// assume blob encoding is utf-8
1076 			// TODO: check field encoding
1077             return decodeTextBlob(v.get!(ubyte[]));
1078 		}
1079         return v.toString();
1080     }
1081 	override std.datetime.DateTime getDateTime(int columnIndex) {
1082 		checkClosed();
1083 		lock();
1084 		scope(exit) unlock();
1085 		Variant v = getValue(columnIndex);
1086 		if (lastIsNull)
1087 			return DateTime();
1088 		if (v.convertsTo!(DateTime)) {
1089 			return v.get!DateTime();
1090 		}
1091 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime");
1092 	}
1093 	override std.datetime.Date getDate(int columnIndex) {
1094 		checkClosed();
1095 		lock();
1096 		scope(exit) unlock();
1097 		Variant v = getValue(columnIndex);
1098 		if (lastIsNull)
1099 			return Date();
1100 		if (v.convertsTo!(Date)) {
1101 			return v.get!Date();
1102 		}
1103 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date");
1104 	}
1105 	override std.datetime.TimeOfDay getTime(int columnIndex) {
1106 		checkClosed();
1107 		lock();
1108 		scope(exit) unlock();
1109 		Variant v = getValue(columnIndex);
1110 		if (lastIsNull)
1111 			return TimeOfDay();
1112 		if (v.convertsTo!(TimeOfDay)) {
1113 			return v.get!TimeOfDay();
1114 		}
1115 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay");
1116 	}
1117 
1118     override Variant getVariant(int columnIndex) {
1119         checkClosed();
1120         lock();
1121         scope(exit) unlock();
1122         Variant v = getValue(columnIndex);
1123         if (lastIsNull) {
1124             Variant vnull = null;
1125             return vnull;
1126         }
1127         return v;
1128     }
1129     override bool wasNull() {
1130         checkClosed();
1131         lock();
1132         scope(exit) unlock();
1133         return lastIsNull;
1134     }
1135     override bool isNull(int columnIndex) {
1136         checkClosed();
1137         lock();
1138         scope(exit) unlock();
1139         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
1140         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
1141         return rs[currentRowIndex].isNull(columnIndex - 1);
1142     }
1143 
1144     //Retrieves the Statement object that produced this ResultSet object.
1145     override Statement getStatement() {
1146         checkClosed();
1147         lock();
1148         scope(exit) unlock();
1149         return stmt;
1150     }
1151 
1152     //Retrieves the current row number
1153     override int getRow() {
1154         checkClosed();
1155         lock();
1156         scope(exit) unlock();
1157         if (currentRowIndex <0 || currentRowIndex >= rowCount)
1158             return 0;
1159         return currentRowIndex + 1;
1160     }
1161 
1162     //Retrieves the fetch size for this ResultSet object.
1163     override int getFetchSize() {
1164         checkClosed();
1165         lock();
1166         scope(exit) unlock();
1167         return rowCount;
1168     }
1169 }
1170 
1171 // sample URL:
1172 // mysql://localhost:3306/DatabaseName
1173 class MySQLDriver : Driver {
1174     // helper function
1175     public static string generateUrl(string host, ushort port, string dbname) {
1176         return "mysql://" ~ host ~ ":" ~ to!string(port) ~ "/" ~ dbname;
1177     }
1178 	public static string[string] setUserAndPassword(string username, string password) {
1179 		string[string] params;
1180         params["user"] = username;
1181         params["password"] = password;
1182 		return params;
1183     }
1184     override ddbc.core.Connection connect(string url, string[string] params) {
1185         //writeln("MySQLDriver.connect " ~ url);
1186         return new MySQLConnection(url, params);
1187     }
1188 }
1189 
1190 unittest {
1191     static if (MYSQL_TESTS_ENABLED) {
1192 
1193         import std.conv;
1194 
1195         DataSource ds = createUnitTestMySQLDataSource();
1196 
1197         auto conn = ds.getConnection();
1198         scope(exit) conn.close();
1199         auto stmt = conn.createStatement();
1200         scope(exit) stmt.close();
1201 
1202         assert(stmt.executeUpdate("DROP TABLE IF EXISTS ddbct1") == 0);
1203         assert(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS ddbct1 (id bigint not null primary key AUTO_INCREMENT, name varchar(250), comment mediumtext, ts datetime)") == 0);
1204         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=1, name='name1', comment='comment for line 1', ts='20130202123025'") == 1);
1205         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=2, name='name2', comment='comment for line 2 - can be very long'") == 1);
1206         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=3, name='name3', comment='this is line 3'") == 1);
1207         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=4, name='name4', comment=NULL") == 1);
1208         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=5, name=NULL, comment=''") == 1);
1209         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=6, name='', comment=NULL") == 1);
1210         assert(stmt.executeUpdate("UPDATE ddbct1 SET name=concat(name, '_x') WHERE id IN (3, 4)") == 2);
1211         
1212         PreparedStatement ps = conn.prepareStatement("UPDATE ddbct1 SET name=? WHERE id=?");
1213         ps.setString(1, null);
1214         ps.setLong(2, 3);
1215         assert(ps.executeUpdate() == 1);
1216         
1217         auto rs = stmt.executeQuery("SELECT id, name name_alias, comment, ts FROM ddbct1 ORDER BY id");
1218 
1219         // testing result set meta data
1220         ResultSetMetaData meta = rs.getMetaData();
1221         assert(meta.getColumnCount() == 4);
1222         assert(meta.getColumnName(1) == "id");
1223         assert(meta.getColumnLabel(1) == "id");
1224         assert(meta.isNullable(1) == false);
1225         assert(meta.isNullable(2) == true);
1226         assert(meta.isNullable(3) == true);
1227         assert(meta.getColumnName(2) == "name");
1228         assert(meta.getColumnLabel(2) == "name_alias");
1229         assert(meta.getColumnName(3) == "comment");
1230 
1231         int rowCount = rs.getFetchSize();
1232         assert(rowCount == 6);
1233         int index = 1;
1234         while (rs.next()) {
1235             assert(!rs.isNull(1));
1236             ubyte[] bytes = rs.getUbytes(3);
1237             int rowIndex = rs.getRow();
1238             assert(rowIndex == index);
1239             long id = rs.getLong(1);
1240             assert(id == index);
1241             //writeln("field2 = '" ~ rs.getString(2) ~ "'");
1242             //writeln("field3 = '" ~ rs.getString(3) ~ "'");
1243             //writeln("wasNull = " ~ to!string(rs.wasNull()));
1244 			if (id == 1) {
1245 				DateTime ts = rs.getDateTime(4);
1246 				assert(ts == DateTime(2013,02,02,12,30,25));
1247 			}
1248 			if (id == 4) {
1249                 assert(rs.getString(2) == "name4_x");
1250                 assert(rs.isNull(3));
1251             }
1252             if (id == 5) {
1253                 assert(rs.isNull(2));
1254                 assert(!rs.isNull(3));
1255             }
1256             if (id == 6) {
1257                 assert(!rs.isNull(2));
1258                 assert(rs.isNull(3));
1259             }
1260             //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)) ~ "\t[" ~ to!string(bytes.length) ~ "]");
1261             index++;
1262         }
1263         
1264         PreparedStatement ps2 = conn.prepareStatement("SELECT id, name, comment FROM ddbct1 WHERE id >= ?");
1265 		scope(exit) ps2.close();
1266         ps2.setLong(1, 3);
1267         rs = ps2.executeQuery();
1268         while (rs.next()) {
1269             //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)));
1270             index++;
1271         }
1272 
1273 		// checking last insert ID for prepared statement
1274 		PreparedStatement ps3 = conn.prepareStatement("INSERT INTO ddbct1 (name) values ('New String 1')");
1275 		scope(exit) ps3.close();
1276 		Variant newId;
1277 		assert(ps3.executeUpdate(newId) == 1);
1278 		//writeln("Generated insert id = " ~ newId.toString());
1279 		assert(newId.get!ulong > 0);
1280 
1281 		// checking last insert ID for normal statement
1282 		Statement stmt4 = conn.createStatement();
1283 		scope(exit) stmt4.close();
1284 		Variant newId2;
1285 		assert(stmt.executeUpdate("INSERT INTO ddbct1 (name) values ('New String 2')", newId2) == 1);
1286 		//writeln("Generated insert id = " ~ newId2.toString());
1287 		assert(newId2.get!ulong > 0);
1288 
1289 	}
1290 }
1291 
1292 __gshared static this() {
1293     // register MySQLDriver
1294     import ddbc.common;
1295     DriverFactory.registerDriverFactory("mysql", delegate() { return new MySQLDriver(); });
1296 }
1297 
1298 
1299 } else { // version(USE_MYSQL)
1300     version(unittest) {
1301         immutable bool MYSQL_TESTS_ENABLED = false;
1302     }
1303 }