1 /**
2  * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 
3  * 
4  * Source file ddbc/drivers/mysqlddbc.d.
5  *
6  * DDBC library attempts to provide implementation independent interface to different databases.
7  * 
8  * Set of supported RDBMSs can be extended by writing Drivers for particular DBs.
9  * Currently it only includes MySQL driver.
10  * 
11  * JDBC documentation can be found here:
12  * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR)
13  *
14  * This module contains implementation of MySQL Driver which uses patched version of 
15  * MYSQLN (native D implementation of MySQL connector, written by Steve Teale)
16  * 
17  * Current version of driver implements only unidirectional readonly resultset, which with fetching full result to memory on creation. 
18  *
19  * You can find usage examples in unittest{} sections.
20  *
21  * Copyright: Copyright 2013
22  * License:   $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0).
23  * Author:   Vadim Lopatin
24  */
25 module ddbc.drivers.mysqlddbc;
26 
27 import std.algorithm;
28 import std.conv : to;
29 import std.datetime : Date, DateTime, TimeOfDay;
30 import std.datetime.date;
31 import std.datetime.systime;
32 import std.exception;
33 
// Backwards-compatibility shim: pick the exception-typed enforce helper by compiler version.
// Front ends older than 2.080 get 'enforceEx'; newer ones get 'enforce'.
// ('enforceEx' will be removed with 2.089.)
static if(__VERSION__ < 2080) {
    alias enforceHelper = enforceEx;
} else {
    alias enforceHelper = enforce;
}
41 
42 static if(__traits(compiles, (){ import std.experimental.logger; } )) {
43     import std.experimental.logger;
44 }
import std.stdio;
import std.string;
import std.variant;
import core.sync.mutex;
import ddbc.common;
import ddbc.core;
51 
52 version(USE_MYSQL) {
53 
54 import std.array;
55 import mysql.connection : prepare;
56 import mysql.commands : query, exec;
57 import mysql.prepared;
58 import mysql.protocol.constants;
59 import mysql.protocol.packets : FieldDescription, ParamDescription;
60 import mysql.result : Row, ResultRange;
61 
62 version(unittest) {
63     /*
64         To allow unit tests using MySQL server,
65         run mysql client using admin privileges, e.g. for MySQL server on localhost:
66         > mysql -uroot
67 
68         Create test user and test DB:
69 
70         mysql> CREATE DATABASE IF NOT EXISTS testdb;
71         mysql> CREATE USER 'travis'@'localhost' IDENTIFIED BY '';
72         mysql> GRANT ALL PRIVILEGES ON testdb.* TO 'travis'@'localhost';
73 
74         mysql> CREATE USER 'testuser'@'localhost';
75         mysql> GRANT ALL PRIVILEGES ON testdb.* TO 'testuser'@'localhost' IDENTIFIED BY 'testpassword';
76         mysql> FLUSH PRIVILEGES;
77      */
    /// Set to false to skip the unit tests that need a real MySQL server.
    immutable bool MYSQL_TESTS_ENABLED = true;
    /// Connection settings used by the unit tests; change them to match your server if necessary.
    const string MYSQL_UNITTEST_HOST = "localhost";
    const int    MYSQL_UNITTEST_PORT = 3306;
    const string MYSQL_UNITTEST_USER = "travis"; // "testuser";
    const string MYSQL_UNITTEST_PASSWORD = ""; // "testpassword";
    const string MYSQL_UNITTEST_DB = "testdb";
86 
87     static if (MYSQL_TESTS_ENABLED) {
88         /// use this data source for tests
89         
90         DataSource createUnitTestMySQLDataSource() {
91             string url = makeDDBCUrl("mysql", MYSQL_UNITTEST_HOST, MYSQL_UNITTEST_PORT, MYSQL_UNITTEST_DB);
92             string[string] params;
93             setUserAndPassword(params, MYSQL_UNITTEST_USER, MYSQL_UNITTEST_PASSWORD);
94             return createConnectionPool(url, params);
95         }
96     }
97 }
98 
/**
 * Translates a MySQL column type code (one of the `SQLType` constants) into
 * the DDBC `SqlType` used in result-set and parameter metadata.
 *
 * Params:
 *   t = MySQL type code
 * Returns: the matching `SqlType`; `SqlType.OTHER` for ENUM, SET, GEOMETRY
 *          and any code that is not listed.
 */
SqlType fromMySQLType(int t) {
	switch(t) {
		// DECIMAL previously had no body and fell through into TINY,
		// so decimal columns were reported as TINYINT.
		case SQLType.DECIMAL: return SqlType.DECIMAL;
		case SQLType.TINY: return SqlType.TINYINT;
		case SQLType.SHORT: return SqlType.SMALLINT;
		case SQLType.INT: return SqlType.INTEGER;
		case SQLType.FLOAT: return SqlType.FLOAT;
		case SQLType.DOUBLE: return SqlType.DOUBLE;
		case SQLType.NULL: return SqlType.NULL;
		case SQLType.TIMESTAMP: return SqlType.DATETIME;
		case SQLType.LONGLONG: return SqlType.BIGINT;
		case SQLType.INT24: return SqlType.INTEGER;
		case SQLType.DATE: return SqlType.DATE;
		case SQLType.TIME: return SqlType.TIME;
		case SQLType.DATETIME: return SqlType.DATETIME;
		case SQLType.YEAR: return SqlType.SMALLINT;
		case SQLType.NEWDATE: return SqlType.DATE;
		case SQLType.VARCHAR: return SqlType.VARCHAR;
		case SQLType.BIT: return SqlType.BIT;
		case SQLType.NEWDECIMAL: return SqlType.DECIMAL;
		case SQLType.ENUM: return SqlType.OTHER;
		case SQLType.SET: return SqlType.OTHER;
		case SQLType.TINYBLOB: return SqlType.BLOB;
		case SQLType.MEDIUMBLOB: return SqlType.BLOB;
		case SQLType.LONGBLOB: return SqlType.BLOB;
		case SQLType.BLOB: return SqlType.BLOB;
		case SQLType.VARSTRING: return SqlType.VARCHAR;
		case SQLType.STRING: return SqlType.VARCHAR;
		case SQLType.GEOMETRY: return SqlType.OTHER;
		default: return SqlType.OTHER;
	}
}
131 
/**
 * DDBC connection backed by a mysql-native `mysql.connection.Connection`.
 *
 * Public operations serialize on a per-connection mutex. Several of them
 * (commit, rollback, setAutoCommit) call createStatement() while already
 * holding that mutex, so the class relies on the mutex being re-entrant for
 * the owning thread.
 */
class MySQLConnection : ddbc.core.Connection {
private:
    string url;
    string[string] params;
    string dbName;
    string username;
    string password;
    string hostname;
    int port = 3306;
    mysql.connection.Connection conn;
    bool closed;
    bool autocommit;
    Mutex mutex;

    /// Statements created through this connection that have not been closed yet.
    MySQLStatement [] activeStatements;

    /// Closes every statement still registered with this connection.
    void closeUnclosedStatements() {
        // Work on a copy: each stmt.close() calls back into onStatementClosed(),
        // which removes the statement from activeStatements.
        MySQLStatement [] list = activeStatements.dup;
        foreach(stmt; list) {
            stmt.close();
        }
    }

    /// Throws: SQLException if close() has already been called on this connection.
    void checkClosed() {
        if (closed)
            throw new SQLException("Connection is already closed");
    }

public:

    /// Acquires the connection mutex.
    void lock() {
        mutex.lock();
    }

    /// Releases the connection mutex.
    void unlock() {
        mutex.unlock();
    }

    /// Returns the underlying mysql-native connection.
    mysql.connection.Connection getConnection() { return conn; }

    /// Callback used by MySQLStatement.close() to unregister itself.
    void onStatementClosed(MySQLStatement stmt) {
        myRemove(activeStatements, stmt);
    }

    /**
     * Parses the URL, opens the MySQL connection and switches autocommit on.
     *
     * The host (optionally `host:port`) is taken from the text after `//` up to
     * the last `/`; the database name is whatever follows that last `/`.
     * An empty host becomes "localhost"; a port outside 1..65535 becomes 3306.
     * "user" and "password" are read from the parameter map.
     *
     * Params:
     *   url    = connection URL
     *   params = connection parameters; extractParamsFromURL may add to the copy kept here
     * Throws: SQLException wrapping any failure during parsing or connecting.
     */
    this(string url, string[string] params) {
        // Function-local import: the parsing below only needs these two helpers.
        import std.string : indexOf, lastIndexOf;

        mutex = new Mutex();
        this.url = url;
        this.params = params;
        try {
            // NOTE(review): any return value of extractParamsFromURL is ignored here, as before.
            extractParamsFromURL(url, this.params);
            ptrdiff_t firstSlashes = indexOf(url, "//");
            ptrdiff_t lastSlash = lastIndexOf(url, '/');
            ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0;
            ptrdiff_t hostNameEnd = lastSlash >=0 && lastSlash > firstSlashes + 1 ? lastSlash : url.length;
            // Assign the member field. A local variable of the same name used to
            // shadow it, leaving getCatalog() permanently empty.
            dbName = "";
            // Written as "+ 1 <" so an empty URL cannot underflow the unsigned length.
            if (hostNameEnd + 1 < url.length) {
                dbName = url[hostNameEnd + 1 .. $];
            }
            hostname = url[hostNameStart..hostNameEnd];
            if (hostname.length == 0)
                hostname = "localhost";
            ptrdiff_t portDelimiter = indexOf(hostname, ":");
            if (portDelimiter >= 0) {
                string portString = hostname[portDelimiter + 1 .. $];
                hostname = hostname[0 .. portDelimiter];
                if (portString.length > 0)
                    port = to!int(portString);
                if (port < 1 || port > 65535)
                    port = 3306;
            }
            if ("user" in this.params)
                username = this.params["user"];
            if ("password" in this.params)
                password = this.params["password"];

            conn = new mysql.connection.Connection(hostname, username, password, dbName, cast(ushort)port);
            closed = false;
            setAutoCommit(true);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Closes all open statements, then the underlying connection.
    /// Throws: SQLException if already closed or if closing fails.
    override void close() {
        checkClosed();

        lock();
        scope(exit) unlock();
        try {
            closeUnclosedStatements();

            conn.close();
            closed = true;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Executes `COMMIT` on this connection.
    override void commit() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            Statement stmt = createStatement();
            scope(exit) stmt.close();
            stmt.executeUpdate("COMMIT");
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Creates a new statement and registers it so close() can clean it up.
    override Statement createStatement() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            MySQLStatement stmt = new MySQLStatement(this);
            activeStatements ~= stmt;
            return stmt;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Prepares `sql` on the server and registers the resulting statement.
    /// Throws: SQLException whose message includes the offending query text.
    PreparedStatement prepareStatement(string sql) {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            MySQLPreparedStatement stmt = new MySQLPreparedStatement(this, sql);
            activeStatements ~= stmt;
            return stmt;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " while execution of query " ~ sql);
        }
    }

    /// Returns the database name taken from the URL or set by setCatalog().
    override string getCatalog() {
        return dbName;
    }

    /// Sets the given catalog name in order to select a subspace of this Connection object's database in which to work.
    override void setCatalog(string catalog) {
        checkClosed();
        // Nothing to do when the requested database is already selected.
        if (dbName == catalog)
            return;

        lock();
        scope(exit) unlock();

        try {
            conn.selectDB(catalog);
            dbName = catalog;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Returns true once close() has completed successfully.
    override bool isClosed() {
        return closed;
    }

    /// Executes `ROLLBACK` on this connection.
    override void rollback() {
        checkClosed();

        lock();
        scope(exit) unlock();

        try {
            Statement stmt = createStatement();
            scope(exit) stmt.close();
            stmt.executeUpdate("ROLLBACK");
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Returns the autocommit mode last set through setAutoCommit().
    override bool getAutoCommit() {
        return autocommit;
    }

    /// Sends `SET autocommit=...` to the server unless the cached mode already matches.
    override void setAutoCommit(bool autoCommit) {
        checkClosed();
        if (this.autocommit == autoCommit)
            return;
        lock();
        scope(exit) unlock();

        try {
            Statement stmt = createStatement();
            scope(exit) stmt.close();
            stmt.executeUpdate("SET autocommit=" ~ (autoCommit ? "1" : "0"));
            this.autocommit = autoCommit;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
}
340 
/**
 * Plain (non-prepared) statement bound to a MySQLConnection.
 *
 * Every operation runs under the owning connection's mutex. At most one
 * result set is tracked; it is marked closed when the statement is closed.
 */
class MySQLStatement : Statement {
private:
    MySQLConnection conn;
    ResultRange results;
    MySQLResultSet resultSet;

    bool closed;

public:
    /// Throws: SQLException if this statement has already been closed.
    void checkClosed() {
        enforceHelper!SQLException(!closed, "Statement is already closed");
    }

    /// Acquires the owning connection's mutex.
    void lock() {
        conn.lock();
    }

    /// Releases the owning connection's mutex.
    void unlock() {
        conn.unlock();
    }

    this(MySQLConnection conn) {
        this.conn = conn;
    }

    /// Builds result-set metadata from the field descriptions reported by the server.
    ResultSetMetaData createMetadata(FieldDescription[] fields) {
        ColumnMetadataItem[] res = new ColumnMetadataItem[fields.length];
        foreach(i, field; fields) {
            ColumnMetadataItem item = new ColumnMetadataItem();
            item.schemaName = field.db;
            item.name = field.originalName;
            item.label = field.name;
            item.precision = field.length;
            item.scale = field.scale;
            item.isNullable = !field.notNull;
            item.isSigned = !field.unsigned;
            item.type = fromMySQLType(field.type);
            // TODO: fill more params
            res[i] = item;
        }
        return new ResultSetMetaDataImpl(res);
    }

    /// Builds parameter metadata from the parameter descriptions of a prepared statement.
    ParameterMetaData createMetadata(ParamDescription[] fields) {
        ParameterMetaDataItem[] res = new ParameterMetaDataItem[fields.length];
        foreach(i, field; fields) {
            ParameterMetaDataItem item = new ParameterMetaDataItem();
            item.precision = field.length;
            item.scale = field.scale;
            item.isNullable = !field.notNull;
            item.isSigned = !field.unsigned;
            item.type = fromMySQLType(field.type);
            // TODO: fill more params
            res[i] = item;
        }
        return new ParameterMetaDataImpl(res);
    }
public:
    /// Returns the connection this statement belongs to.
    MySQLConnection getConnection() {
        checkClosed();
        return conn;
    }

    /// Runs `queryString` and returns its result set.
    /// Throws: SQLException whose message includes the query text.
    override ddbc.core.ResultSet executeQuery(string queryString) {
        checkClosed();
        lock();
        scope(exit) unlock();

        static if(__traits(compiles, (){ import std.experimental.logger; } )) {
            // trace, not tracef: the SQL text is data, not a format string.
            // A '%' in the query (e.g. LIKE '%x%') must be logged verbatim.
            sharedLog.trace(queryString);
        }

        try {
            results = query(conn.getConnection(), queryString);
            resultSet = new MySQLResultSet(this, results, createMetadata(conn.getConnection().resultFieldDescriptions));
            return resultSet;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ queryString);
        }
    }

    /// Runs a data-modifying statement.
    /// Returns: the affected-row count reported by `exec`, narrowed to int.
    override int executeUpdate(string query) {
        checkClosed();
        lock();
        scope(exit) unlock();
        ulong rowsAffected = 0;

        static if(__traits(compiles, (){ import std.experimental.logger; } )) {
            sharedLog.trace(query);
        }

        try {
            rowsAffected = exec(conn.getConnection(), query);
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ query);
        }
    }

    /// Same as executeUpdate(query), additionally storing the connection's last insert id in `insertId`.
    override int executeUpdate(string query, out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong rowsAffected = exec(conn.getConnection(), query);
            insertId = Variant(conn.getConnection().lastInsertID);
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e.msg ~ " - while execution of query " ~ query);
        }
    }

    /// Marks the tracked result set closed, closes this statement and unregisters it from the connection.
    override void close() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            closeResultSet();
            closed = true;
            conn.onStatementClosed(this);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Marks the currently tracked result set (if any) as closed and forgets it.
    void closeResultSet() {
        if (resultSet !is null) {
            resultSet.onStatementClosed();
            resultSet = null;
        }
    }
}
467 
/**
 * Prepared statement built on a mysql-native `Prepared` object.
 *
 * Parameter indexes are 1-based in this API and converted to 0-based before
 * they reach mysql-native. The typed setters share one implementation
 * (bindArg / bindArrayArg) instead of repeating the same lock/check/try body.
 */
class MySQLPreparedStatement : MySQLStatement, PreparedStatement {

    private Prepared statement;
    private int paramCount;
    private ResultSetMetaData metadata;
    private ParameterMetaData paramMetadata;

    /// Prepares `queryString` on the given connection.
    /// Throws: SQLException wrapping any failure from `prepare`.
    this(MySQLConnection conn, string queryString) {
        super(conn);

        try {
            this.statement = prepare(conn.getConnection(), queryString);
            this.paramCount = this.statement.numArgs;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Throws: SQLException unless `index` is within 1..paramCount.
    void checkIndex(int index) {
        if (index < 1 || index > paramCount)
            throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range");
    }

    /// Returns the value currently bound to the 1-based parameter `index`.
    Variant getParam(int index) {
        checkIndex(index);
        return this.statement.getArg( cast(ushort)(index - 1) );
    }

    /// Common body of the typed setters: validates state and index under the
    /// connection lock, then binds `value`. Any failure is wrapped in SQLException.
    private void bindArg(T)(int parameterIndex, T value) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            this.statement.setArg(parameterIndex-1, value);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Like bindArg, but for arrays/strings: a null `.ptr` binds SQL NULL instead.
    private void bindArrayArg(T)(int parameterIndex, T value) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            if (value.ptr is null) {
                setNull(parameterIndex);
            } else {
                this.statement.setArg(parameterIndex-1, value);
            }
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }
public:

    /// Retrieves a ResultSetMetaData object that contains information about the columns of the ResultSet object that will be returned when this PreparedStatement object is executed.
    override ResultSetMetaData getMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            // Built on first use and cached afterwards.
            if (metadata is null) {
                metadata = createMetadata(this.statement.preparedFieldDescriptions);
            }
            return metadata;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Retrieves the number, types and properties of this PreparedStatement object's parameters.
    override ParameterMetaData getParameterMetaData() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            // Built on first use and cached afterwards.
            if (paramMetadata is null) {
                paramMetadata = createMetadata(this.statement.preparedParamDescriptions);
            }
            return paramMetadata;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Executes the prepared statement; returns the affected-row count narrowed to int.
    override int executeUpdate() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong rowsAffected = 0;
            rowsAffected = conn.getConnection().exec(statement);
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Same as executeUpdate(), additionally storing the connection's last insert id in `insertId`.
    override int executeUpdate(out Variant insertId) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            ulong rowsAffected = 0;
            rowsAffected = conn.getConnection().exec(statement);
            insertId = conn.getConnection().lastInsertID;
            return cast(int)rowsAffected;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Executes the prepared query and returns its result set.
    override ddbc.core.ResultSet executeQuery() {
        checkClosed();
        lock();
        scope(exit) unlock();

        static if(__traits(compiles, (){ import std.experimental.logger; } )) {
            sharedLog.trace(statement.sql());
        }

        try {
            results = query(conn.getConnection(), statement);
            resultSet = new MySQLResultSet(this, results, getMetaData());
            return resultSet;
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds SQL NULL to every parameter.
    override void clearParameters() {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            for (int i = 1; i <= paramCount; i++)
                setNull(i);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    // Scalar setters: each binds `x` to the 1-based `parameterIndex`.
    override void setFloat(int parameterIndex, float x) {
        bindArg(parameterIndex, x);
    }
    override void setDouble(int parameterIndex, double x){
        bindArg(parameterIndex, x);
    }
    override void setBoolean(int parameterIndex, bool x) {
        bindArg(parameterIndex, x);
    }
    override void setLong(int parameterIndex, long x) {
        bindArg(parameterIndex, x);
    }
    override void setUlong(int parameterIndex, ulong x) {
        bindArg(parameterIndex, x);
    }
    override void setInt(int parameterIndex, int x) {
        bindArg(parameterIndex, x);
    }
    override void setUint(int parameterIndex, uint x) {
        bindArg(parameterIndex, x);
    }
    override void setShort(int parameterIndex, short x) {
        bindArg(parameterIndex, x);
    }
    override void setUshort(int parameterIndex, ushort x) {
        bindArg(parameterIndex, x);
    }
    override void setByte(int parameterIndex, byte x) {
        bindArg(parameterIndex, x);
    }
    override void setUbyte(int parameterIndex, ubyte x) {
        bindArg(parameterIndex, x);
    }

    // Array/string setters: a value whose `.ptr` is null is bound as SQL NULL.
    override void setBytes(int parameterIndex, byte[] x) {
        bindArrayArg(parameterIndex, x);
    }
    override void setUbytes(int parameterIndex, ubyte[] x) {
        bindArrayArg(parameterIndex, x);
    }
    override void setString(int parameterIndex, string x) {
        bindArrayArg(parameterIndex, x);
    }

    // Date/time setters.
    override void setSysTime(int parameterIndex, SysTime x) {
        bindArg(parameterIndex, x);
    }
    override void setDateTime(int parameterIndex, DateTime x) {
        bindArg(parameterIndex, x);
    }
    override void setDate(int parameterIndex, Date x) {
        bindArg(parameterIndex, x);
    }
    override void setTime(int parameterIndex, TimeOfDay x) {
        bindArg(parameterIndex, x);
    }

    /// Binds a Variant; a Variant that compares equal to `null` is bound as SQL NULL.
    override void setVariant(int parameterIndex, Variant x) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            // The comparison stays inside the try so any failure it raises is wrapped too.
            if (x == null) {
                setNull(parameterIndex);
            } else {
                this.statement.setArg(parameterIndex-1, x);
            }
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds SQL NULL to the 1-based `parameterIndex`.
    override void setNull(int parameterIndex) {
        checkClosed();
        lock();
        scope(exit) unlock();
        checkIndex(parameterIndex);
        try {
            this.statement.setNullArg(parameterIndex-1);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Binds SQL NULL; `sqlType` is accepted for interface compatibility and not used.
    override void setNull(int parameterIndex, int sqlType) {
        checkClosed();
        lock();
        scope(exit) unlock();
        try {
            setNull(parameterIndex);
        } catch (Throwable e) {
            throw new SQLException(e);
        }
    }

    /// Returns the SQL text of the prepared statement.
    override string toString() {
        return to!string(this.statement.sql());
    }
}
835 
836 class MySQLResultSet : ResultSetImpl {
837     private MySQLStatement stmt;
838     private ResultRange results;
839     ResultSetMetaData metadata;
840     private bool closed;
841     private int currentRowIndex;
842     private ulong rowCount;
843     private int[string] columnMap;
844     private bool lastIsNull;
845     private int columnCount;
846 
847     Variant getValue(int columnIndex) {
848 		checkClosed();
849         enforceHelper!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
850         enforceHelper!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
851         Row[] rs = results.array;
852         lastIsNull = rs[currentRowIndex].isNull(columnIndex - 1);
853 		Variant res;
854 		if (!lastIsNull)
855 		    res = rs[currentRowIndex][columnIndex - 1];
856         return res;
857     }
858 
859 	void checkClosed() {
860 		if (closed)
861 			throw new SQLException("Result set is already closed");
862 	}
863 
864 public:
865 
866     void lock() {
867         stmt.lock();
868     }
869 
870     void unlock() {
871         stmt.unlock();
872     }
873 
874     this(MySQLStatement stmt, ResultRange results, ResultSetMetaData metadata) {
875         this.stmt = stmt;
876         this.results = results;
877         this.metadata = metadata;
878         try {
879             closed = false;
880             //rowCount = cast(int)results.array.length;
881             currentRowIndex = -1;
882 			foreach(key, val; results.colNameIndicies) {
883 			    columnMap[key] = cast(int)val;
884 			}
885     		columnCount = cast(int)results.colNames.length;
886         } catch (Throwable e) {
887             throw new SQLException(e);
888         }
889     }
890 
891 	void onStatementClosed() {
892 		closed = true;
893 	}
894     string decodeTextBlob(ubyte[] data) {
895         char[] res = new char[data.length];
896         foreach (i, ch; data) {
897             res[i] = cast(char)ch;
898         }
899         return to!string(res);
900     }
901 
902     // ResultSet interface implementation
903 
904     //Retrieves the number, types and properties of this ResultSet object's columns
905     override ResultSetMetaData getMetaData() {
906         checkClosed();
907         lock();
908         scope(exit) unlock();
909         return metadata;
910     }
911 
912     override void close() {
913         checkClosed();
914         lock();
915         scope(exit) unlock();
916         stmt.closeResultSet();
917        	closed = true;
918     }
919     override bool first() {
920 		checkClosed();
921         lock();
922         scope(exit) unlock();
923         currentRowIndex = 0;
924         return currentRowIndex >= 0 && currentRowIndex < rowCount;
925     }
926     override bool isFirst() {
927 		checkClosed();
928         lock();
929         scope(exit) unlock();
930         return rowCount > 0 && currentRowIndex == 0;
931     }
932     override bool isLast() {
933 		checkClosed();
934         lock();
935         scope(exit) unlock();
936         return rowCount > 0 && currentRowIndex == rowCount - 1;
937     }
938     override bool next() {
939 		checkClosed();
940         lock();
941         scope(exit) unlock();
942         if (currentRowIndex + 1 >= rowCount)
943             return false;
944         currentRowIndex++;
945         return true;
946     }
947     
948     override int findColumn(string columnName) {
949 		checkClosed();
950         lock();
951         scope(exit) unlock();
952         int * p = (columnName in columnMap);
953         if (!p)
954             throw new SQLException("Column " ~ columnName ~ " not found");
955         return *p + 1;
956     }
957 
958     override bool getBoolean(int columnIndex) {
959         checkClosed();
960         lock();
961         scope(exit) unlock();
962         Variant v = getValue(columnIndex);
963         if (lastIsNull)
964             return false;
965         if (v.convertsTo!(bool))
966             return v.get!(bool);
967         if (v.convertsTo!(int))
968             return v.get!(int) != 0;
969         if (v.convertsTo!(long))
970             return v.get!(long) != 0;
971         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean");
972     }
973     override ubyte getUbyte(int columnIndex) {
974         checkClosed();
975         lock();
976         scope(exit) unlock();
977         Variant v = getValue(columnIndex);
978         if (lastIsNull)
979             return 0;
980         if (v.convertsTo!(ubyte))
981             return v.get!(ubyte);
982         if (v.convertsTo!(long))
983             return to!ubyte(v.get!(long));
984         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte");
985     }
986     override byte getByte(int columnIndex) {
987         checkClosed();
988         lock();
989         scope(exit) unlock();
990         Variant v = getValue(columnIndex);
991         if (lastIsNull)
992             return 0;
993         if (v.convertsTo!(byte))
994             return v.get!(byte);
995         if (v.convertsTo!(long))
996             return to!byte(v.get!(long));
997         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte");
998     }
999     override short getShort(int columnIndex) {
1000         checkClosed();
1001         lock();
1002         scope(exit) unlock();
1003         Variant v = getValue(columnIndex);
1004         if (lastIsNull)
1005             return 0;
1006         if (v.convertsTo!(short))
1007             return v.get!(short);
1008         if (v.convertsTo!(long))
1009             return to!short(v.get!(long));
1010         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short");
1011     }
1012     override ushort getUshort(int columnIndex) {
1013         checkClosed();
1014         lock();
1015         scope(exit) unlock();
1016         Variant v = getValue(columnIndex);
1017         if (lastIsNull)
1018             return 0;
1019         if (v.convertsTo!(ushort))
1020             return v.get!(ushort);
1021         if (v.convertsTo!(long))
1022             return to!ushort(v.get!(long));
1023         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort");
1024     }
1025     override int getInt(int columnIndex) {
1026         checkClosed();
1027         lock();
1028         scope(exit) unlock();
1029         Variant v = getValue(columnIndex);
1030         if (lastIsNull)
1031             return 0;
1032         if (v.convertsTo!(int))
1033             return v.get!(int);
1034         if (v.convertsTo!(long))
1035             return to!int(v.get!(long));
1036         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int");
1037     }
1038     override uint getUint(int columnIndex) {
1039         checkClosed();
1040         lock();
1041         scope(exit) unlock();
1042         Variant v = getValue(columnIndex);
1043         if (lastIsNull)
1044             return 0;
1045         if (v.convertsTo!(uint))
1046             return v.get!(uint);
1047         if (v.convertsTo!(ulong))
1048             return to!int(v.get!(ulong));
1049         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint");
1050     }
1051     override long getLong(int columnIndex) {
1052         checkClosed();
1053         lock();
1054         scope(exit) unlock();
1055         Variant v = getValue(columnIndex);
1056         if (lastIsNull)
1057             return 0;
1058         if (v.convertsTo!(long))
1059             return v.get!(long);
1060         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long");
1061     }
1062     override ulong getUlong(int columnIndex) {
1063         checkClosed();
1064         lock();
1065         scope(exit) unlock();
1066         Variant v = getValue(columnIndex);
1067         if (lastIsNull)
1068             return 0;
1069         if (v.convertsTo!(ulong))
1070             return v.get!(ulong);
1071         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong");
1072     }
1073     override double getDouble(int columnIndex) {
1074         checkClosed();
1075         lock();
1076         scope(exit) unlock();
1077         Variant v = getValue(columnIndex);
1078         if (lastIsNull)
1079             return 0;
1080         if (v.convertsTo!(double))
1081             return v.get!(double);
1082         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double");
1083     }
1084     override float getFloat(int columnIndex) {
1085         checkClosed();
1086         lock();
1087         scope(exit) unlock();
1088         Variant v = getValue(columnIndex);
1089         if (lastIsNull)
1090             return 0;
1091         if (v.convertsTo!(float))
1092             return v.get!(float);
1093         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float");
1094     }
1095     override byte[] getBytes(int columnIndex) {
1096         checkClosed();
1097         lock();
1098         scope(exit) unlock();
1099         Variant v = getValue(columnIndex);
1100         if (lastIsNull)
1101             return null;
1102         if (v.convertsTo!(byte[])) {
1103             return v.get!(byte[]);
1104         }
1105         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte[]");
1106     }
1107 	override ubyte[] getUbytes(int columnIndex) {
1108 		checkClosed();
1109 		lock();
1110 		scope(exit) unlock();
1111 		Variant v = getValue(columnIndex);
1112 		if (lastIsNull)
1113 			return null;
1114 		if (v.convertsTo!(ubyte[])) {
1115 			return v.get!(ubyte[]);
1116 		}
1117 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte[]");
1118 	}
1119 	override string getString(int columnIndex) {
1120         checkClosed();
1121         lock();
1122         scope(exit) unlock();
1123         Variant v = getValue(columnIndex);
1124         if (lastIsNull)
1125             return null;
1126 		if (v.convertsTo!(ubyte[])) {
1127 			// assume blob encoding is utf-8
1128 			// TODO: check field encoding
1129             return decodeTextBlob(v.get!(ubyte[]));
1130 		}
1131         return v.toString();
1132     }
1133 
1134     override SysTime getSysTime(int columnIndex) {
1135         checkClosed();
1136         lock();
1137         scope(exit) unlock();
1138         Variant v = getValue(columnIndex);
1139         if (lastIsNull)
1140             return SysTime();
1141         if (v.convertsTo!(SysTime)) {
1142             return v.get!SysTime();
1143         }
1144         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to SysTime");
1145     }
1146 
1147 	override DateTime getDateTime(int columnIndex) {
1148 		checkClosed();
1149 		lock();
1150 		scope(exit) unlock();
1151 		Variant v = getValue(columnIndex);
1152 		if (lastIsNull)
1153 			return DateTime();
1154 		if (v.convertsTo!(DateTime)) {
1155 			return v.get!DateTime();
1156 		}
1157 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime");
1158 	}
1159 	override Date getDate(int columnIndex) {
1160 		checkClosed();
1161 		lock();
1162 		scope(exit) unlock();
1163 		Variant v = getValue(columnIndex);
1164 		if (lastIsNull)
1165 			return Date();
1166 		if (v.convertsTo!(Date)) {
1167 			return v.get!Date();
1168 		}
1169 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date");
1170 	}
1171 	override TimeOfDay getTime(int columnIndex) {
1172 		checkClosed();
1173 		lock();
1174 		scope(exit) unlock();
1175 		Variant v = getValue(columnIndex);
1176 		if (lastIsNull)
1177 			return TimeOfDay();
1178 		if (v.convertsTo!(TimeOfDay)) {
1179 			return v.get!TimeOfDay();
1180 		}
1181 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay");
1182 	}
1183 
1184     override Variant getVariant(int columnIndex) {
1185         checkClosed();
1186         lock();
1187         scope(exit) unlock();
1188         Variant v = getValue(columnIndex);
1189         if (lastIsNull) {
1190             Variant vnull = null;
1191             return vnull;
1192         }
1193         return v;
1194     }
1195     override bool wasNull() {
1196         checkClosed();
1197         lock();
1198         scope(exit) unlock();
1199         return lastIsNull;
1200     }
1201     override bool isNull(int columnIndex) {
1202         checkClosed();
1203         lock();
1204         scope(exit) unlock();
1205         enforceHelper!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
1206         enforceHelper!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
1207         return results.array[currentRowIndex].isNull(columnIndex - 1);
1208     }
1209 
1210     //Retrieves the Statement object that produced this ResultSet object.
1211     override Statement getStatement() {
1212         checkClosed();
1213         lock();
1214         scope(exit) unlock();
1215         return stmt;
1216     }
1217 
1218     //Retrieves the current row number
1219     override int getRow() {
1220         checkClosed();
1221         lock();
1222         scope(exit) unlock();
1223         if (currentRowIndex <0 || currentRowIndex >= rowCount)
1224             return 0;
1225         return currentRowIndex + 1;
1226     }
1227 
1228     //Retrieves the fetch size for this ResultSet object.
1229     override ulong getFetchSize() {
1230         checkClosed();
1231         lock();
1232         scope(exit) unlock();
1233         return rowCount;
1234     }
1235 }
1236 
// Driver implementation that hands out MySQLConnection objects.
// sample URL:
// mysql://localhost:3306/DatabaseName
// NOTE(review): generateUrl() below emits "ddbc:mysql://host:port/dbname";
// confirm which of the two forms MySQLConnection expects.
class MySQLDriver : Driver {
    /// Helper: assembles a connection URL from host, port and database name.
    public static string generateUrl(string host, ushort port, string dbname) {
        string hostAndPort = host ~ ":" ~ to!string(port);
        return "ddbc:mysql://" ~ hostAndPort ~ "/" ~ dbname;
    }

    /// Helper: builds a parameter map holding the "user" and "password" entries.
    public static string[string] setUserAndPassword(string username, string password) {
        return ["user": username, "password": password];
    }

    /// Creates a new MySQLConnection for the given URL and parameters.
    override ddbc.core.Connection connect(string url, string[string] params) {
        return new MySQLConnection(url, params);
    }
}
1255 
// Integration test against a live MySQL server; compiled in only when
// MYSQL_TESTS_ENABLED is true. Covers plain and prepared updates, result set
// metadata, row iteration with NULL handling, and generated insert ids.
unittest {
    static if (MYSQL_TESTS_ENABLED) {

        DataSource ds = createUnitTestMySQLDataSource();

        auto conn = ds.getConnection();
        scope(exit) conn.close();
        auto stmt = conn.createStatement();
        scope(exit) stmt.close();

        assert(stmt.executeUpdate("DROP TABLE IF EXISTS ddbct1") == 0);
        assert(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS ddbct1 (id bigint not null primary key AUTO_INCREMENT, name varchar(250), comment mediumtext, ts datetime)") == 0);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=1, name='name1', comment='comment for line 1', ts='20130202123025'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=2, name='name2', comment='comment for line 2 - can be very long'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=3, name='name3', comment='this is line 3'") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=4, name='name4', comment=NULL") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=5, name=NULL, comment=''") == 1);
        assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=6, name='', comment=NULL") == 1);
        assert(stmt.executeUpdate("UPDATE ddbct1 SET name=concat(name, '_x') WHERE id IN (3, 4)") == 2);

        PreparedStatement ps = conn.prepareStatement("UPDATE ddbct1 SET name=? WHERE id=?");
        // Closed on exit like the other prepared statements in this test.
        scope(exit) ps.close();
        ps.setString(1, null);
        ps.setLong(2, 3);
        assert(ps.executeUpdate() == 1);

        auto rs = stmt.executeQuery("SELECT id, name name_alias, comment, ts FROM ddbct1 ORDER BY id");

        // testing result set meta data
        ResultSetMetaData meta = rs.getMetaData();
        assert(meta.getColumnCount() == 4);
        assert(meta.getColumnName(1) == "id");
        assert(meta.getColumnLabel(1) == "id");
        assert(meta.isNullable(1) == false);
        assert(meta.isNullable(2) == true);
        assert(meta.isNullable(3) == true);
        assert(meta.getColumnName(2) == "name");
        assert(meta.getColumnLabel(2) == "name_alias");
        assert(meta.getColumnName(3) == "comment");

        //auto rowCount = rs.getFetchSize();
        //assert(rowCount == 6, "Expected 6 rows but there were " ~ to!string(rowCount));

        // Rows come back ordered by id, so row number and id advance together.
        int index = 1;
        while (rs.next()) {
            assert(!rs.isNull(1));
            ubyte[] bytes = rs.getUbytes(3);
            int rowIndex = rs.getRow();
            assert(rowIndex == index);
            long id = rs.getLong(1);
            assert(id == index);
            //writeln("field2 = '" ~ rs.getString(2) ~ "'");
            //writeln("field3 = '" ~ rs.getString(3) ~ "'");
            //writeln("wasNull = " ~ to!string(rs.wasNull()));
            if (id == 1) {
                DateTime ts = rs.getDateTime(4);
                assert(ts == DateTime(2013,02,02,12,30,25));
            }
            if (id == 4) {
                assert(rs.getString(2) == "name4_x");
                assert(rs.isNull(3));
            }
            if (id == 5) {
                assert(rs.isNull(2));
                assert(!rs.isNull(3));
            }
            if (id == 6) {
                assert(!rs.isNull(2));
                assert(rs.isNull(3));
            }
            //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)) ~ "\t[" ~ to!string(bytes.length) ~ "]");
            index++;
        }

        PreparedStatement ps2 = conn.prepareStatement("SELECT id, name, comment FROM ddbct1 WHERE id >= ?");
        scope(exit) ps2.close();
        ps2.setLong(1, 3);
        rs = ps2.executeQuery();
        while (rs.next()) {
            //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)));
            index++;
        }

        // checking last insert ID for prepared statement
        PreparedStatement ps3 = conn.prepareStatement("INSERT INTO ddbct1 (name) values ('New String 1')");
        scope(exit) ps3.close();
        Variant newId;
        assert(ps3.executeUpdate(newId) == 1);
        //writeln("Generated insert id = " ~ newId.toString());
        assert(newId.get!ulong > 0);

        // checking last insert ID for normal statement
        Statement stmt4 = conn.createStatement();
        scope(exit) stmt4.close();
        Variant newId2;
        // Run the insert on stmt4 itself; it used to be created but never used.
        assert(stmt4.executeUpdate("INSERT INTO ddbct1 (name) values ('New String 2')", newId2) == 1);
        //writeln("Generated insert id = " ~ newId2.toString());
        assert(newId2.get!ulong > 0);

    }
}
1356 
// Module constructor: registers a factory delegate under the name "mysql" with
// DriverFactory; the delegate creates a new MySQLDriver each time it is called.
// NOTE(review): `__gshared` is a variable storage class - confirm whether
// `shared static this()` (run once per process) was intended here instead of a
// per-thread module constructor.
__gshared static this() {
    // register MySQLDriver
    import ddbc.common;
    DriverFactory.registerDriverFactory("mysql", delegate() { return new MySQLDriver(); });
}
1362 
1363 
1364 } else { // version(USE_MYSQL)
1365     version(unittest) {
1366         immutable bool MYSQL_TESTS_ENABLED = false;
1367     }
1368 }