1 /**
2  * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 
3  * 
4  * Source file ddbc/drivers/mysqlddbc.d.
5  *
6  * DDBC library attempts to provide implementation independent interface to different databases.
7  * 
8  * Set of supported RDBMSs can be extended by writing Drivers for particular DBs.
9  * Currently it only includes MySQL driver.
10  * 
11  * JDBC documentation can be found here:
12  * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR)
13  *
14  * This module contains implementation of MySQL Driver which uses patched version of 
15  * MYSQLN (native D implementation of MySQL connector, written by Steve Teale)
16  * 
 * The current version of the driver implements only a unidirectional, read-only result set, which fetches the full result into memory on creation.
18  *
19  * You can find usage examples in unittest{} sections.
20  *
21  * Copyright: Copyright 2013
22  * License:   $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0).
23  * Author:   Vadim Lopatin
24  */
25 module ddbc.drivers.mysqlddbc;
26 
27 import std.algorithm;
28 import std.conv : to;
29 import std.datetime;
30 import std.exception;
31 import std.stdio;
32 import std.string;
33 import std.variant;
34 import core.sync.mutex;
35 import ddbc.common;
36 import ddbc.core;
37 
38 version(USE_MYSQL) {
39 
40 import std.array;
41 import mysql.connection : prepare;
42 import mysql.commands : query, exec;
43 import mysql.prepared;
44 import mysql.protocol.constants;
45 import mysql.protocol.packets : FieldDescription, ParamDescription;
46 import mysql.result : Row, ResultRange;
47 
48 version(unittest) {
49     /*
50         To allow unit tests using MySQL server,
51         run mysql client using admin privileges, e.g. for MySQL server on localhost:
52         > mysql -uroot
53 
54         Create test user and test DB:
55         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'%' IDENTIFIED BY 'testpassword';
56         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'localhost' IDENTIFIED BY 'testpassword';
57         mysql> CREATE DATABASE testdb;
58      */
59     /// change to false to disable tests on real MySQL server
60     immutable bool MYSQL_TESTS_ENABLED = true;
61     /// change parameters if necessary
62     const string MYSQL_UNITTEST_HOST = "localhost";
63     const int    MYSQL_UNITTEST_PORT = 3306;
64     const string MYSQL_UNITTEST_USER = "testuser";
65     const string MYSQL_UNITTEST_PASSWORD = "testpassword";
66     const string MYSQL_UNITTEST_DB = "testdb";
67 
68     static if (MYSQL_TESTS_ENABLED) {
69         /// use this data source for tests
70         
71         DataSource createUnitTestMySQLDataSource() {
72             string url = makeDDBCUrl("mysql", MYSQL_UNITTEST_HOST, MYSQL_UNITTEST_PORT, MYSQL_UNITTEST_DB);
73             string[string] params;
74             setUserAndPassword(params, MYSQL_UNITTEST_USER, MYSQL_UNITTEST_PASSWORD);
75             return createConnectionPool(url, params);
76         }
77     }
78 }
79 
/**
 * Translates a MySQL wire-protocol column type code into the DDBC SqlType.
 *
 * Params:
 *   t = one of the mysql.protocol.constants.SQLType values
 * Returns: the matching SqlType, or SqlType.OTHER for ENUM, SET, GEOMETRY
 *          and any code not listed here.
 */
SqlType fromMySQLType(int t) {
    switch(t) {
        // The legacy DECIMAL code previously had no body and fell through to
        // TINY, so decimal columns were reported as TINYINT.
        case SQLType.DECIMAL: return SqlType.DECIMAL;
        case SQLType.TINY: return SqlType.TINYINT;
        case SQLType.SHORT: return SqlType.SMALLINT;
        case SQLType.INT: return SqlType.INTEGER;
        case SQLType.FLOAT: return SqlType.FLOAT;
        case SQLType.DOUBLE: return SqlType.DOUBLE;
        case SQLType.NULL: return SqlType.NULL;
        case SQLType.TIMESTAMP: return SqlType.DATETIME;
        case SQLType.LONGLONG: return SqlType.BIGINT;
        case SQLType.INT24: return SqlType.INTEGER;
        case SQLType.DATE: return SqlType.DATE;
        case SQLType.TIME: return SqlType.TIME;
        case SQLType.DATETIME: return SqlType.DATETIME;
        case SQLType.YEAR: return SqlType.SMALLINT;
        case SQLType.NEWDATE: return SqlType.DATE;
        case SQLType.VARCHAR: return SqlType.VARCHAR;
        case SQLType.BIT: return SqlType.BIT;
        case SQLType.NEWDECIMAL: return SqlType.DECIMAL;
        case SQLType.ENUM: return SqlType.OTHER;
        case SQLType.SET: return SqlType.OTHER;
        case SQLType.TINYBLOB: return SqlType.BLOB;
        case SQLType.MEDIUMBLOB: return SqlType.BLOB;
        case SQLType.LONGBLOB: return SqlType.BLOB;
        case SQLType.BLOB: return SqlType.BLOB;
        case SQLType.VARSTRING: return SqlType.VARCHAR;
        case SQLType.STRING: return SqlType.VARCHAR;
        case SQLType.GEOMETRY: return SqlType.OTHER;
        default: return SqlType.OTHER;
    }
}
112 
/**
 * DDBC Connection implementation that wraps one mysql.connection.Connection.
 *
 * Operations that touch the native connection are serialized through `mutex`.
 * Several methods take the lock and then call other locking methods (for
 * example commit() -> createStatement()), so the mutex must allow re-entrant
 * locking by the owning thread.
 * The connection keeps a list of the statements it created so that close()
 * can close them first.
 */
class MySQLConnection : ddbc.core.Connection {
private:
    string url;
    string[string] params;
    string dbName;
    string username;
    string password;
    string hostname;
    int port = 3306;
    mysql.connection.Connection conn;
    bool closed;
    bool autocommit;
    Mutex mutex;

    /// Statements created by this connection that have not been closed yet.
    MySQLStatement [] activeStatements;

    /// Closes every statement still registered with this connection.
    void closeUnclosedStatements() {
        // Iterate over a copy: each close() calls back into onStatementClosed(),
        // which edits activeStatements.
        MySQLStatement [] list = activeStatements.dup;
        foreach(stmt; list) {
            stmt.close();
        }
    }

    /// Throws SQLException when the connection has already been closed.
    void checkClosed() {
        if (closed)
            throw new SQLException("Connection is already closed");
    }

    /// Runs a parameterless SQL command on a temporary statement, which is
    /// closed afterwards whether or not the command succeeds.
    void executeCommand(string sql) {
        Statement stmt = createStatement();
        scope(exit) stmt.close();
        stmt.executeUpdate(sql);
    }

public:

    /// Acquires the connection mutex.
    void lock() {
        mutex.lock();
    }

    /// Releases the connection mutex.
    void unlock() {
        mutex.unlock();
    }

    /// Returns the wrapped native connection.
    mysql.connection.Connection getConnection() { return conn; }

    /// Callback used by statements to unregister themselves when they close.
    void onStatementClosed(MySQLStatement stmt) {
        myRemove(activeStatements, stmt);
    }

    /**
     * Parses `url`, opens the native connection and switches autocommit on.
     *
     * The host is taken from the text between "//" and the last "/", with an
     * optional ":port" suffix; the database name is whatever follows the last
     * "/". An empty host becomes "localhost"; a port outside 1..65535 falls
     * back to 3306. "user" and "password" are read from the parameters.
     *
     * Params:
     *   url    = connection URL
     *   params = connection parameters; also passed to extractParamsFromURL
     * Throws: SQLException wrapping any failure during parsing or connecting.
     */
    this(string url, string[string] params) {
        mutex = new Mutex();
        this.url = url;
        this.params = params;
        try {
            extractParamsFromURL(url, this.params);
            ptrdiff_t firstSlashes = std.string.indexOf(url, "//");
            ptrdiff_t lastSlash = std.string.lastIndexOf(url, '/');
            ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0;
            ptrdiff_t hostNameEnd = lastSlash >= 0 && lastSlash > firstSlashes + 1
                ? lastSlash
                : cast(ptrdiff_t) url.length;
            // Store into the field (a local of the same name used to shadow it),
            // so getCatalog()/setCatalog() know which database is selected.
            // "+ 1 <" instead of "< length - 1" avoids unsigned underflow on an empty URL.
            dbName = "";
            if (hostNameEnd + 1 < cast(ptrdiff_t) url.length) {
                dbName = url[hostNameEnd + 1 .. $];
            }
            hostname = url[hostNameStart..hostNameEnd];
            if (hostname.length == 0)
                hostname = "localhost";
            ptrdiff_t portDelimiter = std.string.indexOf(hostname, ":");
            if (portDelimiter >= 0) {
                string portString = hostname[portDelimiter + 1 .. $];
                hostname = hostname[0 .. portDelimiter];
                if (portString.length > 0)
                    port = to!int(portString);
                if (port < 1 || port > 65535)
                    port = 3306;
            }
            if ("user" in this.params)
                username = this.params["user"];
            if ("password" in this.params)
                password = this.params["password"];

            conn = new mysql.connection.Connection(hostname, username, password, dbName, cast(ushort)port);
            closed = false;
            setAutoCommit(true);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Closes all open statements and then the native connection.
    /// Throws: SQLException if already closed or if closing fails.
    override void close() {
        checkClosed();

        lock();
        scope(exit) unlock();
        try {
            closeUnclosedStatements();

            conn.close();
            closed = true;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Sends COMMIT to the server.
    /// Throws: SQLException if the connection is closed or the command fails.
    override void commit() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            executeCommand("COMMIT");
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Creates a new statement and registers it with this connection.
    override Statement createStatement() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            MySQLStatement stmt = new MySQLStatement(this);
            activeStatements ~= stmt;
            return stmt;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Prepares `sql` and registers the resulting statement with this connection.
    /// Throws: SQLException carrying the failure message and the query text.
    PreparedStatement prepareStatement(string sql) {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            MySQLPreparedStatement stmt = new MySQLPreparedStatement(this, sql);
            activeStatements ~= stmt;
            return stmt;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " while execution of query " ~ sql);
        }
    }

    /// Returns the name of the currently selected database.
    override string getCatalog() {
        return dbName;
    }

    /// Sets the given catalog name in order to select a subspace of this Connection object's database in which to work.
    /// Does nothing when `catalog` is already the selected database.
    override void setCatalog(string catalog) {
        checkClosed();
        if (dbName == catalog)
            return;

        lock();
        scope(exit) unlock();

        try {
            conn.selectDB(catalog);
            dbName = catalog;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Returns true once close() has completed.
    override bool isClosed() {
        return closed;
    }

    /// Sends ROLLBACK to the server.
    /// Throws: SQLException if the connection is closed or the command fails.
    override void rollback() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            executeCommand("ROLLBACK");
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Returns the autocommit mode last set through setAutoCommit().
    override bool getAutoCommit() {
        return autocommit;
    }

    /// Switches the session autocommit mode; no command is sent when the
    /// requested mode equals the recorded one.
    override void setAutoCommit(bool autoCommit) {
        checkClosed();
        if (this.autocommit == autoCommit)
            return;
        lock();
        scope(exit) unlock();

        try {
            executeCommand("SET autocommit=" ~ (autoCommit ? "1" : "0"));
            this.autocommit = autoCommit;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
}
321 
/**
 * Non-prepared DDBC statement that runs SQL text on its owning MySQLConnection.
 *
 * Locking is delegated to the connection, and at most one result set
 * (the one from the latest executeQuery) is tracked for closing.
 */
class MySQLStatement : Statement {
private:
    MySQLConnection conn;
    ResultRange results;
    MySQLResultSet resultSet;

    bool closed;

public:
    /// Throws SQLException when this statement has already been closed.
    void checkClosed() {
        if (closed)
            throw new SQLException("Statement is already closed");
    }

    /// Acquires the owning connection's lock.
    void lock() {
        conn.lock();
    }

    /// Releases the owning connection's lock.
    void unlock() {
        conn.unlock();
    }

    /// Creates a statement bound to `conn`.
    this(MySQLConnection conn) {
        this.conn = conn;
    }

    /// Converts native result-column descriptions into DDBC result set metadata.
    ResultSetMetaData createMetadata(FieldDescription[] fields) {
        ColumnMetadataItem[] columns = new ColumnMetadataItem[fields.length];
        for (size_t idx = 0; idx < fields.length; idx++) {
            auto field = fields[idx];
            auto column = new ColumnMetadataItem();
            column.schemaName = field.db;
            column.name = field.originalName;
            column.label = field.name;
            column.precision = field.length;
            column.scale = field.scale;
            column.isNullable = !field.notNull;
            column.isSigned = !field.unsigned;
            column.type = fromMySQLType(field.type);
            // TODO: fill more params
            columns[idx] = column;
        }
        return new ResultSetMetaDataImpl(columns);
    }

    /// Converts native parameter descriptions into DDBC parameter metadata.
    ParameterMetaData createMetadata(ParamDescription[] fields) {
        ParameterMetaDataItem[] parameters = new ParameterMetaDataItem[fields.length];
        for (size_t idx = 0; idx < fields.length; idx++) {
            auto field = fields[idx];
            auto parameter = new ParameterMetaDataItem();
            parameter.precision = field.length;
            parameter.scale = field.scale;
            parameter.isNullable = !field.notNull;
            parameter.isSigned = !field.unsigned;
            parameter.type = fromMySQLType(field.type);
            // TODO: fill more params
            parameters[idx] = parameter;
        }
        return new ParameterMetaDataImpl(parameters);
    }

    /// Returns the owning connection.
    /// Throws: SQLException if the statement is closed.
    MySQLConnection getConnection() {
        checkClosed();
        return conn;
    }

    /// Runs `queryString` and returns a result set over its rows.
    /// Throws: SQLException carrying the failure message and the query text.
    override ddbc.core.ResultSet executeQuery(string queryString) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            results = query(conn.getConnection(), queryString);
            auto columnInfo = createMetadata(conn.getConnection().resultFieldDescriptions);
            resultSet = new MySQLResultSet(this, results, columnInfo);
            return resultSet;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ queryString);
        }
    }

    /// Runs a data-modifying command and returns the affected row count, narrowed to int.
    override int executeUpdate(string query) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            return cast(int) exec(conn.getConnection(), query);
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ query);
        }
    }

    /// Same as executeUpdate(query), additionally storing the connection's
    /// last insert id in `insertId`.
    override int executeUpdate(string query, out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            auto nativeConn = conn.getConnection();
            ulong affected = exec(nativeConn, query);
            insertId = Variant(conn.getConnection().lastInsertID);
            return cast(int) affected;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ query);
        }
    }

    /// Closes the tracked result set, marks the statement closed and
    /// unregisters it from the connection.
    /// Throws: SQLException if the statement is already closed.
    override void close() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            closeResultSet();
            closed = true;
            conn.onStatementClosed(this);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Marks the tracked result set (if any) as closed and forgets it.
    void closeResultSet() {
        if (resultSet is null)
            return;
        resultSet.onStatementClosed();
        resultSet = null;
    }
}
438 
/**
 * Prepared statement backed by a native `Prepared` handle.
 *
 * Parameter indexes in the public API are 1-based and are translated to the
 * 0-based indexes of the underlying handle. All typed setters share the
 * private helpers bindArg / bindArrayOrNull, which perform the closed check,
 * locking, index validation and exception wrapping in one place.
 */
class MySQLPreparedStatement : MySQLStatement, PreparedStatement {

    private string queryString;
    private Prepared statement;
    private int paramCount;
    private ResultSetMetaData metadata;
    private ParameterMetaData paramMetadata;

    /// Prepares `queryString` on the connection's native handle and records
    /// the number of parameters it takes.
    /// Throws: SQLException wrapping any failure during preparation.
    this(MySQLConnection conn, string queryString) {
        super(conn);
        this.queryString = queryString;
        try {
            this.statement = prepare(conn.getConnection(), queryString);
            this.paramCount = this.statement.numArgs;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Throws SQLException unless `index` lies within 1 .. paramCount (inclusive).
    void checkIndex(int index) {
        if (index < 1 || index > paramCount)
            throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range");
    }

    /// Returns the value currently bound to the 1-based parameter `index`.
    Variant getParam(int index) {
        checkIndex(index);
        return this.statement.getArg( cast(ushort)(index - 1) );
    }

    /// Shared setter body: validates state and index under the lock, then binds
    /// `value` to the 1-based `parameterIndex`, wrapping failures in SQLException.
    private void bindArg(T)(int parameterIndex, T value) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            this.statement.setArg(parameterIndex - 1, value);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Like bindArg, but an array whose `.ptr` is null is bound as SQL NULL.
    private void bindArrayOrNull(T)(int parameterIndex, T[] value) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            if (value.ptr is null) {
                setNull(parameterIndex);
            } else {
                this.statement.setArg(parameterIndex - 1, value);
            }
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

public:

    /// Retrieves a ResultSetMetaData object that contains information about the columns of the ResultSet object that will be returned when this PreparedStatement object is executed.
    /// The metadata is built on first use and cached.
    override ResultSetMetaData getMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            if (metadata is null) {
                metadata = createMetadata(this.statement.preparedFieldDescriptions);
            }
            return metadata;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Retrieves the number, types and properties of this PreparedStatement object's parameters.
    /// The metadata is built on first use and cached.
    override ParameterMetaData getParameterMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            if (paramMetadata is null) {
                paramMetadata = createMetadata(this.statement.preparedParamDescriptions);
            }
            return paramMetadata;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Executes the prepared command and returns the affected row count, narrowed to int.
    override int executeUpdate() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong rowsAffected = conn.getConnection().exec(statement);
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Same as executeUpdate(), additionally storing the connection's last
    /// insert id in `insertId`.
    override int executeUpdate(out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong rowsAffected = conn.getConnection().exec(statement);
            insertId = conn.getConnection().lastInsertID;
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Executes the prepared query and returns a result set over its rows.
    override ddbc.core.ResultSet executeQuery() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            results = query(conn.getConnection(), statement);
            resultSet = new MySQLResultSet(this, results, getMetaData());
            return resultSet;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds SQL NULL to every parameter.
    override void clearParameters() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            for (int i = 1; i <= paramCount; i++)
                setNull(i);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds a float to the 1-based parameter.
    override void setFloat(int parameterIndex, float x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a double to the 1-based parameter.
    override void setDouble(int parameterIndex, double x){
        bindArg(parameterIndex, x);
    }
    /// Binds a bool to the 1-based parameter.
    override void setBoolean(int parameterIndex, bool x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a long to the 1-based parameter.
    override void setLong(int parameterIndex, long x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a ulong to the 1-based parameter.
    override void setUlong(int parameterIndex, ulong x) {
        bindArg(parameterIndex, x);
    }
    /// Binds an int to the 1-based parameter.
    override void setInt(int parameterIndex, int x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a uint to the 1-based parameter.
    override void setUint(int parameterIndex, uint x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a short to the 1-based parameter.
    override void setShort(int parameterIndex, short x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a ushort to the 1-based parameter.
    override void setUshort(int parameterIndex, ushort x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a byte to the 1-based parameter.
    override void setByte(int parameterIndex, byte x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a ubyte to the 1-based parameter.
    override void setUbyte(int parameterIndex, ubyte x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a byte array; an array with a null pointer is bound as SQL NULL.
    override void setBytes(int parameterIndex, byte[] x) {
        bindArrayOrNull(parameterIndex, x);
    }
    /// Binds a ubyte array; an array with a null pointer is bound as SQL NULL.
    override void setUbytes(int parameterIndex, ubyte[] x) {
        bindArrayOrNull(parameterIndex, x);
    }
    /// Binds a string; a string with a null pointer is bound as SQL NULL.
    override void setString(int parameterIndex, string x) {
        bindArrayOrNull(parameterIndex, x);
    }
    /// Binds a DateTime to the 1-based parameter.
    override void setDateTime(int parameterIndex, DateTime x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a Date to the 1-based parameter.
    override void setDate(int parameterIndex, Date x) {
        bindArg(parameterIndex, x);
    }
    /// Binds a TimeOfDay to the 1-based parameter.
    override void setTime(int parameterIndex, TimeOfDay x) {
        bindArg(parameterIndex, x);
    }

    /// Binds a Variant; a Variant that compares equal to null is bound as SQL NULL.
    override void setVariant(int parameterIndex, Variant x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            if (x == null) {
                setNull(parameterIndex);
            } else {
                this.statement.setArg(parameterIndex-1, x);
            }
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds SQL NULL to the 1-based parameter.
    override void setNull(int parameterIndex) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            this.statement.setNullArg(parameterIndex-1);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds SQL NULL to the 1-based parameter; `sqlType` is accepted but not used.
    override void setNull(int parameterIndex, int sqlType) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            setNull(parameterIndex);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
}
785 
786 class MySQLResultSet : ResultSetImpl {
787     private MySQLStatement stmt;
788     private ResultRange results;
789     ResultSetMetaData metadata;
790     private bool closed;
791     private int currentRowIndex;
792     private ulong rowCount;
793     private int[string] columnMap;
794     private bool lastIsNull;
795     private int columnCount;
796 
797     Variant getValue(int columnIndex) {
798 		checkClosed();
799         enforce!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
800         enforce!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
801         Row[] rs = results.array;
802         lastIsNull = rs[currentRowIndex].isNull(columnIndex - 1);
803 		Variant res;
804 		if (!lastIsNull)
805 		    res = rs[currentRowIndex][columnIndex - 1];
806         return res;
807     }
808 
809 	void checkClosed() {
810 		if (closed)
811 			throw new SQLException("Result set is already closed");
812 	}
813 
814 public:
815 
816     void lock() {
817         stmt.lock();
818     }
819 
820     void unlock() {
821         stmt.unlock();
822     }
823 
824     this(MySQLStatement stmt, ResultRange results, ResultSetMetaData metadata) {
825         this.stmt = stmt;
826         this.results = results;
827         this.metadata = metadata;
828         try {
829             closed = false;
830             //rowCount = cast(int)results.array.length;
831             currentRowIndex = -1;
832 			foreach(key, val; results.colNameIndicies) {
833 			    columnMap[key] = cast(int)val;
834 			}
835     		columnCount = cast(int)results.colNames.length;
836         } catch (Throwable e) {
837             throw new SQLException(e);
838         }
839     }
840 
	/// Marks this result set as closed; invoked by MySQLStatement.closeResultSet().
	/// No lock is taken here.
	void onStatementClosed() {
		closed = true;
	}
844     string decodeTextBlob(ubyte[] data) {
845         char[] res = new char[data.length];
846         foreach (i, ch; data) {
847             res[i] = cast(char)ch;
848         }
849         return to!string(res);
850     }
851 
852     // ResultSet interface implementation
853 
854     //Retrieves the number, types and properties of this ResultSet object's columns
855     override ResultSetMetaData getMetaData() {
856         checkClosed();
857         lock();
858         scope(exit) unlock();
859         return metadata;
860     }
861 
862     override void close() {
863         checkClosed();
864         lock();
865         scope(exit) unlock();
866         stmt.closeResultSet();
867        	closed = true;
868     }
869     override bool first() {
870 		checkClosed();
871         lock();
872         scope(exit) unlock();
873         currentRowIndex = 0;
874         return currentRowIndex >= 0 && currentRowIndex < rowCount;
875     }
876     override bool isFirst() {
877 		checkClosed();
878         lock();
879         scope(exit) unlock();
880         return rowCount > 0 && currentRowIndex == 0;
881     }
882     override bool isLast() {
883 		checkClosed();
884         lock();
885         scope(exit) unlock();
886         return rowCount > 0 && currentRowIndex == rowCount - 1;
887     }
888     override bool next() {
889 		checkClosed();
890         lock();
891         scope(exit) unlock();
892         if (currentRowIndex + 1 >= rowCount)
893             return false;
894         currentRowIndex++;
895         return true;
896     }
897     
898     override int findColumn(string columnName) {
899 		checkClosed();
900         lock();
901         scope(exit) unlock();
902         int * p = (columnName in columnMap);
903         if (!p)
904             throw new SQLException("Column " ~ columnName ~ " not found");
905         return *p + 1;
906     }
907 
908     override bool getBoolean(int columnIndex) {
909         checkClosed();
910         lock();
911         scope(exit) unlock();
912         Variant v = getValue(columnIndex);
913         if (lastIsNull)
914             return false;
915         if (v.convertsTo!(bool))
916             return v.get!(bool);
917         if (v.convertsTo!(int))
918             return v.get!(int) != 0;
919         if (v.convertsTo!(long))
920             return v.get!(long) != 0;
921         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean");
922     }
923     override ubyte getUbyte(int columnIndex) {
924         checkClosed();
925         lock();
926         scope(exit) unlock();
927         Variant v = getValue(columnIndex);
928         if (lastIsNull)
929             return 0;
930         if (v.convertsTo!(ubyte))
931             return v.get!(ubyte);
932         if (v.convertsTo!(long))
933             return to!ubyte(v.get!(long));
934         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte");
935     }
936     override byte getByte(int columnIndex) {
937         checkClosed();
938         lock();
939         scope(exit) unlock();
940         Variant v = getValue(columnIndex);
941         if (lastIsNull)
942             return 0;
943         if (v.convertsTo!(byte))
944             return v.get!(byte);
945         if (v.convertsTo!(long))
946             return to!byte(v.get!(long));
947         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte");
948     }
949     override short getShort(int columnIndex) {
950         checkClosed();
951         lock();
952         scope(exit) unlock();
953         Variant v = getValue(columnIndex);
954         if (lastIsNull)
955             return 0;
956         if (v.convertsTo!(short))
957             return v.get!(short);
958         if (v.convertsTo!(long))
959             return to!short(v.get!(long));
960         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short");
961     }
962     override ushort getUshort(int columnIndex) {
963         checkClosed();
964         lock();
965         scope(exit) unlock();
966         Variant v = getValue(columnIndex);
967         if (lastIsNull)
968             return 0;
969         if (v.convertsTo!(ushort))
970             return v.get!(ushort);
971         if (v.convertsTo!(long))
972             return to!ushort(v.get!(long));
973         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort");
974     }
975     override int getInt(int columnIndex) {
976         checkClosed();
977         lock();
978         scope(exit) unlock();
979         Variant v = getValue(columnIndex);
980         if (lastIsNull)
981             return 0;
982         if (v.convertsTo!(int))
983             return v.get!(int);
984         if (v.convertsTo!(long))
985             return to!int(v.get!(long));
986         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int");
987     }
988     override uint getUint(int columnIndex) {
989         checkClosed();
990         lock();
991         scope(exit) unlock();
992         Variant v = getValue(columnIndex);
993         if (lastIsNull)
994             return 0;
995         if (v.convertsTo!(uint))
996             return v.get!(uint);
997         if (v.convertsTo!(ulong))
998             return to!int(v.get!(ulong));
999         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint");
1000     }
1001     override long getLong(int columnIndex) {
1002         checkClosed();
1003         lock();
1004         scope(exit) unlock();
1005         Variant v = getValue(columnIndex);
1006         if (lastIsNull)
1007             return 0;
1008         if (v.convertsTo!(long))
1009             return v.get!(long);
1010         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long");
1011     }
1012     override ulong getUlong(int columnIndex) {
1013         checkClosed();
1014         lock();
1015         scope(exit) unlock();
1016         Variant v = getValue(columnIndex);
1017         if (lastIsNull)
1018             return 0;
1019         if (v.convertsTo!(ulong))
1020             return v.get!(ulong);
1021         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong");
1022     }
1023     override double getDouble(int columnIndex) {
1024         checkClosed();
1025         lock();
1026         scope(exit) unlock();
1027         Variant v = getValue(columnIndex);
1028         if (lastIsNull)
1029             return 0;
1030         if (v.convertsTo!(double))
1031             return v.get!(double);
1032         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double");
1033     }
1034     override float getFloat(int columnIndex) {
1035         checkClosed();
1036         lock();
1037         scope(exit) unlock();
1038         Variant v = getValue(columnIndex);
1039         if (lastIsNull)
1040             return 0;
1041         if (v.convertsTo!(float))
1042             return v.get!(float);
1043         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float");
1044     }
1045     override byte[] getBytes(int columnIndex) {
1046         checkClosed();
1047         lock();
1048         scope(exit) unlock();
1049         Variant v = getValue(columnIndex);
1050         if (lastIsNull)
1051             return null;
1052         if (v.convertsTo!(byte[])) {
1053             return v.get!(byte[]);
1054         }
1055         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte[]");
1056     }
1057 	override ubyte[] getUbytes(int columnIndex) {
1058 		checkClosed();
1059 		lock();
1060 		scope(exit) unlock();
1061 		Variant v = getValue(columnIndex);
1062 		if (lastIsNull)
1063 			return null;
1064 		if (v.convertsTo!(ubyte[])) {
1065 			return v.get!(ubyte[]);
1066 		}
1067 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte[]");
1068 	}
1069 	override string getString(int columnIndex) {
1070         checkClosed();
1071         lock();
1072         scope(exit) unlock();
1073         Variant v = getValue(columnIndex);
1074         if (lastIsNull)
1075             return null;
1076 		if (v.convertsTo!(ubyte[])) {
1077 			// assume blob encoding is utf-8
1078 			// TODO: check field encoding
1079             return decodeTextBlob(v.get!(ubyte[]));
1080 		}
1081         return v.toString();
1082     }
1083 	override std.datetime.DateTime getDateTime(int columnIndex) {
1084 		checkClosed();
1085 		lock();
1086 		scope(exit) unlock();
1087 		Variant v = getValue(columnIndex);
1088 		if (lastIsNull)
1089 			return DateTime();
1090 		if (v.convertsTo!(DateTime)) {
1091 			return v.get!DateTime();
1092 		}
1093 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime");
1094 	}
1095 	override std.datetime.Date getDate(int columnIndex) {
1096 		checkClosed();
1097 		lock();
1098 		scope(exit) unlock();
1099 		Variant v = getValue(columnIndex);
1100 		if (lastIsNull)
1101 			return Date();
1102 		if (v.convertsTo!(Date)) {
1103 			return v.get!Date();
1104 		}
1105 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date");
1106 	}
1107 	override std.datetime.TimeOfDay getTime(int columnIndex) {
1108 		checkClosed();
1109 		lock();
1110 		scope(exit) unlock();
1111 		Variant v = getValue(columnIndex);
1112 		if (lastIsNull)
1113 			return TimeOfDay();
1114 		if (v.convertsTo!(TimeOfDay)) {
1115 			return v.get!TimeOfDay();
1116 		}
1117 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay");
1118 	}
1119 
1120     override Variant getVariant(int columnIndex) {
1121         checkClosed();
1122         lock();
1123         scope(exit) unlock();
1124         Variant v = getValue(columnIndex);
1125         if (lastIsNull) {
1126             Variant vnull = null;
1127             return vnull;
1128         }
1129         return v;
1130     }
1131     override bool wasNull() {
1132         checkClosed();
1133         lock();
1134         scope(exit) unlock();
1135         return lastIsNull;
1136     }
1137     override bool isNull(int columnIndex) {
1138         checkClosed();
1139         lock();
1140         scope(exit) unlock();
1141         enforce!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
1142         enforce!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
1143         return results.array[currentRowIndex].isNull(columnIndex - 1);
1144     }
1145 
1146     //Retrieves the Statement object that produced this ResultSet object.
1147     override Statement getStatement() {
1148         checkClosed();
1149         lock();
1150         scope(exit) unlock();
1151         return stmt;
1152     }
1153 
1154     //Retrieves the current row number
1155     override int getRow() {
1156         checkClosed();
1157         lock();
1158         scope(exit) unlock();
1159         if (currentRowIndex <0 || currentRowIndex >= rowCount)
1160             return 0;
1161         return currentRowIndex + 1;
1162     }
1163 
1164     //Retrieves the fetch size for this ResultSet object.
1165     override ulong getFetchSize() {
1166         checkClosed();
1167         lock();
1168         scope(exit) unlock();
1169         return rowCount;
1170     }
1171 }
1172 
1173 // sample URL:
1174 // mysql://localhost:3306/DatabaseName
/**
 * Driver implementation whose connect() creates MySQLConnection objects.
 */
class MySQLDriver : Driver {
    /// Helper: builds a URL of the form mysql://host:port/dbname.
    public static string generateUrl(string host, ushort port, string dbname) {
        string portText = port.to!string;
        return "mysql://" ~ host ~ ":" ~ portText ~ "/" ~ dbname;
    }

    /// Helper: builds a parameter map holding the "user" and "password" entries.
    public static string[string] setUserAndPassword(string username, string password) {
        return ["user": username, "password": password];
    }

    /// Creates a new MySQLConnection from the given URL and parameter map.
    override ddbc.core.Connection connect(string url, string[string] params) {
        auto connection = new MySQLConnection(url, params);
        return connection;
    }
}
1191 
// End-to-end test of statements, prepared statements, result set metadata,
// null handling and generated insert ids. Only compiled in when
// MYSQL_TESTS_ENABLED is true.
unittest {
    static if (MYSQL_TESTS_ENABLED) {

        DataSource ds = createUnitTestMySQLDataSource();

        auto conn = ds.getConnection();
        scope(exit) conn.close();
        auto stmt = conn.createStatement();
        scope(exit) stmt.close();

        // (re)create the test table and fill it with six rows
        assert(stmt.executeUpdate("DROP TABLE IF EXISTS ddbct1") == 0);
        assert(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS ddbct1 (id bigint not null primary key AUTO_INCREMENT, name varchar(250), comment mediumtext, ts datetime)") == 0);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=1, name='name1', comment='comment for line 1', ts='20130202123025'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=2, name='name2', comment='comment for line 2 - can be very long'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=3, name='name3', comment='this is line 3'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=4, name='name4', comment=NULL") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=5, name=NULL, comment=''") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=6, name='', comment=NULL") == 1);
        assert(stmt.executeUpdate("UPDATE ddbct1 SET name=concat(name, '_x') WHERE id IN (3, 4)") == 2);

        // binding a null string through a prepared statement
        PreparedStatement ps = conn.prepareStatement("UPDATE ddbct1 SET name=? WHERE id=?");
        ps.setString(1, null);
        ps.setLong(2, 3);
        assert(ps.executeUpdate() == 1);

        auto rs = stmt.executeQuery("SELECT id, name name_alias, comment, ts FROM ddbct1 ORDER BY id");

        // testing result set meta data
        ResultSetMetaData meta = rs.getMetaData();
        assert(meta.getColumnCount() == 4);
        assert(meta.getColumnName(1) == "id");
        assert(meta.getColumnLabel(1) == "id");
        assert(meta.isNullable(1) == false);
        assert(meta.isNullable(2) == true);
        assert(meta.isNullable(3) == true);
        assert(meta.getColumnName(2) == "name");
        assert(meta.getColumnLabel(2) == "name_alias");
        assert(meta.getColumnName(3) == "comment");

        // getFetchSize() returns ulong, which does not implicitly narrow to int
        ulong rowCount = rs.getFetchSize();
        assert(rowCount == 6);
        int index = 1;
        while (rs.next()) {
            assert(!rs.isNull(1));
            ubyte[] bytes = rs.getUbytes(3);
            int rowIndex = rs.getRow();
            assert(rowIndex == index);
            long id = rs.getLong(1);
            assert(id == index);
            //writeln("field2 = '" ~ rs.getString(2) ~ "'");
            //writeln("field3 = '" ~ rs.getString(3) ~ "'");
            //writeln("wasNull = " ~ to!string(rs.wasNull()));
            if (id == 1) {
                DateTime ts = rs.getDateTime(4);
                assert(ts == DateTime(2013,02,02,12,30,25));
            }
            if (id == 4) {
                assert(rs.getString(2) == "name4_x");
                assert(rs.isNull(3));
            }
            if (id == 5) {
                assert(rs.isNull(2));
                assert(!rs.isNull(3));
            }
            if (id == 6) {
                assert(!rs.isNull(2));
                assert(rs.isNull(3));
            }
            //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)) ~ "\t[" ~ to!string(bytes.length) ~ "]");
            index++;
        }

        // query through a prepared statement with a bound parameter
        PreparedStatement ps2 = conn.prepareStatement("SELECT id, name, comment FROM ddbct1 WHERE id >= ?");
        scope(exit) ps2.close();
        ps2.setLong(1, 3);
        rs = ps2.executeQuery();
        while (rs.next()) {
            //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)));
            index++;
        }

        // checking last insert ID for prepared statement
        PreparedStatement ps3 = conn.prepareStatement("INSERT INTO ddbct1 (name) values ('New String 1')");
        scope(exit) ps3.close();
        Variant newId;
        assert(ps3.executeUpdate(newId) == 1);
        //writeln("Generated insert id = " ~ newId.toString());
        assert(newId.get!ulong > 0);

        // checking last insert ID for normal statement
        // (run on the freshly created stmt4, which was previously left unused)
        Statement stmt4 = conn.createStatement();
        scope(exit) stmt4.close();
        Variant newId2;
        assert(stmt4.executeUpdate("INSERT INTO ddbct1 (name) values ('New String 2')", newId2) == 1);
        //writeln("Generated insert id = " ~ newId2.toString());
        assert(newId2.get!ulong > 0);

    }
}
1291 
// Module constructor: registers a factory delegate for MySQLDriver under the "mysql" key.
// NOTE(review): declared `__gshared static this()`, not `shared static this()` - confirm
// whether registration is meant to happen once per process or once per thread.
__gshared static this() {
    import ddbc.common;

    DriverFactory.registerDriverFactory(
        "mysql",
        delegate() {
            return new MySQLDriver();
        }
    );
}
1297 
1298 
1299 } else { // version(USE_MYSQL)
1300     version(unittest) {
1301         immutable bool MYSQL_TESTS_ENABLED = false;
1302     }
1303 }