1 /**
2  * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 
3  * 
4  * Source file ddbc/drivers/mysqlddbc.d.
5  *
6  * DDBC library attempts to provide implementation independent interface to different databases.
7  * 
8  * Set of supported RDBMSs can be extended by writing Drivers for particular DBs.
9  * Currently it only includes MySQL driver.
10  * 
11  * JDBC documentation can be found here:
12  * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR)
13  *
14  * This module contains implementation of MySQL Driver which uses patched version of 
15  * MYSQLN (native D implementation of MySQL connector, written by Steve Teale)
16  * 
17  * Current version of driver implements only unidirectional readonly resultset, which with fetching full result to memory on creation. 
18  *
19  * You can find usage examples in unittest{} sections.
20  *
21  * Copyright: Copyright 2013
22  * License:   $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0).
23  * Author:   Vadim Lopatin
24  */
25 module ddbc.drivers.mysqlddbc;
26 
27 import std.algorithm;
28 import std.conv;
29 import std.datetime;
30 import std.exception;
31 import std.stdio;
32 import std..string;
33 import std.variant;
34 import core.sync.mutex;
35 import ddbc.common;
36 import ddbc.core;
37 
38 version(USE_MYSQL) {
39 
40 import mysql.connection;
41 
42 version(unittest) {
43     /*
44         To allow unit tests using MySQL server,
45         run mysql client using admin privileges, e.g. for MySQL server on localhost:
46         > mysql -uroot
47 
48         Create test user and test DB:
49         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'%' IDENTIFIED BY 'testpassword';
50         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'localhost' IDENTIFIED BY 'testpassword';
51         mysql> CREATE DATABASE testdb;
52      */
53     /// change to false to disable tests on real MySQL server
54     immutable bool MYSQL_TESTS_ENABLED = true;
55     /// change parameters if necessary
56     const string MYSQL_UNITTEST_HOST = "localhost";
57     const int    MYSQL_UNITTEST_PORT = 3306;
58     const string MYSQL_UNITTEST_USER = "testuser";
59     const string MYSQL_UNITTEST_PASSWORD = "testpassword";
60     const string MYSQL_UNITTEST_DB = "testdb";
61 
62     static if (MYSQL_TESTS_ENABLED) {
63         /// use this data source for tests
64         
65         DataSource createUnitTestMySQLDataSource() {
66             string url = makeDDBCUrl("mysql", MYSQL_UNITTEST_HOST, MYSQL_UNITTEST_PORT, MYSQL_UNITTEST_DB);
67             string[string] params;
68             setUserAndPassword(params, MYSQL_UNITTEST_USER, MYSQL_UNITTEST_PASSWORD);
69             return createConnectionPool(url, params);
70         }
71     }
72 }
73 
74 SqlType fromMySQLType(int t) {
75 	switch(t) {
76 	case SQLType.DECIMAL:
77 		case SQLType.TINY: return SqlType.TINYINT;
78 		case SQLType.SHORT: return SqlType.SMALLINT;
79 		case SQLType.INT: return SqlType.INTEGER;
80 		case SQLType.FLOAT: return SqlType.FLOAT;
81 		case SQLType.DOUBLE: return SqlType.DOUBLE;
82 		case SQLType.NULL: return SqlType.NULL;
83 		case SQLType.TIMESTAMP: return SqlType.DATETIME;
84 		case SQLType.LONGLONG: return SqlType.BIGINT;
85 		case SQLType.INT24: return SqlType.INTEGER;
86 		case SQLType.DATE: return SqlType.DATE;
87 		case SQLType.TIME: return SqlType.TIME;
88 		case SQLType.DATETIME: return SqlType.DATETIME;
89 		case SQLType.YEAR: return SqlType.SMALLINT;
90 		case SQLType.NEWDATE: return SqlType.DATE;
91 		case SQLType.VARCHAR: return SqlType.VARCHAR;
92 		case SQLType.BIT: return SqlType.BIT;
93 		case SQLType.NEWDECIMAL: return SqlType.DECIMAL;
94 		case SQLType.ENUM: return SqlType.OTHER;
95 		case SQLType.SET: return SqlType.OTHER;
96 		case SQLType.TINYBLOB: return SqlType.BLOB;
97 		case SQLType.MEDIUMBLOB: return SqlType.BLOB;
98 		case SQLType.LONGBLOB: return SqlType.BLOB;
99 		case SQLType.BLOB: return SqlType.BLOB;
100 		case SQLType.VARSTRING: return SqlType.VARCHAR;
101 		case SQLType.STRING: return SqlType.VARCHAR;
102 		case SQLType.GEOMETRY: return SqlType.OTHER;
103 		default: return SqlType.OTHER;
104 	}
105 }
106 
107 class MySQLConnection : ddbc.core.Connection {
108 private:
109     string url;
110     string[string] params;
111     string dbName;
112     string username;
113     string password;
114     string hostname;
115     int port = 3306;
116     mysql.connection.Connection conn;
117     bool closed;
118     bool autocommit;
119     Mutex mutex;
120 
121 
122 	MySQLStatement [] activeStatements;
123 
124 	void closeUnclosedStatements() {
125 		MySQLStatement [] list = activeStatements.dup;
126 		foreach(stmt; list) {
127 			stmt.close();
128 		}
129 	}
130 
131 	void checkClosed() {
132 		if (closed)
133 			throw new SQLException("Connection is already closed");
134 	}
135 
136 public:
137 
138     void lock() {
139         mutex.lock();
140     }
141 
142     void unlock() {
143         mutex.unlock();
144     }
145 
146     mysql.connection.Connection getConnection() { return conn; }
147 
148 
149 	void onStatementClosed(MySQLStatement stmt) {
150         myRemove(activeStatements, stmt);
151 	}
152 
153     this(string url, string[string] params) {
154         //writeln("MySQLConnection() creating connection");
155         mutex = new Mutex();
156         this.url = url;
157         this.params = params;
158         try {
159             //writeln("parsing url " ~ url);
160             extractParamsFromURL(url, this.params);
161             string dbName = "";
162     		ptrdiff_t firstSlashes = std..string.indexOf(url, "//");
163     		ptrdiff_t lastSlash = std..string.lastIndexOf(url, '/');
164     		ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0;
165     		ptrdiff_t hostNameEnd = lastSlash >=0 && lastSlash > firstSlashes + 1 ? lastSlash : url.length;
166             if (hostNameEnd < url.length - 1) {
167                 dbName = url[hostNameEnd + 1 .. $];
168             }
169             hostname = url[hostNameStart..hostNameEnd];
170             if (hostname.length == 0)
171                 hostname = "localhost";
172     		ptrdiff_t portDelimiter = std..string.indexOf(hostname, ":");
173             if (portDelimiter >= 0) {
174                 string portString = hostname[portDelimiter + 1 .. $];
175                 hostname = hostname[0 .. portDelimiter];
176                 if (portString.length > 0)
177                     port = to!int(portString);
178                 if (port < 1 || port > 65535)
179                     port = 3306;
180             }
181             if ("user" in this.params)
182                 username = this.params["user"];
183             if ("password" in this.params)
184                 password = this.params["password"];
185 
186             //writeln("host " ~ hostname ~ " : " ~ to!string(port) ~ " db=" ~ dbName ~ " user=" ~ username ~ " pass=" ~ password);
187 
188             conn = new mysql.connection.Connection(hostname, username, password, dbName, cast(ushort)port);
189             closed = false;
190             setAutoCommit(true);
191         } catch (Throwable e) {
192             //writeln(e.msg);
193             throw new SQLException(e);
194         }
195 
196         //writeln("MySQLConnection() connection created");
197     }
198     override void close() {
199 		checkClosed();
200 
201         lock();
202         scope(exit) unlock();
203         try {
204             closeUnclosedStatements();
205 
206             conn.close();
207             closed = true;
208         } catch (Throwable e) {
209             throw new SQLException(e);
210         }
211     }
212     override void commit() {
213         checkClosed();
214 
215         lock();
216         scope(exit) unlock();
217 
218         try {
219             Statement stmt = createStatement();
220             scope(exit) stmt.close();
221             stmt.executeUpdate("COMMIT");
222         } catch (Throwable e) {
223             throw new SQLException(e);
224         }
225     }
226     override Statement createStatement() {
227         checkClosed();
228 
229         lock();
230         scope(exit) unlock();
231 
232         try {
233             MySQLStatement stmt = new MySQLStatement(this);
234     		activeStatements ~= stmt;
235             return stmt;
236         } catch (Throwable e) {
237             throw new SQLException(e);
238         }
239     }
240 
241     PreparedStatement prepareStatement(string sql) {
242         checkClosed();
243 
244         lock();
245         scope(exit) unlock();
246 
247         try {
248             MySQLPreparedStatement stmt = new MySQLPreparedStatement(this, sql);
249             activeStatements ~= stmt;
250             return stmt;
251         } catch (Throwable e) {
252             throw new SQLException(e.msg ~ " while execution of query " ~ sql);
253         }
254     }
255 
256     override string getCatalog() {
257         return dbName;
258     }
259 
260     /// Sets the given catalog name in order to select a subspace of this Connection object's database in which to work.
261     override void setCatalog(string catalog) {
262         checkClosed();
263         if (dbName == catalog)
264             return;
265 
266         lock();
267         scope(exit) unlock();
268 
269         try {
270             conn.selectDB(catalog);
271             dbName = catalog;
272         } catch (Throwable e) {
273             throw new SQLException(e);
274         }
275     }
276 
277     override bool isClosed() {
278         return closed;
279     }
280 
281     override void rollback() {
282         checkClosed();
283 
284         lock();
285         scope(exit) unlock();
286 
287         try {
288             Statement stmt = createStatement();
289             scope(exit) stmt.close();
290             stmt.executeUpdate("ROLLBACK");
291         } catch (Throwable e) {
292             throw new SQLException(e);
293         }
294     }
295     override bool getAutoCommit() {
296         return autocommit;
297     }
298     override void setAutoCommit(bool autoCommit) {
299         checkClosed();
300         if (this.autocommit == autoCommit)
301             return;
302         lock();
303         scope(exit) unlock();
304 
305         try {
306             Statement stmt = createStatement();
307             scope(exit) stmt.close();
308             stmt.executeUpdate("SET autocommit=" ~ (autoCommit ? "1" : "0"));
309             this.autocommit = autoCommit;
310         } catch (Throwable e) {
311             throw new SQLException(e);
312         }
313     }
314 }
315 
316 class MySQLStatement : Statement {
317 private:
318     MySQLConnection conn;
319     Command * cmd;
320     mysql.connection.ResultSet rs;
321 	MySQLResultSet resultSet;
322 
323     bool closed;
324 
325 public:
326     void checkClosed() {
327         enforceEx!SQLException(!closed, "Statement is already closed");
328     }
329 
330     void lock() {
331         conn.lock();
332     }
333     
334     void unlock() {
335         conn.unlock();
336     }
337 
338     this(MySQLConnection conn) {
339         this.conn = conn;
340     }
341 
342     ResultSetMetaData createMetadata(FieldDescription[] fields) {
343         ColumnMetadataItem[] res = new ColumnMetadataItem[fields.length];
344         foreach(i, field; fields) {
345             ColumnMetadataItem item = new ColumnMetadataItem();
346             item.schemaName = field.db;
347             item.name = field.originalName;
348             item.label = field.name;
349             item.precision = field.length;
350             item.scale = field.scale;
351             item.isNullable = !field.notNull;
352             item.isSigned = !field.unsigned;
353 			item.type = fromMySQLType(field.type);
354             // TODO: fill more params
355             res[i] = item;
356         }
357         return new ResultSetMetaDataImpl(res);
358     }
359     ParameterMetaData createMetadata(ParamDescription[] fields) {
360         ParameterMetaDataItem[] res = new ParameterMetaDataItem[fields.length];
361         foreach(i, field; fields) {
362             ParameterMetaDataItem item = new ParameterMetaDataItem();
363             item.precision = field.length;
364             item.scale = field.scale;
365             item.isNullable = !field.notNull;
366             item.isSigned = !field.unsigned;
367 			item.type = fromMySQLType(field.type);
368 			// TODO: fill more params
369             res[i] = item;
370         }
371         return new ParameterMetaDataImpl(res);
372     }
373 public:
374     MySQLConnection getConnection() {
375         checkClosed();
376         return conn;
377     }
378     override ddbc.core.ResultSet executeQuery(string query) {
379         checkClosed();
380         lock();
381         scope(exit) unlock();
382 		try {
383 			cmd = new Command(conn.getConnection(), query);
384 	        rs = cmd.execSQLResult();
385     	    resultSet = new MySQLResultSet(this, rs, createMetadata(cmd.resultFieldDescriptions));
386         	return resultSet;
387 		} catch (Throwable e) {
388             throw new SQLException(e.msg ~ " - while execution of query " ~ query);
389         }
390 	}
391     override int executeUpdate(string query) {
392         checkClosed();
393         lock();
394         scope(exit) unlock();
395 		ulong rowsAffected = 0;
396 		try {
397 	        cmd = new Command(conn.getConnection(), query);
398 			cmd.execSQL(rowsAffected);
399 	        return cast(int)rowsAffected;
400 		} catch (Throwable e) {
401 			throw new SQLException(e.msg ~ " - while execution of query " ~ query);
402 		}
403     }
404 	override int executeUpdate(string query, out Variant insertId) {
405 		checkClosed();
406 		lock();
407 		scope(exit) unlock();
408         try {
409             cmd = new Command(conn.getConnection(), query);
410     		ulong rowsAffected = 0;
411     		cmd.execSQL(rowsAffected);
412     		insertId = Variant(cmd.lastInsertID);
413     		return cast(int)rowsAffected;
414         } catch (Throwable e) {
415             throw new SQLException(e.msg ~ " - while execution of query " ~ query);
416         }
417 	}
418 	override void close() {
419         checkClosed();
420         lock();
421         scope(exit) unlock();
422         try {
423             closeResultSet();
424             closed = true;
425             conn.onStatementClosed(this);
426         } catch (Throwable e) {
427             throw new SQLException(e);
428         }
429     }
430     void closeResultSet() {
431         if (cmd == null) {
432             return;
433         }
434         cmd.releaseStatement();
435         delete cmd;
436         cmd = null;
437 		if (resultSet !is null) {
438 			resultSet.onStatementClosed();
439 			resultSet = null;
440 		}
441     }
442 }
443 
444 class MySQLPreparedStatement : MySQLStatement, PreparedStatement {
445     string query;
446     int paramCount;
447     ResultSetMetaData metadata;
448     ParameterMetaData paramMetadata;
449     this(MySQLConnection conn, string query) {
450         super(conn);
451         this.query = query;
452         try {
453             cmd = new Command(conn.getConnection(), query);
454             cmd.prepare();
455             paramCount = cmd.numParams;
456         } catch (Throwable e) {
457             throw new SQLException(e);
458         }
459     }
460     void checkIndex(int index) {
461         if (index < 1 || index > paramCount)
462             throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range");
463     }
464     ref Variant getParam(int index) {
465         checkIndex(index);
466         return cmd.param(cast(ushort)(index - 1));
467     }
468 public:
469 
470     /// Retrieves a ResultSetMetaData object that contains information about the columns of the ResultSet object that will be returned when this PreparedStatement object is executed.
471     override ResultSetMetaData getMetaData() {
472         checkClosed();
473         lock();
474         scope(exit) unlock();
475         try {
476             if (metadata is null)
477                 metadata = createMetadata(cmd.preparedFieldDescriptions);
478             return metadata;
479         } catch (Throwable e) {
480             throw new SQLException(e);
481         }
482     }
483 
484     /// Retrieves the number, types and properties of this PreparedStatement object's parameters.
485     override ParameterMetaData getParameterMetaData() {
486         checkClosed();
487         lock();
488         scope(exit) unlock();
489         try {
490             if (paramMetadata is null)
491                 paramMetadata = createMetadata(cmd.preparedParamDescriptions);
492             return paramMetadata;
493         } catch (Throwable e) {
494             throw new SQLException(e);
495         }
496     }
497 
498     override int executeUpdate() {
499         checkClosed();
500         lock();
501         scope(exit) unlock();
502         try {
503             ulong rowsAffected = 0;
504             cmd.execPrepared(rowsAffected);
505             return cast(int)rowsAffected;
506         } catch (Throwable e) {
507             throw new SQLException(e);
508         }
509     }
510 
511 	override int executeUpdate(out Variant insertId) {
512 		checkClosed();
513 		lock();
514 		scope(exit) unlock();
515         try {
516     		ulong rowsAffected = 0;
517     		cmd.execPrepared(rowsAffected);
518     		insertId = cmd.lastInsertID;
519     		return cast(int)rowsAffected;
520         } catch (Throwable e) {
521             throw new SQLException(e);
522         }
523 	}
524 
525     override ddbc.core.ResultSet executeQuery() {
526         checkClosed();
527         lock();
528         scope(exit) unlock();
529         try {
530             rs = cmd.execPreparedResult();
531             resultSet = new MySQLResultSet(this, rs, getMetaData());
532             return resultSet;
533         } catch (Throwable e) {
534             throw new SQLException(e);
535         }
536     }
537     
538     override void clearParameters() {
539         checkClosed();
540         lock();
541         scope(exit) unlock();
542         try {
543             for (int i = 1; i <= paramCount; i++)
544                 setNull(i);
545         } catch (Throwable e) {
546             throw new SQLException(e);
547         }
548     }
549     
550 	override void setFloat(int parameterIndex, float x) {
551 		checkClosed();
552 		lock();
553 		scope(exit) unlock();
554         checkIndex(parameterIndex);
555         try {
556     		cmd.param(parameterIndex-1) = x;
557         } catch (Throwable e) {
558             throw new SQLException(e);
559         }
560 	}
561 	override void setDouble(int parameterIndex, double x){
562 		checkClosed();
563 		lock();
564 		scope(exit) unlock();
565         checkIndex(parameterIndex);
566         try {
567     		cmd.param(parameterIndex-1) = x;
568         } catch (Throwable e) {
569             throw new SQLException(e);
570         }
571 	}
572 	override void setBoolean(int parameterIndex, bool x) {
573         checkClosed();
574         lock();
575         scope(exit) unlock();
576         checkIndex(parameterIndex);
577         try {
578             cmd.param(parameterIndex-1) = x;
579         } catch (Throwable e) {
580             throw new SQLException(e);
581         }
582     }
583     override void setLong(int parameterIndex, long x) {
584         checkClosed();
585         lock();
586         scope(exit) unlock();
587         checkIndex(parameterIndex);
588         try {
589             cmd.param(parameterIndex-1) = x;
590         } catch (Throwable e) {
591             throw new SQLException(e);
592         }
593     }
594     override void setUlong(int parameterIndex, ulong x) {
595         checkClosed();
596         lock();
597         scope(exit) unlock();
598         checkIndex(parameterIndex);
599         try {
600             cmd.param(parameterIndex-1) = x;
601         } catch (Throwable e) {
602             throw new SQLException(e);
603         }
604     }
605     override void setInt(int parameterIndex, int x) {
606         checkClosed();
607         lock();
608         scope(exit) unlock();
609         checkIndex(parameterIndex);
610         try {
611             cmd.param(parameterIndex-1) = x;
612         } catch (Throwable e) {
613             throw new SQLException(e);
614         }
615     }
616     override void setUint(int parameterIndex, uint x) {
617         checkClosed();
618         lock();
619         scope(exit) unlock();
620         checkIndex(parameterIndex);
621         try {
622             cmd.param(parameterIndex-1) = x;
623         } catch (Throwable e) {
624             throw new SQLException(e);
625         }
626     }
627     override void setShort(int parameterIndex, short x) {
628         checkClosed();
629         lock();
630         scope(exit) unlock();
631         checkIndex(parameterIndex);
632         try {
633             cmd.param(parameterIndex-1) = x;
634         } catch (Throwable e) {
635             throw new SQLException(e);
636         }
637     }
638     override void setUshort(int parameterIndex, ushort x) {
639         checkClosed();
640         lock();
641         scope(exit) unlock();
642         checkIndex(parameterIndex);
643         try {
644             cmd.param(parameterIndex-1) = x;
645         } catch (Throwable e) {
646             throw new SQLException(e);
647         }
648     }
649     override void setByte(int parameterIndex, byte x) {
650         checkClosed();
651         lock();
652         scope(exit) unlock();
653         checkIndex(parameterIndex);
654         try {
655             cmd.param(parameterIndex-1) = x;
656         } catch (Throwable e) {
657             throw new SQLException(e);
658         }
659     }
660     override void setUbyte(int parameterIndex, ubyte x) {
661         checkClosed();
662         lock();
663         scope(exit) unlock();
664         checkIndex(parameterIndex);
665         try {
666             cmd.param(parameterIndex-1) = x;
667         } catch (Throwable e) {
668             throw new SQLException(e);
669         }
670     }
671     override void setBytes(int parameterIndex, byte[] x) {
672         checkClosed();
673         lock();
674         scope(exit) unlock();
675         checkIndex(parameterIndex);
676         try {
677             if (x.ptr is null)
678                 setNull(parameterIndex);
679             else
680                 cmd.param(parameterIndex-1) = x;
681         } catch (Throwable e) {
682             throw new SQLException(e);
683         }
684     }
685     override void setUbytes(int parameterIndex, ubyte[] x) {
686         checkClosed();
687         lock();
688         scope(exit) unlock();
689         checkIndex(parameterIndex);
690         try {
691             if (x.ptr is null)
692                 setNull(parameterIndex);
693             else
694                 cmd.param(parameterIndex-1) = x;
695         } catch (Throwable e) {
696             throw new SQLException(e);
697         }
698     }
699     override void setString(int parameterIndex, string x) {
700         checkClosed();
701         lock();
702         scope(exit) unlock();
703         checkIndex(parameterIndex);
704         try {
705             if (x.ptr is null)
706                 setNull(parameterIndex);
707             else
708                 cmd.param(parameterIndex-1) = x;
709         } catch (Throwable e) {
710             throw new SQLException(e);
711         }
712     }
713 	override void setDateTime(int parameterIndex, DateTime x) {
714 		checkClosed();
715 		lock();
716 		scope(exit) unlock();
717 		checkIndex(parameterIndex);
718         try {
719 		    cmd.param(parameterIndex-1) = x;
720         } catch (Throwable e) {
721             throw new SQLException(e);
722         }
723 	}
724 	override void setDate(int parameterIndex, Date x) {
725 		checkClosed();
726 		lock();
727 		scope(exit) unlock();
728 		checkIndex(parameterIndex);
729         try {
730     		cmd.param(parameterIndex-1) = x;
731         } catch (Throwable e) {
732             throw new SQLException(e);
733         }
734 	}
735 	override void setTime(int parameterIndex, TimeOfDay x) {
736 		checkClosed();
737 		lock();
738 		scope(exit) unlock();
739 		checkIndex(parameterIndex);
740         try {
741 		    cmd.param(parameterIndex-1) = x;
742         } catch (Throwable e) {
743             throw new SQLException(e);
744         }
745 	}
746 	override void setVariant(int parameterIndex, Variant x) {
747         checkClosed();
748         lock();
749         scope(exit) unlock();
750         checkIndex(parameterIndex);
751         try {
752             if (x == null)
753                 setNull(parameterIndex);
754             else
755                 cmd.param(parameterIndex-1) = x;
756         } catch (Throwable e) {
757             throw new SQLException(e);
758         }
759     }
760     override void setNull(int parameterIndex) {
761         checkClosed();
762         lock();
763         scope(exit) unlock();
764         checkIndex(parameterIndex);
765         try {
766             cmd.setNullParam(parameterIndex-1);
767         } catch (Throwable e) {
768             throw new SQLException(e);
769         }
770     }
771     override void setNull(int parameterIndex, int sqlType) {
772         checkClosed();
773         lock();
774         scope(exit) unlock();
775         try {
776             setNull(parameterIndex);
777         } catch (Throwable e) {
778             throw new SQLException(e);
779         }
780     }
781 }
782 
783 class MySQLResultSet : ResultSetImpl {
784     private MySQLStatement stmt;
785     private mysql.connection.ResultSet rs;
786     ResultSetMetaData metadata;
787     private bool closed;
788     private int currentRowIndex;
789     private int rowCount;
790     private int[string] columnMap;
791     private bool lastIsNull;
792     private int columnCount;
793 
794     Variant getValue(int columnIndex) {
795 		checkClosed();
796         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
797         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
798         lastIsNull = rs[currentRowIndex].isNull(columnIndex - 1);
799 		Variant res;
800 		if (!lastIsNull)
801 		    res = rs[currentRowIndex][columnIndex - 1];
802         return res;
803     }
804 
805 	void checkClosed() {
806 		if (closed)
807 			throw new SQLException("Result set is already closed");
808 	}
809 
810 public:
811 
812     void lock() {
813         stmt.lock();
814     }
815 
816     void unlock() {
817         stmt.unlock();
818     }
819 
820     this(MySQLStatement stmt, mysql.connection.ResultSet resultSet, ResultSetMetaData metadata) {
821         this.stmt = stmt;
822         this.rs = resultSet;
823         this.metadata = metadata;
824         try {
825             closed = false;
826             rowCount = cast(int)rs.length;
827             currentRowIndex = -1;
828 			foreach(key, val; rs.colNameIndicies)
829 				columnMap[key] = cast(int)val;
830     		columnCount = cast(int)rs.colNames.length;
831         } catch (Throwable e) {
832             throw new SQLException(e);
833         }
834     }
835 
836 	void onStatementClosed() {
837 		closed = true;
838 	}
839     string decodeTextBlob(ubyte[] data) {
840         char[] res = new char[data.length];
841         foreach (i, ch; data) {
842             res[i] = cast(char)ch;
843         }
844         return to!string(res);
845     }
846 
847     // ResultSet interface implementation
848 
849     //Retrieves the number, types and properties of this ResultSet object's columns
850     override ResultSetMetaData getMetaData() {
851         checkClosed();
852         lock();
853         scope(exit) unlock();
854         return metadata;
855     }
856 
857     override void close() {
858         checkClosed();
859         lock();
860         scope(exit) unlock();
861         stmt.closeResultSet();
862        	closed = true;
863     }
864     override bool first() {
865 		checkClosed();
866         lock();
867         scope(exit) unlock();
868         currentRowIndex = 0;
869         return currentRowIndex >= 0 && currentRowIndex < rowCount;
870     }
871     override bool isFirst() {
872 		checkClosed();
873         lock();
874         scope(exit) unlock();
875         return rowCount > 0 && currentRowIndex == 0;
876     }
877     override bool isLast() {
878 		checkClosed();
879         lock();
880         scope(exit) unlock();
881         return rowCount > 0 && currentRowIndex == rowCount - 1;
882     }
883     override bool next() {
884 		checkClosed();
885         lock();
886         scope(exit) unlock();
887         if (currentRowIndex + 1 >= rowCount)
888             return false;
889         currentRowIndex++;
890         return true;
891     }
892     
893     override int findColumn(string columnName) {
894 		checkClosed();
895         lock();
896         scope(exit) unlock();
897         int * p = (columnName in columnMap);
898         if (!p)
899             throw new SQLException("Column " ~ columnName ~ " not found");
900         return *p + 1;
901     }
902 
903     override bool getBoolean(int columnIndex) {
904         checkClosed();
905         lock();
906         scope(exit) unlock();
907         Variant v = getValue(columnIndex);
908         if (lastIsNull)
909             return false;
910         if (v.convertsTo!(bool))
911             return v.get!(bool);
912         if (v.convertsTo!(int))
913             return v.get!(int) != 0;
914         if (v.convertsTo!(long))
915             return v.get!(long) != 0;
916         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean");
917     }
918     override ubyte getUbyte(int columnIndex) {
919         checkClosed();
920         lock();
921         scope(exit) unlock();
922         Variant v = getValue(columnIndex);
923         if (lastIsNull)
924             return 0;
925         if (v.convertsTo!(ubyte))
926             return v.get!(ubyte);
927         if (v.convertsTo!(long))
928             return to!ubyte(v.get!(long));
929         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte");
930     }
931     override byte getByte(int columnIndex) {
932         checkClosed();
933         lock();
934         scope(exit) unlock();
935         Variant v = getValue(columnIndex);
936         if (lastIsNull)
937             return 0;
938         if (v.convertsTo!(byte))
939             return v.get!(byte);
940         if (v.convertsTo!(long))
941             return to!byte(v.get!(long));
942         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte");
943     }
944     override short getShort(int columnIndex) {
945         checkClosed();
946         lock();
947         scope(exit) unlock();
948         Variant v = getValue(columnIndex);
949         if (lastIsNull)
950             return 0;
951         if (v.convertsTo!(short))
952             return v.get!(short);
953         if (v.convertsTo!(long))
954             return to!short(v.get!(long));
955         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short");
956     }
957     override ushort getUshort(int columnIndex) {
958         checkClosed();
959         lock();
960         scope(exit) unlock();
961         Variant v = getValue(columnIndex);
962         if (lastIsNull)
963             return 0;
964         if (v.convertsTo!(ushort))
965             return v.get!(ushort);
966         if (v.convertsTo!(long))
967             return to!ushort(v.get!(long));
968         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort");
969     }
970     override int getInt(int columnIndex) {
971         checkClosed();
972         lock();
973         scope(exit) unlock();
974         Variant v = getValue(columnIndex);
975         if (lastIsNull)
976             return 0;
977         if (v.convertsTo!(int))
978             return v.get!(int);
979         if (v.convertsTo!(long))
980             return to!int(v.get!(long));
981         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int");
982     }
983     override uint getUint(int columnIndex) {
984         checkClosed();
985         lock();
986         scope(exit) unlock();
987         Variant v = getValue(columnIndex);
988         if (lastIsNull)
989             return 0;
990         if (v.convertsTo!(uint))
991             return v.get!(uint);
992         if (v.convertsTo!(ulong))
993             return to!int(v.get!(ulong));
994         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint");
995     }
996     override long getLong(int columnIndex) {
997         checkClosed();
998         lock();
999         scope(exit) unlock();
1000         Variant v = getValue(columnIndex);
1001         if (lastIsNull)
1002             return 0;
1003         if (v.convertsTo!(long))
1004             return v.get!(long);
1005         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long");
1006     }
1007     override ulong getUlong(int columnIndex) {
1008         checkClosed();
1009         lock();
1010         scope(exit) unlock();
1011         Variant v = getValue(columnIndex);
1012         if (lastIsNull)
1013             return 0;
1014         if (v.convertsTo!(ulong))
1015             return v.get!(ulong);
1016         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong");
1017     }
1018     override double getDouble(int columnIndex) {
1019         checkClosed();
1020         lock();
1021         scope(exit) unlock();
1022         Variant v = getValue(columnIndex);
1023         if (lastIsNull)
1024             return 0;
1025         if (v.convertsTo!(double))
1026             return v.get!(double);
1027         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double");
1028     }
1029     override float getFloat(int columnIndex) {
1030         checkClosed();
1031         lock();
1032         scope(exit) unlock();
1033         Variant v = getValue(columnIndex);
1034         if (lastIsNull)
1035             return 0;
1036         if (v.convertsTo!(float))
1037             return v.get!(float);
1038         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float");
1039     }
1040     override byte[] getBytes(int columnIndex) {
1041         checkClosed();
1042         lock();
1043         scope(exit) unlock();
1044         Variant v = getValue(columnIndex);
1045         if (lastIsNull)
1046             return null;
1047         if (v.convertsTo!(byte[])) {
1048             return v.get!(byte[]);
1049         }
1050         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte[]");
1051     }
1052 	override ubyte[] getUbytes(int columnIndex) {
1053 		checkClosed();
1054 		lock();
1055 		scope(exit) unlock();
1056 		Variant v = getValue(columnIndex);
1057 		if (lastIsNull)
1058 			return null;
1059 		if (v.convertsTo!(ubyte[])) {
1060 			return v.get!(ubyte[]);
1061 		}
1062 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte[]");
1063 	}
1064 	override string getString(int columnIndex) {
1065         checkClosed();
1066         lock();
1067         scope(exit) unlock();
1068         Variant v = getValue(columnIndex);
1069         if (lastIsNull)
1070             return null;
1071 		if (v.convertsTo!(ubyte[])) {
1072 			// assume blob encoding is utf-8
1073 			// TODO: check field encoding
1074             return decodeTextBlob(v.get!(ubyte[]));
1075 		}
1076         return v.toString();
1077     }
1078 	override std.datetime.DateTime getDateTime(int columnIndex) {
1079 		checkClosed();
1080 		lock();
1081 		scope(exit) unlock();
1082 		Variant v = getValue(columnIndex);
1083 		if (lastIsNull)
1084 			return DateTime();
1085 		if (v.convertsTo!(DateTime)) {
1086 			return v.get!DateTime();
1087 		}
1088 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime");
1089 	}
1090 	override std.datetime.Date getDate(int columnIndex) {
1091 		checkClosed();
1092 		lock();
1093 		scope(exit) unlock();
1094 		Variant v = getValue(columnIndex);
1095 		if (lastIsNull)
1096 			return Date();
1097 		if (v.convertsTo!(Date)) {
1098 			return v.get!Date();
1099 		}
1100 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date");
1101 	}
1102 	override std.datetime.TimeOfDay getTime(int columnIndex) {
1103 		checkClosed();
1104 		lock();
1105 		scope(exit) unlock();
1106 		Variant v = getValue(columnIndex);
1107 		if (lastIsNull)
1108 			return TimeOfDay();
1109 		if (v.convertsTo!(TimeOfDay)) {
1110 			return v.get!TimeOfDay();
1111 		}
1112 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay");
1113 	}
1114 
1115     override Variant getVariant(int columnIndex) {
1116         checkClosed();
1117         lock();
1118         scope(exit) unlock();
1119         Variant v = getValue(columnIndex);
1120         if (lastIsNull) {
1121             Variant vnull = null;
1122             return vnull;
1123         }
1124         return v;
1125     }
1126     override bool wasNull() {
1127         checkClosed();
1128         lock();
1129         scope(exit) unlock();
1130         return lastIsNull;
1131     }
1132     override bool isNull(int columnIndex) {
1133         checkClosed();
1134         lock();
1135         scope(exit) unlock();
1136         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
1137         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
1138         return rs[currentRowIndex].isNull(columnIndex - 1);
1139     }
1140 
1141     //Retrieves the Statement object that produced this ResultSet object.
1142     override Statement getStatement() {
1143         checkClosed();
1144         lock();
1145         scope(exit) unlock();
1146         return stmt;
1147     }
1148 
1149     //Retrieves the current row number
1150     override int getRow() {
1151         checkClosed();
1152         lock();
1153         scope(exit) unlock();
1154         if (currentRowIndex <0 || currentRowIndex >= rowCount)
1155             return 0;
1156         return currentRowIndex + 1;
1157     }
1158 
1159     //Retrieves the fetch size for this ResultSet object.
1160     override int getFetchSize() {
1161         checkClosed();
1162         lock();
1163         scope(exit) unlock();
1164         return rowCount;
1165     }
1166 }
1167 
1168 // sample URL:
1169 // mysql://localhost:3306/DatabaseName
1170 class MySQLDriver : Driver {
1171     // helper function
1172     public static string generateUrl(string host, ushort port, string dbname) {
1173         return "mysql://" ~ host ~ ":" ~ to!string(port) ~ "/" ~ dbname;
1174     }
1175 	public static string[string] setUserAndPassword(string username, string password) {
1176 		string[string] params;
1177         params["user"] = username;
1178         params["password"] = password;
1179 		return params;
1180     }
1181     override ddbc.core.Connection connect(string url, string[string] params) {
1182         //writeln("MySQLDriver.connect " ~ url);
1183         return new MySQLConnection(url, params);
1184     }
1185 }
1186 
1187 unittest {
1188     static if (MYSQL_TESTS_ENABLED) {
1189 
1190         import std.conv;
1191 
1192         DataSource ds = createUnitTestMySQLDataSource();
1193 
1194         auto conn = ds.getConnection();
1195         scope(exit) conn.close();
1196         auto stmt = conn.createStatement();
1197         scope(exit) stmt.close();
1198 
1199         assert(stmt.executeUpdate("DROP TABLE IF EXISTS ddbct1") == 0);
1200         assert(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS ddbct1 (id bigint not null primary key AUTO_INCREMENT, name varchar(250), comment mediumtext, ts datetime)") == 0);
1201         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=1, name='name1', comment='comment for line 1', ts='20130202123025'") == 1);
1202         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=2, name='name2', comment='comment for line 2 - can be very long'") == 1);
1203         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=3, name='name3', comment='this is line 3'") == 1);
1204         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=4, name='name4', comment=NULL") == 1);
1205         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=5, name=NULL, comment=''") == 1);
1206         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=6, name='', comment=NULL") == 1);
1207         assert(stmt.executeUpdate("UPDATE ddbct1 SET name=concat(name, '_x') WHERE id IN (3, 4)") == 2);
1208         
1209         PreparedStatement ps = conn.prepareStatement("UPDATE ddbct1 SET name=? WHERE id=?");
1210         ps.setString(1, null);
1211         ps.setLong(2, 3);
1212         assert(ps.executeUpdate() == 1);
1213         
1214         auto rs = stmt.executeQuery("SELECT id, name name_alias, comment, ts FROM ddbct1 ORDER BY id");
1215 
1216         // testing result set meta data
1217         ResultSetMetaData meta = rs.getMetaData();
1218         assert(meta.getColumnCount() == 4);
1219         assert(meta.getColumnName(1) == "id");
1220         assert(meta.getColumnLabel(1) == "id");
1221         assert(meta.isNullable(1) == false);
1222         assert(meta.isNullable(2) == true);
1223         assert(meta.isNullable(3) == true);
1224         assert(meta.getColumnName(2) == "name");
1225         assert(meta.getColumnLabel(2) == "name_alias");
1226         assert(meta.getColumnName(3) == "comment");
1227 
1228         int rowCount = rs.getFetchSize();
1229         assert(rowCount == 6);
1230         int index = 1;
1231         while (rs.next()) {
1232             assert(!rs.isNull(1));
1233             ubyte[] bytes = rs.getUbytes(3);
1234             int rowIndex = rs.getRow();
1235             assert(rowIndex == index);
1236             long id = rs.getLong(1);
1237             assert(id == index);
1238             //writeln("field2 = '" ~ rs.getString(2) ~ "'");
1239             //writeln("field3 = '" ~ rs.getString(3) ~ "'");
1240             //writeln("wasNull = " ~ to!string(rs.wasNull()));
1241 			if (id == 1) {
1242 				DateTime ts = rs.getDateTime(4);
1243 				assert(ts == DateTime(2013,02,02,12,30,25));
1244 			}
1245 			if (id == 4) {
1246                 assert(rs.getString(2) == "name4_x");
1247                 assert(rs.isNull(3));
1248             }
1249             if (id == 5) {
1250                 assert(rs.isNull(2));
1251                 assert(!rs.isNull(3));
1252             }
1253             if (id == 6) {
1254                 assert(!rs.isNull(2));
1255                 assert(rs.isNull(3));
1256             }
1257             //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)) ~ "\t[" ~ to!string(bytes.length) ~ "]");
1258             index++;
1259         }
1260         
1261         PreparedStatement ps2 = conn.prepareStatement("SELECT id, name, comment FROM ddbct1 WHERE id >= ?");
1262 		scope(exit) ps2.close();
1263         ps2.setLong(1, 3);
1264         rs = ps2.executeQuery();
1265         while (rs.next()) {
1266             //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)));
1267             index++;
1268         }
1269 
1270 		// checking last insert ID for prepared statement
1271 		PreparedStatement ps3 = conn.prepareStatement("INSERT INTO ddbct1 (name) values ('New String 1')");
1272 		scope(exit) ps3.close();
1273 		Variant newId;
1274 		assert(ps3.executeUpdate(newId) == 1);
1275 		//writeln("Generated insert id = " ~ newId.toString());
1276 		assert(newId.get!ulong > 0);
1277 
1278 		// checking last insert ID for normal statement
1279 		Statement stmt4 = conn.createStatement();
1280 		scope(exit) stmt4.close();
1281 		Variant newId2;
1282 		assert(stmt.executeUpdate("INSERT INTO ddbct1 (name) values ('New String 2')", newId2) == 1);
1283 		//writeln("Generated insert id = " ~ newId2.toString());
1284 		assert(newId2.get!ulong > 0);
1285 
1286 	}
1287 }
1288 
1289 __gshared static this() {
1290     // register MySQLDriver
1291     import ddbc.common;
1292     DriverFactory.registerDriverFactory("mysql", delegate() { return new MySQLDriver(); });
1293 }
1294 
1295 
1296 } else { // version(USE_MYSQL)
1297     version(unittest) {
1298         immutable bool MYSQL_TESTS_ENABLED = false;
1299     }
1300 }