1 /**
2  * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 
3  * 
4  * Source file ddbc/drivers/mysqlddbc.d.
5  *
6  * DDBC library attempts to provide implementation independent interface to different databases.
7  * 
8  * Set of supported RDBMSs can be extended by writing Drivers for particular DBs.
9  * Currently it only includes MySQL driver.
10  * 
11  * JDBC documentation can be found here:
12  * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR)
13  *
14  * This module contains implementation of MySQL Driver which uses patched version of 
15  * MYSQLN (native D implementation of MySQL connector, written by Steve Teale)
16  * 
 * Current version of driver implements only a unidirectional, read-only result set, which fetches the full result into memory on creation.
18  *
19  * You can find usage examples in unittest{} sections.
20  *
21  * Copyright: Copyright 2013
22  * License:   $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0).
23  * Author:   Vadim Lopatin
24  */
25 module ddbc.drivers.mysqlddbc;
26 
27 import std.algorithm;
28 import std.conv;
29 import std.datetime;
30 import std.exception;
31 import std.stdio;
32 import std.string;
33 import std.variant;
34 import core.sync.mutex;
35 import ddbc.common;
36 import ddbc.core;
37 
38 version(USE_MYSQL) {
39 
40 import mysql.connection;
41 
42 version(unittest) {
43     /*
44         To allow unit tests using MySQL server,
45         run mysql client using admin privileges, e.g. for MySQL server on localhost:
46         > mysql -uroot
47 
48         Create test user and test DB:
49         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'%' IDENTIFIED BY 'testpassword';
50         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'localhost' IDENTIFIED BY 'testpassword';
51         mysql> CREATE DATABASE testdb;
52      */
53     /// change to false to disable tests on real MySQL server
54     immutable bool MYSQL_TESTS_ENABLED = true;
55     /// change parameters if necessary
56     const string MYSQL_UNITTEST_HOST = "localhost";
57     const int    MYSQL_UNITTEST_PORT = 3306;
58     const string MYSQL_UNITTEST_USER = "testuser";
59     const string MYSQL_UNITTEST_PASSWORD = "testpassword";
60     const string MYSQL_UNITTEST_DB = "testdb";
61 
62     static if (MYSQL_TESTS_ENABLED) {
63         /// use this data source for tests
64         DataSource createUnitTestMySQLDataSource() {
65             MySQLDriver driver = new MySQLDriver();
66             string url = MySQLDriver.generateUrl(MYSQL_UNITTEST_HOST, MYSQL_UNITTEST_PORT, MYSQL_UNITTEST_DB);
67             string[string] params = MySQLDriver.setUserAndPassword(MYSQL_UNITTEST_USER, MYSQL_UNITTEST_PASSWORD);
68             return new ConnectionPoolDataSourceImpl(driver, url, params);
69         }
70     }
71 }
72 
/**
 * Maps a MySQL column type code to the corresponding DDBC SqlType.
 *
 * Params:
 *   t = MySQL type code (compared against the SQLType constants)
 * Returns: the matching SqlType; SqlType.OTHER for ENUM, SET, GEOMETRY and
 *          for any code that is not listed here.
 */
SqlType fromMySQLType(int t) {
	switch(t) {
		// DECIMAL previously had no statement of its own and fell through into
		// the TINY case, so decimal columns were reported as TINYINT.
		case SQLType.DECIMAL: return SqlType.DECIMAL;
		case SQLType.TINY: return SqlType.TINYINT;
		case SQLType.SHORT: return SqlType.SMALLINT;
		case SQLType.INT: return SqlType.INTEGER;
		case SQLType.FLOAT: return SqlType.FLOAT;
		case SQLType.DOUBLE: return SqlType.DOUBLE;
		case SQLType.NULL: return SqlType.NULL;
		case SQLType.TIMESTAMP: return SqlType.DATETIME;
		case SQLType.LONGLONG: return SqlType.BIGINT;
		case SQLType.INT24: return SqlType.INTEGER;
		case SQLType.DATE: return SqlType.DATE;
		case SQLType.TIME: return SqlType.TIME;
		case SQLType.DATETIME: return SqlType.DATETIME;
		case SQLType.YEAR: return SqlType.SMALLINT;
		case SQLType.NEWDATE: return SqlType.DATE;
		case SQLType.VARCHAR: return SqlType.VARCHAR;
		case SQLType.BIT: return SqlType.BIT;
		case SQLType.NEWDECIMAL: return SqlType.DECIMAL;
		case SQLType.ENUM: return SqlType.OTHER;
		case SQLType.SET: return SqlType.OTHER;
		case SQLType.TINYBLOB: return SqlType.BLOB;
		case SQLType.MEDIUMBLOB: return SqlType.BLOB;
		case SQLType.LONGBLOB: return SqlType.BLOB;
		case SQLType.BLOB: return SqlType.BLOB;
		case SQLType.VARSTRING: return SqlType.VARCHAR;
		case SQLType.STRING: return SqlType.VARCHAR;
		case SQLType.GEOMETRY: return SqlType.OTHER;
		default: return SqlType.OTHER;
	}
}
105 
/**
 * DDBC Connection implementation that wraps a native mysql.connection.Connection.
 *
 * Public operations take the per-connection mutex before touching the underlying
 * connection. Several methods call each other while holding that mutex (for example
 * commit() calls createStatement()), which relies on core.sync.mutex.Mutex allowing
 * the owning thread to lock it again.
 *
 * Statements created through this object are tracked in activeStatements so that the
 * ones still open can be closed together with the connection.
 */
class MySQLConnection : ddbc.core.Connection {
private:
    string url;
    string[string] params;
    string dbName;
    string username;
    string password;
    string hostname;
    int port = 3306;
    mysql.connection.Connection conn;
    bool closed;
    bool autocommit;
    Mutex mutex;

    /// Statements created by this connection that may still be open.
    MySQLStatement[] activeStatements;

    /// Closes every tracked statement that is still open, then forgets all tracked statements.
    void closeUnclosedStatements() {
        // Work on a snapshot so the tracked list is free to change while statements close.
        MySQLStatement[] list = activeStatements.dup;
        foreach (stmt; list) {
            // A statement its owner already closed would throw from close() and
            // abort closing of the connection, so it is skipped.
            if (!stmt.closed)
                stmt.close();
        }
        activeStatements = null;
    }

    /// Starts tracking a newly created statement and drops entries of statements closed in the meantime.
    void trackStatement(MySQLStatement stmt) {
        MySQLStatement[] open;
        foreach (item; activeStatements) {
            if (!item.closed)
                open ~= item;
        }
        open ~= stmt;
        activeStatements = open;
    }

    /// Runs a single parameterless SQL command through a short-lived statement.
    void executeControlStatement(string sql) {
        Statement stmt = createStatement();
        scope(exit) stmt.close();
        stmt.executeUpdate(sql);
    }

    /// Throws SQLException if close() has already been called on this connection.
    void checkClosed() {
        if (closed)
            throw new SQLException("Connection is already closed");
    }

public:

    /// Acquires the connection mutex.
    void lock() {
        mutex.lock();
    }

    /// Releases the connection mutex.
    void unlock() {
        mutex.unlock();
    }

    /// Returns the underlying native MySQL connection object.
    mysql.connection.Connection getConnection() { return conn; }

    /// Stops tracking stmt; does nothing if the statement is not in the tracked list.
    void onStatementClosed(MySQLStatement stmt) {
        foreach (index, item; activeStatements) {
            if (item is stmt) {
                // remove() only returns the shortened range; the result has to be
                // stored back or the list keeps its old length.
                activeStatements = remove(activeStatements, index);
                return;
            }
        }
    }

    /**
     * Opens a connection described by url and params.
     *
     * The URL is parsed as [scheme://]host[:port][/database][?parameters]. The part
     * after '?' is currently ignored. An empty host means "localhost"; an empty or
     * out-of-range port falls back to 3306. Autocommit is switched on after connecting.
     *
     * Params:
     *   url = connection URL
     *   params = connection properties; the "user" and "password" entries are read
     * Throws: SQLException wrapping any failure while parsing the URL, reading the
     *         properties or connecting
     */
    this(string url, string[string] params) {
        mutex = new Mutex();
        this.url = url;
        this.params = params;
        try {
            ptrdiff_t qmIndex = std.string.indexOf(url, '?');
            if (qmIndex >= 0) {
                // TODO: parse params (url[qmIndex + 1 .. $])
                url = url[0 .. qmIndex];
            }
            ptrdiff_t firstSlashes = std.string.indexOf(url, "//");
            ptrdiff_t lastSlash = std.string.lastIndexOf(url, '/');
            ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0;
            ptrdiff_t hostNameEnd = lastSlash >= 0 && lastSlash > firstSlashes + 1 ? lastSlash : url.length;
            // Assign the field itself: a local variable of the same name used to
            // shadow it, leaving getCatalog() without the database from the URL.
            dbName = "";
            // Written as "end + 1 < length" so an empty URL cannot underflow length - 1.
            if (hostNameEnd + 1 < url.length) {
                dbName = url[hostNameEnd + 1 .. $];
            }
            hostname = url[hostNameStart .. hostNameEnd];
            if (hostname.length == 0)
                hostname = "localhost";
            ptrdiff_t portDelimiter = std.string.indexOf(hostname, ":");
            if (portDelimiter >= 0) {
                string portString = hostname[portDelimiter + 1 .. $];
                hostname = hostname[0 .. portDelimiter];
                if (portString.length > 0)
                    port = to!int(portString);
                if (port < 1 || port > 65535)
                    port = 3306;
            }
            username = params["user"];
            password = params["password"];

            conn = new mysql.connection.Connection(hostname, username, password, dbName, cast(ushort)port);
            closed = false;
            setAutoCommit(true);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Closes all statements that are still open and then the underlying connection.
    /// Throws: SQLException if the connection is already closed or closing fails.
    override void close() {
        checkClosed();

        lock();
        scope(exit) unlock();
        try {
            closeUnclosedStatements();

            conn.close();
            closed = true;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Sends COMMIT to the server.
    /// Throws: SQLException if the connection is closed or the command fails.
    override void commit() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            executeControlStatement("COMMIT");
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Creates a new plain statement bound to this connection and starts tracking it.
    /// Throws: SQLException if the connection is closed or creation fails.
    override Statement createStatement() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            MySQLStatement stmt = new MySQLStatement(this);
            trackStatement(stmt);
            return stmt;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Creates a prepared statement for sql and starts tracking it.
    /// Throws: SQLException (with the query text in the message) if the connection
    ///         is closed or the statement cannot be created.
    PreparedStatement prepareStatement(string sql) {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            MySQLPreparedStatement stmt = new MySQLPreparedStatement(this, sql);
            trackStatement(stmt);
            return stmt;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " while execution of query " ~ sql);
        }
    }

    /// Returns the name of the currently selected database.
    override string getCatalog() {
        return dbName;
    }

    /// Sets the given catalog name in order to select a subspace of this Connection object's database in which to work.
    /// Nothing is sent to the server when catalog equals the current database name.
    override void setCatalog(string catalog) {
        checkClosed();
        if (dbName == catalog)
            return;

        lock();
        scope(exit) unlock();

        try {
            conn.selectDB(catalog);
            dbName = catalog;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Returns true once close() has completed successfully.
    override bool isClosed() {
        return closed;
    }

    /// Sends ROLLBACK to the server.
    /// Throws: SQLException if the connection is closed or the command fails.
    override void rollback() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            executeControlStatement("ROLLBACK");
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Returns the autocommit mode last set through setAutoCommit().
    override bool getAutoCommit() {
        return autocommit;
    }

    /// Switches autocommit mode on the server; does nothing when the requested mode is already recorded.
    /// Throws: SQLException if the connection is closed or the command fails.
    override void setAutoCommit(bool autoCommit) {
        checkClosed();
        if (this.autocommit == autoCommit)
            return;
        lock();
        scope(exit) unlock();

        try {
            executeControlStatement("SET autocommit=" ~ (autoCommit ? "1" : "0"));
            this.autocommit = autoCommit;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
}
323 
/// Plain (non-prepared) statement bound to a MySQLConnection; its operations run under the connection's lock.
class MySQLStatement : Statement {
private:
    MySQLConnection conn;
    Command * cmd;
    mysql.connection.ResultSet rs;
    MySQLResultSet resultSet;

    bool closed;

public:
    /// Throws SQLException once close() has been called on this statement.
    void checkClosed() {
        enforceEx!SQLException(!closed, "Statement is already closed");
    }

    /// Acquires the lock of the owning connection.
    void lock() {
        conn.lock();
    }

    /// Releases the lock of the owning connection.
    void unlock() {
        conn.unlock();
    }

    /// Creates a statement that will execute on the given connection.
    this(MySQLConnection conn) {
        this.conn = conn;
    }

    /// Builds result set metadata with one item per field description.
    ResultSetMetaData createMetadata(FieldDescription[] fields) {
        ColumnMetadataItem[] columns;
        columns.length = fields.length;
        for (size_t n = 0; n < fields.length; n++) {
            auto source = fields[n];
            auto column = new ColumnMetadataItem();
            column.schemaName = source.db;
            column.name = source.originalName;
            column.label = source.name;
            column.precision = source.length;
            column.scale = source.scale;
            column.isNullable = !source.notNull;
            column.isSigned = !source.unsigned;
            column.type = fromMySQLType(source.type);
            // TODO: fill more params
            columns[n] = column;
        }
        return new ResultSetMetaDataImpl(columns);
    }

    /// Builds parameter metadata with one item per parameter description.
    ParameterMetaData createMetadata(ParamDescription[] fields) {
        ParameterMetaDataItem[] parameters;
        parameters.length = fields.length;
        for (size_t n = 0; n < fields.length; n++) {
            auto source = fields[n];
            auto parameter = new ParameterMetaDataItem();
            parameter.precision = source.length;
            parameter.scale = source.scale;
            parameter.isNullable = !source.notNull;
            parameter.isSigned = !source.unsigned;
            parameter.type = fromMySQLType(source.type);
            // TODO: fill more params
            parameters[n] = parameter;
        }
        return new ParameterMetaDataImpl(parameters);
    }

    /// Returns the owning connection; throws SQLException if the statement is closed.
    MySQLConnection getConnection() {
        checkClosed();
        return conn;
    }

    /// Runs query and returns its result set.
    /// Throws: SQLException carrying the original message and the query text on failure.
    override ddbc.core.ResultSet executeQuery(string query) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            cmd = new Command(conn.getConnection(), query);
            rs = cmd.execSQLResult();
            auto meta = createMetadata(cmd.resultFieldDescriptions);
            resultSet = new MySQLResultSet(this, rs, meta);
            return resultSet;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ query);
        }
    }

    /// Runs query and returns the affected row count reported by the command, cast to int.
    /// Throws: SQLException carrying the original message and the query text on failure.
    override int executeUpdate(string query) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong affected = 0;
            cmd = new Command(conn.getConnection(), query);
            cmd.execSQL(affected);
            return cast(int)affected;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ query);
        }
    }

    /// Same as executeUpdate(query), additionally storing the command's last insert ID in insertId.
    override int executeUpdate(string query, out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong affected = 0;
            cmd = new Command(conn.getConnection(), query);
            cmd.execSQL(affected);
            insertId = Variant(cmd.lastInsertID);
            return cast(int)affected;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ query);
        }
    }

    /// Releases the current command and result set, then marks the statement closed.
    /// Throws: SQLException if the statement is already closed or releasing fails.
    override void close() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            closeResultSet();
            closed = true;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Releases and deletes the current command, if any, and notifies the result set attached to it.
    void closeResultSet() {
        if (cmd != null) {
            cmd.releaseStatement();
            delete cmd;
            cmd = null;
            if (resultSet !is null) {
                resultSet.onStatementClosed();
                resultSet = null;
            }
        }
    }
}
450 
/**
 * Prepared statement for MySQL.
 *
 * The command is created and prepared (Command.prepare()) in the constructor.
 * Parameters are addressed by 1-based index and are validated against the
 * parameter count reported by the prepared command.
 */
class MySQLPreparedStatement : MySQLStatement, PreparedStatement {
    /// SQL text this statement was created from.
    string query;
    /// Number of parameters reported by the prepared command.
    int paramCount;
    /// Result set metadata, created on the first getMetaData() call.
    ResultSetMetaData metadata;
    /// Parameter metadata, created on the first getParameterMetaData() call.
    ParameterMetaData paramMetadata;

    /// Creates and prepares the command for query on the given connection.
    /// Throws: SQLException wrapping any failure while preparing.
    this(MySQLConnection conn, string query) {
        super(conn);
        this.query = query;
        try {
            cmd = new Command(conn.getConnection(), query);
            cmd.prepare();
            paramCount = cmd.numParams;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Throws SQLException unless index is within 1 .. paramCount (inclusive).
    void checkIndex(int index) {
        if (index < 1 || index > paramCount)
            throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range");
    }

    /// Returns a reference to the bound value of the parameter at the 1-based index.
    ref Variant getParam(int index) {
        checkIndex(index);
        return cmd.param(cast(ushort)(index - 1));
    }

    /// Shared body of the setters for values that are always bound as given.
    private void assignParam(T)(int parameterIndex, T x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            cmd.param(parameterIndex-1) = x;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Shared body of the setters for array and string values: a null value is
    /// bound as SQL NULL, anything else is bound as given.
    private void assignNullableParam(T)(int parameterIndex, T x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            // "is null" rather than "== null": for arrays and strings "== null" is
            // true for every empty value, which turned "" into SQL NULL.
            if (x is null)
                setNull(parameterIndex);
            else
                cmd.param(parameterIndex-1) = x;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

public:

    /// Retrieves a ResultSetMetaData object that contains information about the columns of the ResultSet object that will be returned when this PreparedStatement object is executed.
    override ResultSetMetaData getMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            if (metadata is null)
                metadata = createMetadata(cmd.preparedFieldDescriptions);
            return metadata;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Retrieves the number, types and properties of this PreparedStatement object's parameters.
    override ParameterMetaData getParameterMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            if (paramMetadata is null)
                paramMetadata = createMetadata(cmd.preparedParamDescriptions);
            return paramMetadata;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Executes the prepared command and returns the reported affected row count, cast to int.
    override int executeUpdate() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong rowsAffected = 0;
            cmd.execPrepared(rowsAffected);
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Same as executeUpdate(), additionally storing the command's last insert ID in insertId.
    override int executeUpdate(out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong rowsAffected = 0;
            cmd.execPrepared(rowsAffected);
            insertId = cmd.lastInsertID;
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Executes the prepared command and returns its result set.
    override ddbc.core.ResultSet executeQuery() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            rs = cmd.execPreparedResult();
            resultSet = new MySQLResultSet(this, rs, getMetaData());
            return resultSet;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds SQL NULL to every parameter of the statement.
    override void clearParameters() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            for (int i = 1; i <= paramCount; i++)
                setNull(i);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    // Value setters. parameterIndex is 1-based; each throws SQLException when the
    // statement is closed, the index is out of range or binding fails.

    /// Binds a float value.
    override void setFloat(int parameterIndex, float x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a double value.
    override void setDouble(int parameterIndex, double x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a bool value.
    override void setBoolean(int parameterIndex, bool x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a long value.
    override void setLong(int parameterIndex, long x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a ulong value.
    override void setUlong(int parameterIndex, ulong x) {
        assignParam(parameterIndex, x);
    }
    /// Binds an int value.
    override void setInt(int parameterIndex, int x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a uint value.
    override void setUint(int parameterIndex, uint x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a short value.
    override void setShort(int parameterIndex, short x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a ushort value.
    override void setUshort(int parameterIndex, ushort x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a byte value.
    override void setByte(int parameterIndex, byte x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a ubyte value.
    override void setUbyte(int parameterIndex, ubyte x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a byte array; a null array is bound as SQL NULL.
    override void setBytes(int parameterIndex, byte[] x) {
        assignNullableParam(parameterIndex, x);
    }
    /// Binds a ubyte array; a null array is bound as SQL NULL.
    override void setUbytes(int parameterIndex, ubyte[] x) {
        assignNullableParam(parameterIndex, x);
    }
    /// Binds a string; a null string is bound as SQL NULL, an empty non-null string (such as "") stays an empty string.
    override void setString(int parameterIndex, string x) {
        assignNullableParam(parameterIndex, x);
    }
    /// Binds a DateTime value.
    override void setDateTime(int parameterIndex, DateTime x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a Date value.
    override void setDate(int parameterIndex, Date x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a TimeOfDay value.
    override void setTime(int parameterIndex, TimeOfDay x) {
        assignParam(parameterIndex, x);
    }
    /// Binds a Variant; a Variant that compares equal to null is bound as SQL NULL.
    override void setVariant(int parameterIndex, Variant x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            if (x == null)
                setNull(parameterIndex);
            else
                cmd.param(parameterIndex-1) = x;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
    /// Binds SQL NULL to the parameter.
    override void setNull(int parameterIndex) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            cmd.setNullParam(parameterIndex-1);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
    /// Binds SQL NULL to the parameter; sqlType is accepted but not used.
    override void setNull(int parameterIndex, int sqlType) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            setNull(parameterIndex);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
}
789 
790 class MySQLResultSet : ResultSetImpl {
791     private MySQLStatement stmt;
792     private mysql.connection.ResultSet rs;
793     ResultSetMetaData metadata;
794     private bool closed;
795     private int currentRowIndex;
796     private int rowCount;
797     private int[string] columnMap;
798     private bool lastIsNull;
799     private int columnCount;
800 
801     Variant getValue(int columnIndex) {
802 		checkClosed();
803         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
804         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
805         lastIsNull = rs[currentRowIndex].isNull(columnIndex - 1);
806 		Variant res;
807 		if (!lastIsNull)
808 		    res = rs[currentRowIndex][columnIndex - 1];
809         return res;
810     }
811 
812 	void checkClosed() {
813 		if (closed)
814 			throw new SQLException("Result set is already closed");
815 	}
816 
817 public:
818 
    /// Acquires the lock by delegating to the owning statement's lock().
    void lock() {
        stmt.lock();
    }
822 
    /// Releases the lock by delegating to the owning statement's unlock().
    void unlock() {
        stmt.unlock();
    }
826 
827     this(MySQLStatement stmt, mysql.connection.ResultSet resultSet, ResultSetMetaData metadata) {
828         this.stmt = stmt;
829         this.rs = resultSet;
830         this.metadata = metadata;
831         try {
832             closed = false;
833             rowCount = cast(int)rs.length;
834             currentRowIndex = -1;
835 			foreach(key, val; rs.colNameIndicies)
836 				columnMap[key] = cast(int)val;
837     		columnCount = cast(int)rs.colNames.length;
838         } catch (Throwable e) {
839             throw new SQLException(e);
840         }
841     }
842 
	/// Marks this result set as closed; called by the statement from closeResultSet().
	void onStatementClosed() {
		closed = true;
	}
846     string decodeTextBlob(ubyte[] data) {
847         char[] res = new char[data.length];
848         foreach (i, ch; data) {
849             res[i] = cast(char)ch;
850         }
851         return to!string(res);
852     }
853 
854     // ResultSet interface implementation
855 
856     //Retrieves the number, types and properties of this ResultSet object's columns
857     override ResultSetMetaData getMetaData() {
858         checkClosed();
859         lock();
860         scope(exit) unlock();
861         return metadata;
862     }
863 
864     override void close() {
865         checkClosed();
866         lock();
867         scope(exit) unlock();
868         stmt.closeResultSet();
869        	closed = true;
870     }
871     override bool first() {
872 		checkClosed();
873         lock();
874         scope(exit) unlock();
875         currentRowIndex = 0;
876         return currentRowIndex >= 0 && currentRowIndex < rowCount;
877     }
878     override bool isFirst() {
879 		checkClosed();
880         lock();
881         scope(exit) unlock();
882         return rowCount > 0 && currentRowIndex == 0;
883     }
884     override bool isLast() {
885 		checkClosed();
886         lock();
887         scope(exit) unlock();
888         return rowCount > 0 && currentRowIndex == rowCount - 1;
889     }
890     override bool next() {
891 		checkClosed();
892         lock();
893         scope(exit) unlock();
894         if (currentRowIndex + 1 >= rowCount)
895             return false;
896         currentRowIndex++;
897         return true;
898     }
899     
900     override int findColumn(string columnName) {
901 		checkClosed();
902         lock();
903         scope(exit) unlock();
904         int * p = (columnName in columnMap);
905         if (!p)
906             throw new SQLException("Column " ~ columnName ~ " not found");
907         return *p + 1;
908     }
909 
910     override bool getBoolean(int columnIndex) {
911         checkClosed();
912         lock();
913         scope(exit) unlock();
914         Variant v = getValue(columnIndex);
915         if (lastIsNull)
916             return false;
917         if (v.convertsTo!(bool))
918             return v.get!(bool);
919         if (v.convertsTo!(int))
920             return v.get!(int) != 0;
921         if (v.convertsTo!(long))
922             return v.get!(long) != 0;
923         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean");
924     }
925     override ubyte getUbyte(int columnIndex) {
926         checkClosed();
927         lock();
928         scope(exit) unlock();
929         Variant v = getValue(columnIndex);
930         if (lastIsNull)
931             return 0;
932         if (v.convertsTo!(ubyte))
933             return v.get!(ubyte);
934         if (v.convertsTo!(long))
935             return to!ubyte(v.get!(long));
936         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte");
937     }
938     override byte getByte(int columnIndex) {
939         checkClosed();
940         lock();
941         scope(exit) unlock();
942         Variant v = getValue(columnIndex);
943         if (lastIsNull)
944             return 0;
945         if (v.convertsTo!(byte))
946             return v.get!(byte);
947         if (v.convertsTo!(long))
948             return to!byte(v.get!(long));
949         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte");
950     }
951     override short getShort(int columnIndex) {
952         checkClosed();
953         lock();
954         scope(exit) unlock();
955         Variant v = getValue(columnIndex);
956         if (lastIsNull)
957             return 0;
958         if (v.convertsTo!(short))
959             return v.get!(short);
960         if (v.convertsTo!(long))
961             return to!short(v.get!(long));
962         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short");
963     }
964     override ushort getUshort(int columnIndex) {
965         checkClosed();
966         lock();
967         scope(exit) unlock();
968         Variant v = getValue(columnIndex);
969         if (lastIsNull)
970             return 0;
971         if (v.convertsTo!(ushort))
972             return v.get!(ushort);
973         if (v.convertsTo!(long))
974             return to!ushort(v.get!(long));
975         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort");
976     }
977     override int getInt(int columnIndex) {
978         checkClosed();
979         lock();
980         scope(exit) unlock();
981         Variant v = getValue(columnIndex);
982         if (lastIsNull)
983             return 0;
984         if (v.convertsTo!(int))
985             return v.get!(int);
986         if (v.convertsTo!(long))
987             return to!int(v.get!(long));
988         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int");
989     }
990     override uint getUint(int columnIndex) {
991         checkClosed();
992         lock();
993         scope(exit) unlock();
994         Variant v = getValue(columnIndex);
995         if (lastIsNull)
996             return 0;
997         if (v.convertsTo!(uint))
998             return v.get!(uint);
999         if (v.convertsTo!(ulong))
1000             return to!int(v.get!(ulong));
1001         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint");
1002     }
1003     override long getLong(int columnIndex) {
1004         checkClosed();
1005         lock();
1006         scope(exit) unlock();
1007         Variant v = getValue(columnIndex);
1008         if (lastIsNull)
1009             return 0;
1010         if (v.convertsTo!(long))
1011             return v.get!(long);
1012         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long");
1013     }
1014     override ulong getUlong(int columnIndex) {
1015         checkClosed();
1016         lock();
1017         scope(exit) unlock();
1018         Variant v = getValue(columnIndex);
1019         if (lastIsNull)
1020             return 0;
1021         if (v.convertsTo!(ulong))
1022             return v.get!(ulong);
1023         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong");
1024     }
1025     override double getDouble(int columnIndex) {
1026         checkClosed();
1027         lock();
1028         scope(exit) unlock();
1029         Variant v = getValue(columnIndex);
1030         if (lastIsNull)
1031             return 0;
1032         if (v.convertsTo!(double))
1033             return v.get!(double);
1034         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double");
1035     }
1036     override float getFloat(int columnIndex) {
1037         checkClosed();
1038         lock();
1039         scope(exit) unlock();
1040         Variant v = getValue(columnIndex);
1041         if (lastIsNull)
1042             return 0;
1043         if (v.convertsTo!(float))
1044             return v.get!(float);
1045         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float");
1046     }
1047     override byte[] getBytes(int columnIndex) {
1048         checkClosed();
1049         lock();
1050         scope(exit) unlock();
1051         Variant v = getValue(columnIndex);
1052         if (lastIsNull)
1053             return null;
1054         if (v.convertsTo!(byte[])) {
1055             return v.get!(byte[]);
1056         }
1057         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte[]");
1058     }
1059 	override ubyte[] getUbytes(int columnIndex) {
1060 		checkClosed();
1061 		lock();
1062 		scope(exit) unlock();
1063 		Variant v = getValue(columnIndex);
1064 		if (lastIsNull)
1065 			return null;
1066 		if (v.convertsTo!(ubyte[])) {
1067 			return v.get!(ubyte[]);
1068 		}
1069 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte[]");
1070 	}
1071 	override string getString(int columnIndex) {
1072         checkClosed();
1073         lock();
1074         scope(exit) unlock();
1075         Variant v = getValue(columnIndex);
1076         if (lastIsNull)
1077             return null;
1078 		if (v.convertsTo!(ubyte[])) {
1079 			// assume blob encoding is utf-8
1080 			// TODO: check field encoding
1081             return decodeTextBlob(v.get!(ubyte[]));
1082 		}
1083         return v.toString();
1084     }
1085 	override std.datetime.DateTime getDateTime(int columnIndex) {
1086 		checkClosed();
1087 		lock();
1088 		scope(exit) unlock();
1089 		Variant v = getValue(columnIndex);
1090 		if (lastIsNull)
1091 			return DateTime();
1092 		if (v.convertsTo!(DateTime)) {
1093 			return v.get!DateTime();
1094 		}
1095 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime");
1096 	}
1097 	override std.datetime.Date getDate(int columnIndex) {
1098 		checkClosed();
1099 		lock();
1100 		scope(exit) unlock();
1101 		Variant v = getValue(columnIndex);
1102 		if (lastIsNull)
1103 			return Date();
1104 		if (v.convertsTo!(Date)) {
1105 			return v.get!Date();
1106 		}
1107 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date");
1108 	}
1109 	override std.datetime.TimeOfDay getTime(int columnIndex) {
1110 		checkClosed();
1111 		lock();
1112 		scope(exit) unlock();
1113 		Variant v = getValue(columnIndex);
1114 		if (lastIsNull)
1115 			return TimeOfDay();
1116 		if (v.convertsTo!(TimeOfDay)) {
1117 			return v.get!TimeOfDay();
1118 		}
1119 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay");
1120 	}
1121 
1122     override Variant getVariant(int columnIndex) {
1123         checkClosed();
1124         lock();
1125         scope(exit) unlock();
1126         Variant v = getValue(columnIndex);
1127         if (lastIsNull) {
1128             Variant vnull = null;
1129             return vnull;
1130         }
1131         return v;
1132     }
1133     override bool wasNull() {
1134         checkClosed();
1135         lock();
1136         scope(exit) unlock();
1137         return lastIsNull;
1138     }
1139     override bool isNull(int columnIndex) {
1140         checkClosed();
1141         lock();
1142         scope(exit) unlock();
1143         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
1144         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
1145         return rs[currentRowIndex].isNull(columnIndex - 1);
1146     }
1147 
1148     //Retrieves the Statement object that produced this ResultSet object.
1149     override Statement getStatement() {
1150         checkClosed();
1151         lock();
1152         scope(exit) unlock();
1153         return stmt;
1154     }
1155 
1156     //Retrieves the current row number
1157     override int getRow() {
1158         checkClosed();
1159         lock();
1160         scope(exit) unlock();
1161         if (currentRowIndex <0 || currentRowIndex >= rowCount)
1162             return 0;
1163         return currentRowIndex + 1;
1164     }
1165 
1166     //Retrieves the fetch size for this ResultSet object.
1167     override int getFetchSize() {
1168         checkClosed();
1169         lock();
1170         scope(exit) unlock();
1171         return rowCount;
1172     }
1173 }
1174 
/**
 * DDBC driver entry point for MySQL.
 *
 * Sample URL:
 * mysql://localhost:3306/DatabaseName
 */
class MySQLDriver : Driver {
    /// Helper: builds a connection URL of the form mysql://host:port/dbname.
    public static string generateUrl(string host, ushort port, string dbname) {
        string portText = to!string(port);
        string hostAndPort = host ~ ":" ~ portText;
        return "mysql://" ~ hostAndPort ~ "/" ~ dbname;
    }

    /// Helper: packs credentials into a parameter map under the keys "user" and "password".
    public static string[string] setUserAndPassword(string username, string password) {
        return ["user": username, "password": password];
    }

    /// Opens a connection by constructing a MySQLConnection from the URL and parameters.
    override ddbc.core.Connection connect(string url, string[string] params) {
        ddbc.core.Connection connection = new MySQLConnection(url, params);
        return connection;
    }
}
1193 
// Integration test against a live MySQL server (only when MYSQL_TESTS_ENABLED):
// exercises DDL/DML through Statement and PreparedStatement, result set
// metadata, row iteration, NULL handling and generated insert ids.
unittest {
    static if (MYSQL_TESTS_ENABLED) {

        import std.conv;

        DataSource ds = createUnitTestMySQLDataSource();

        auto conn = ds.getConnection();
        scope(exit) conn.close();
        auto stmt = conn.createStatement();
        scope(exit) stmt.close();

        // (re)create the test table and fill it with six rows
        assert(stmt.executeUpdate("DROP TABLE IF EXISTS ddbct1") == 0);
        assert(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS ddbct1 (id bigint not null primary key AUTO_INCREMENT, name varchar(250), comment mediumtext, ts datetime)") == 0);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=1, name='name1', comment='comment for line 1', ts='20130202123025'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=2, name='name2', comment='comment for line 2 - can be very long'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=3, name='name3', comment='this is line 3'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=4, name='name4', comment=NULL") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=5, name=NULL, comment=''") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=6, name='', comment=NULL") == 1);
        assert(stmt.executeUpdate("UPDATE ddbct1 SET name=concat(name, '_x') WHERE id IN (3, 4)") == 2);

        // parameterized update, including a NULL string parameter
        PreparedStatement ps = conn.prepareStatement("UPDATE ddbct1 SET name=? WHERE id=?");
        ps.setString(1, null);
        ps.setLong(2, 3);
        assert(ps.executeUpdate() == 1);

        auto rs = stmt.executeQuery("SELECT id, name name_alias, comment, ts FROM ddbct1 ORDER BY id");

        // testing result set meta data
        ResultSetMetaData meta = rs.getMetaData();
        assert(meta.getColumnCount() == 4);
        assert(meta.getColumnName(1) == "id");
        assert(meta.getColumnLabel(1) == "id");
        assert(meta.isNullable(1) == false);
        assert(meta.isNullable(2) == true);
        assert(meta.isNullable(3) == true);
        assert(meta.getColumnName(2) == "name");
        assert(meta.getColumnLabel(2) == "name_alias");
        assert(meta.getColumnName(3) == "comment");

        int rowCount = rs.getFetchSize();
        assert(rowCount == 6);
        int index = 1;
        while (rs.next()) {
            assert(!rs.isNull(1));
            ubyte[] bytes = rs.getUbytes(3);
            int rowIndex = rs.getRow();
            assert(rowIndex == index);
            long id = rs.getLong(1);
            assert(id == index);
            //writeln("field2 = '" ~ rs.getString(2) ~ "'");
            //writeln("field3 = '" ~ rs.getString(3) ~ "'");
            //writeln("wasNull = " ~ to!string(rs.wasNull()));
            if (id == 1) {
                DateTime ts = rs.getDateTime(4);
                assert(ts == DateTime(2013,02,02,12,30,25));
            }
            if (id == 4) {
                assert(rs.getString(2) == "name4_x");
                assert(rs.isNull(3));
            }
            if (id == 5) {
                assert(rs.isNull(2));
                assert(!rs.isNull(3));
            }
            if (id == 6) {
                assert(!rs.isNull(2));
                assert(rs.isNull(3));
            }
            //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)) ~ "\t[" ~ to!string(bytes.length) ~ "]");
            index++;
        }

        // parameterized query
        PreparedStatement ps2 = conn.prepareStatement("SELECT id, name, comment FROM ddbct1 WHERE id >= ?");
        scope(exit) ps2.close();
        ps2.setLong(1, 3);
        rs = ps2.executeQuery();
        while (rs.next()) {
            //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)));
            index++;
        }

        // checking last insert ID for prepared statement
        PreparedStatement ps3 = conn.prepareStatement("INSERT INTO ddbct1 (name) values ('New String 1')");
        scope(exit) ps3.close();
        Variant newId;
        assert(ps3.executeUpdate(newId) == 1);
        //writeln("Generated insert id = " ~ newId.toString());
        assert(newId.get!ulong > 0);

        // checking last insert ID for normal statement
        Statement stmt4 = conn.createStatement();
        scope(exit) stmt4.close();
        Variant newId2;
        // run the insert on the freshly created statement; the test previously
        // reused `stmt` here and left stmt4 unexercised
        assert(stmt4.executeUpdate("INSERT INTO ddbct1 (name) values ('New String 2')", newId2) == 1);
        //writeln("Generated insert id = " ~ newId2.toString());
        assert(newId2.get!ulong > 0);

    }
}
1295 
1296 } else { // version(USE_MYSQL)
1297     version(unittest) {
1298         immutable bool MYSQL_TESTS_ENABLED = false;
1299     }
1300 }