1 /**
2  * DDBC - D DataBase Connector - abstraction layer for RDBMS access, with interface similar to JDBC. 
3  * 
4  * Source file ddbc/drivers/mysqlddbc.d.
5  *
6  * DDBC library attempts to provide implementation independent interface to different databases.
7  * 
8  * Set of supported RDBMSs can be extended by writing Drivers for particular DBs.
9  * Currently it only includes MySQL driver.
10  * 
11  * JDBC documentation can be found here:
12  * $(LINK http://docs.oracle.com/javase/1.5.0/docs/api/java/sql/package-summary.html)$(BR)
13  *
14  * This module contains implementation of MySQL Driver which uses patched version of 
15  * MYSQLN (native D implementation of MySQL connector, written by Steve Teale)
16  * 
17  * Current version of driver implements only unidirectional readonly resultset, which with fetching full result to memory on creation. 
18  *
19  * You can find usage examples in unittest{} sections.
20  *
21  * Copyright: Copyright 2013
22  * License:   $(LINK www.boost.org/LICENSE_1_0.txt, Boost License 1.0).
23  * Author:   Vadim Lopatin
24  */
25 module ddbc.drivers.mysqlddbc;
26 
27 import std.algorithm;
28 import std.conv : to;
29 import std.datetime;
30 import std.exception;
31 import std.stdio;
32 import std.string;
33 import std.variant;
34 import core.sync.mutex;
35 import ddbc.common;
36 import ddbc.core;
37 
38 version(USE_MYSQL) {
39 
40 import mysql.connection;
41 import mysql.commands;// : Command;
42 import mysql.prepared;
43 import mysql.protocol.constants;
44 import mysql.protocol.packets : FieldDescription, ParamDescription;
45 
46 version(unittest) {
47     /*
48         To allow unit tests using MySQL server,
49         run mysql client using admin privileges, e.g. for MySQL server on localhost:
50         > mysql -uroot
51 
52         Create test user and test DB:
53         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'%' IDENTIFIED BY 'testpassword';
54         mysql> GRANT ALL PRIVILEGES ON *.* TO testuser@'localhost' IDENTIFIED BY 'testpassword';
55         mysql> CREATE DATABASE testdb;
56      */
57     /// change to false to disable tests on real MySQL server
58     immutable bool MYSQL_TESTS_ENABLED = true;
59     /// change parameters if necessary
60     const string MYSQL_UNITTEST_HOST = "localhost";
61     const int    MYSQL_UNITTEST_PORT = 3306;
62     const string MYSQL_UNITTEST_USER = "testuser";
63     const string MYSQL_UNITTEST_PASSWORD = "testpassword";
64     const string MYSQL_UNITTEST_DB = "testdb";
65 
66     static if (MYSQL_TESTS_ENABLED) {
67         /// use this data source for tests
68         
69         DataSource createUnitTestMySQLDataSource() {
70             string url = makeDDBCUrl("mysql", MYSQL_UNITTEST_HOST, MYSQL_UNITTEST_PORT, MYSQL_UNITTEST_DB);
71             string[string] params;
72             setUserAndPassword(params, MYSQL_UNITTEST_USER, MYSQL_UNITTEST_PASSWORD);
73             return createConnectionPool(url, params);
74         }
75     }
76 }
77 
78 SqlType fromMySQLType(int t) {
79 	switch(t) {
80 	case SQLType.DECIMAL:
81 		case SQLType.TINY: return SqlType.TINYINT;
82 		case SQLType.SHORT: return SqlType.SMALLINT;
83 		case SQLType.INT: return SqlType.INTEGER;
84 		case SQLType.FLOAT: return SqlType.FLOAT;
85 		case SQLType.DOUBLE: return SqlType.DOUBLE;
86 		case SQLType.NULL: return SqlType.NULL;
87 		case SQLType.TIMESTAMP: return SqlType.DATETIME;
88 		case SQLType.LONGLONG: return SqlType.BIGINT;
89 		case SQLType.INT24: return SqlType.INTEGER;
90 		case SQLType.DATE: return SqlType.DATE;
91 		case SQLType.TIME: return SqlType.TIME;
92 		case SQLType.DATETIME: return SqlType.DATETIME;
93 		case SQLType.YEAR: return SqlType.SMALLINT;
94 		case SQLType.NEWDATE: return SqlType.DATE;
95 		case SQLType.VARCHAR: return SqlType.VARCHAR;
96 		case SQLType.BIT: return SqlType.BIT;
97 		case SQLType.NEWDECIMAL: return SqlType.DECIMAL;
98 		case SQLType.ENUM: return SqlType.OTHER;
99 		case SQLType.SET: return SqlType.OTHER;
100 		case SQLType.TINYBLOB: return SqlType.BLOB;
101 		case SQLType.MEDIUMBLOB: return SqlType.BLOB;
102 		case SQLType.LONGBLOB: return SqlType.BLOB;
103 		case SQLType.BLOB: return SqlType.BLOB;
104 		case SQLType.VARSTRING: return SqlType.VARCHAR;
105 		case SQLType.STRING: return SqlType.VARCHAR;
106 		case SQLType.GEOMETRY: return SqlType.OTHER;
107 		default: return SqlType.OTHER;
108 	}
109 }
110 
111 class MySQLConnection : ddbc.core.Connection {
112 private:
113     string url;
114     string[string] params;
115     string dbName;
116     string username;
117     string password;
118     string hostname;
119     int port = 3306;
120     mysql.connection.Connection conn;
121     bool closed;
122     bool autocommit;
123     Mutex mutex;
124 
125 
126 	MySQLStatement [] activeStatements;
127 
128 	void closeUnclosedStatements() {
129 		MySQLStatement [] list = activeStatements.dup;
130 		foreach(stmt; list) {
131 			stmt.close();
132 		}
133 	}
134 
135 	void checkClosed() {
136 		if (closed)
137 			throw new SQLException("Connection is already closed");
138 	}
139 
140 public:
141 
142     void lock() {
143         mutex.lock();
144     }
145 
146     void unlock() {
147         mutex.unlock();
148     }
149 
150     mysql.connection.Connection getConnection() { return conn; }
151 
152 
153 	void onStatementClosed(MySQLStatement stmt) {
154         myRemove(activeStatements, stmt);
155 	}
156 
157     this(string url, string[string] params) {
158         //writeln("MySQLConnection() creating connection");
159         mutex = new Mutex();
160         this.url = url;
161         this.params = params;
162         try {
163             //writeln("parsing url " ~ url);
164             extractParamsFromURL(url, this.params);
165             string dbName = "";
166     		ptrdiff_t firstSlashes = std..string.indexOf(url, "//");
167     		ptrdiff_t lastSlash = std..string.lastIndexOf(url, '/');
168     		ptrdiff_t hostNameStart = firstSlashes >= 0 ? firstSlashes + 2 : 0;
169     		ptrdiff_t hostNameEnd = lastSlash >=0 && lastSlash > firstSlashes + 1 ? lastSlash : url.length;
170             if (hostNameEnd < url.length - 1) {
171                 dbName = url[hostNameEnd + 1 .. $];
172             }
173             hostname = url[hostNameStart..hostNameEnd];
174             if (hostname.length == 0)
175                 hostname = "localhost";
176     		ptrdiff_t portDelimiter = std..string.indexOf(hostname, ":");
177             if (portDelimiter >= 0) {
178                 string portString = hostname[portDelimiter + 1 .. $];
179                 hostname = hostname[0 .. portDelimiter];
180                 if (portString.length > 0)
181                     port = to!int(portString);
182                 if (port < 1 || port > 65535)
183                     port = 3306;
184             }
185             if ("user" in this.params)
186                 username = this.params["user"];
187             if ("password" in this.params)
188                 password = this.params["password"];
189 
190             //writeln("host " ~ hostname ~ " : " ~ to!string(port) ~ " db=" ~ dbName ~ " user=" ~ username ~ " pass=" ~ password);
191 
192             conn = new mysql.connection.Connection(hostname, username, password, dbName, cast(ushort)port);
193             closed = false;
194             setAutoCommit(true);
195         } catch (Throwable e) {
196             //writeln(e.msg);
197             throw new SQLException(e);
198         }
199 
200         //writeln("MySQLConnection() connection created");
201     }
202     override void close() {
203 		checkClosed();
204 
205         lock();
206         scope(exit) unlock();
207         try {
208             closeUnclosedStatements();
209 
210             conn.close();
211             closed = true;
212         } catch (Throwable e) {
213             throw new SQLException(e);
214         }
215     }
216     override void commit() {
217         checkClosed();
218 
219         lock();
220         scope(exit) unlock();
221 
222         try {
223             Statement stmt = createStatement();
224             scope(exit) stmt.close();
225             stmt.executeUpdate("COMMIT");
226         } catch (Throwable e) {
227             throw new SQLException(e);
228         }
229     }
230     override Statement createStatement() {
231         checkClosed();
232 
233         lock();
234         scope(exit) unlock();
235 
236         try {
237             MySQLStatement stmt = new MySQLStatement(this);
238     		activeStatements ~= stmt;
239             return stmt;
240         } catch (Throwable e) {
241             throw new SQLException(e);
242         }
243     }
244 
245     PreparedStatement prepareStatement(string sql) {
246         checkClosed();
247 
248         lock();
249         scope(exit) unlock();
250 
251         try {
252             MySQLPreparedStatement stmt = new MySQLPreparedStatement(this, sql);
253             activeStatements ~= stmt;
254             return stmt;
255         } catch (Throwable e) {
256             throw new SQLException(e.msg ~ " while execution of query " ~ sql);
257         }
258     }
259 
260     override string getCatalog() {
261         return dbName;
262     }
263 
264     /// Sets the given catalog name in order to select a subspace of this Connection object's database in which to work.
265     override void setCatalog(string catalog) {
266         checkClosed();
267         if (dbName == catalog)
268             return;
269 
270         lock();
271         scope(exit) unlock();
272 
273         try {
274             conn.selectDB(catalog);
275             dbName = catalog;
276         } catch (Throwable e) {
277             throw new SQLException(e);
278         }
279     }
280 
281     override bool isClosed() {
282         return closed;
283     }
284 
285     override void rollback() {
286         checkClosed();
287 
288         lock();
289         scope(exit) unlock();
290 
291         try {
292             Statement stmt = createStatement();
293             scope(exit) stmt.close();
294             stmt.executeUpdate("ROLLBACK");
295         } catch (Throwable e) {
296             throw new SQLException(e);
297         }
298     }
299     override bool getAutoCommit() {
300         return autocommit;
301     }
302     override void setAutoCommit(bool autoCommit) {
303         checkClosed();
304         if (this.autocommit == autoCommit)
305             return;
306         lock();
307         scope(exit) unlock();
308 
309         try {
310             Statement stmt = createStatement();
311             scope(exit) stmt.close();
312             stmt.executeUpdate("SET autocommit=" ~ (autoCommit ? "1" : "0"));
313             this.autocommit = autoCommit;
314         } catch (Throwable e) {
315             throw new SQLException(e);
316         }
317     }
318 }
319 
320 class MySQLStatement : Statement {
321 private:
322     MySQLConnection conn;
323     Command * cmd;
324     mysql.connection.ResultSet rs;
325 	MySQLResultSet resultSet;
326 
327     bool closed;
328 
329 public:
330     void checkClosed() {
331         enforceEx!SQLException(!closed, "Statement is already closed");
332     }
333 
334     void lock() {
335         conn.lock();
336     }
337     
338     void unlock() {
339         conn.unlock();
340     }
341 
342     this(MySQLConnection conn) {
343         this.conn = conn;
344     }
345 
346     ResultSetMetaData createMetadata(FieldDescription[] fields) {
347         ColumnMetadataItem[] res = new ColumnMetadataItem[fields.length];
348         foreach(i, field; fields) {
349             ColumnMetadataItem item = new ColumnMetadataItem();
350             item.schemaName = field.db;
351             item.name = field.originalName;
352             item.label = field.name;
353             item.precision = field.length;
354             item.scale = field.scale;
355             item.isNullable = !field.notNull;
356             item.isSigned = !field.unsigned;
357 			item.type = fromMySQLType(field.type);
358             // TODO: fill more params
359             res[i] = item;
360         }
361         return new ResultSetMetaDataImpl(res);
362     }
363     ParameterMetaData createMetadata(ParamDescription[] fields) {
364         ParameterMetaDataItem[] res = new ParameterMetaDataItem[fields.length];
365         foreach(i, field; fields) {
366             ParameterMetaDataItem item = new ParameterMetaDataItem();
367             item.precision = field.length;
368             item.scale = field.scale;
369             item.isNullable = !field.notNull;
370             item.isSigned = !field.unsigned;
371 			item.type = fromMySQLType(field.type);
372 			// TODO: fill more params
373             res[i] = item;
374         }
375         return new ParameterMetaDataImpl(res);
376     }
377 public:
378     MySQLConnection getConnection() {
379         checkClosed();
380         return conn;
381     }
382     override ddbc.core.ResultSet executeQuery(string query) {
383         checkClosed();
384         lock();
385         scope(exit) unlock();
386 		try {
387 	        rs = querySet(conn.getConnection(), query);
388     	    resultSet = new MySQLResultSet(this, rs, createMetadata(conn.getConnection().resultFieldDescriptions));
389         	return resultSet;
390 		} catch (Throwable e) {
391             throw new SQLException(e.msg ~ " - while execution of query " ~ query);
392         }
393 	}
394     override int executeUpdate(string query) {
395         checkClosed();
396         lock();
397         scope(exit) unlock();
398 		ulong rowsAffected = 0;
399 		try {
400 			rowsAffected = exec(conn.getConnection(), query);
401 	        return cast(int)rowsAffected;
402 		} catch (Throwable e) {
403 			throw new SQLException(e.msg ~ " - while execution of query " ~ query);
404 		}
405     }
406 	override int executeUpdate(string query, out Variant insertId) {
407 		checkClosed();
408 		lock();
409 		scope(exit) unlock();
410         try {
411     		ulong rowsAffected = exec(conn.getConnection(), query);
412     		insertId = Variant(conn.getConnection().lastInsertID);
413     		return cast(int)rowsAffected;
414         } catch (Throwable e) {
415             throw new SQLException(e.msg ~ " - while execution of query " ~ query);
416         }
417 	}
418 	override void close() {
419         checkClosed();
420         lock();
421         scope(exit) unlock();
422         try {
423             closeResultSet();
424             closed = true;
425             conn.onStatementClosed(this);
426         } catch (Throwable e) {
427             throw new SQLException(e);
428         }
429     }
430     void closeResultSet() {
431         if (cmd == null) {
432             return;
433         }
434         Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
435         p.release();
436 
437         cmd.destroy();
438         cmd = null;
439 		if (resultSet !is null) {
440 			resultSet.onStatementClosed();
441 			resultSet = null;
442 		}
443     }
444 }
445 
446 class MySQLPreparedStatement : MySQLStatement, PreparedStatement {
447     string query;
448     int paramCount;
449     ResultSetMetaData metadata;
450     ParameterMetaData paramMetadata;
451     this(MySQLConnection conn, string query) {
452         super(conn);
453         this.query = query;
454         try {
455             auto p = prepare(conn.getConnection(), query);
456             paramCount = p.numArgs;
457         } catch (Throwable e) {
458             throw new SQLException(e);
459         }
460     }
461     void checkIndex(int index) {
462         if (index < 1 || index > paramCount)
463             throw new SQLException("Parameter index " ~ to!string(index) ~ " is out of range");
464     }
465     Variant getParam(int index) {
466         checkIndex(index);
467         Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
468         return p.getArg( cast(ushort)(index - 1) );
469     }
470 public:
471 
472     /// Retrieves a ResultSetMetaData object that contains information about the columns of the ResultSet object that will be returned when this PreparedStatement object is executed.
473     override ResultSetMetaData getMetaData() {
474         checkClosed();
475         lock();
476         scope(exit) unlock();
477         try {
478             if (metadata is null) {
479                 Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
480                 metadata = createMetadata(p.preparedFieldDescriptions);
481             }
482             return metadata;
483         } catch (Throwable e) {
484             throw new SQLException(e);
485         }
486     }
487 
488     /// Retrieves the number, types and properties of this PreparedStatement object's parameters.
489     override ParameterMetaData getParameterMetaData() {
490         checkClosed();
491         lock();
492         scope(exit) unlock();
493         try {
494             if (paramMetadata is null) {
495                 Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
496                 paramMetadata = createMetadata(p.preparedParamDescriptions);
497             }
498             return paramMetadata;
499         } catch (Throwable e) {
500             throw new SQLException(e);
501         }
502     }
503 
504     override int executeUpdate() {
505         checkClosed();
506         lock();
507         scope(exit) unlock();
508         try {
509             ulong rowsAffected = 0;
510             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
511             rowsAffected = p.exec();
512             return cast(int)rowsAffected;
513         } catch (Throwable e) {
514             throw new SQLException(e);
515         }
516     }
517 
518 	override int executeUpdate(out Variant insertId) {
519 		checkClosed();
520 		lock();
521 		scope(exit) unlock();
522         try {
523     		ulong rowsAffected = 0;
524     		Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
525     		rowsAffected = p.exec();
526     		insertId = conn.getConnection().lastInsertID;
527     		return cast(int)rowsAffected;
528         } catch (Throwable e) {
529             throw new SQLException(e);
530         }
531 	}
532 
533     override ddbc.core.ResultSet executeQuery() {
534         checkClosed();
535         lock();
536         scope(exit) unlock();
537         try {
538             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
539             rs = p.querySet();
540             resultSet = new MySQLResultSet(this, rs, getMetaData());
541             return resultSet;
542         } catch (Throwable e) {
543             throw new SQLException(e);
544         }
545     }
546     
547     override void clearParameters() {
548         checkClosed();
549         lock();
550         scope(exit) unlock();
551         try {
552             for (int i = 1; i <= paramCount; i++)
553                 setNull(i);
554         } catch (Throwable e) {
555             throw new SQLException(e);
556         }
557     }
558     
559 	override void setFloat(int parameterIndex, float x) {
560 		checkClosed();
561 		lock();
562 		scope(exit) unlock();
563         checkIndex(parameterIndex);
564         try {
565     		Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
566     		p.setArg(parameterIndex-1, x);
567         } catch (Throwable e) {
568             throw new SQLException(e);
569         }
570 	}
571 	override void setDouble(int parameterIndex, double x){
572 		checkClosed();
573 		lock();
574 		scope(exit) unlock();
575         checkIndex(parameterIndex);
576         try {
577     		Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
578     		p.setArg(parameterIndex-1, x);
579         } catch (Throwable e) {
580             throw new SQLException(e);
581         }
582 	}
583 	override void setBoolean(int parameterIndex, bool x) {
584         checkClosed();
585         lock();
586         scope(exit) unlock();
587         checkIndex(parameterIndex);
588         try {
589             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
590             p.setArg(parameterIndex-1, x);
591         } catch (Throwable e) {
592             throw new SQLException(e);
593         }
594     }
595     override void setLong(int parameterIndex, long x) {
596         checkClosed();
597         lock();
598         scope(exit) unlock();
599         checkIndex(parameterIndex);
600         try {
601             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
602             p.setArg(parameterIndex-1, x);
603         } catch (Throwable e) {
604             throw new SQLException(e);
605         }
606     }
607     override void setUlong(int parameterIndex, ulong x) {
608         checkClosed();
609         lock();
610         scope(exit) unlock();
611         checkIndex(parameterIndex);
612         try {
613             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
614             p.setArg(parameterIndex-1, x);
615         } catch (Throwable e) {
616             throw new SQLException(e);
617         }
618     }
619     override void setInt(int parameterIndex, int x) {
620         checkClosed();
621         lock();
622         scope(exit) unlock();
623         checkIndex(parameterIndex);
624         try {
625             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
626             p.setArg(parameterIndex-1, x);
627         } catch (Throwable e) {
628             throw new SQLException(e);
629         }
630     }
631     override void setUint(int parameterIndex, uint x) {
632         checkClosed();
633         lock();
634         scope(exit) unlock();
635         checkIndex(parameterIndex);
636         try {
637             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
638             p.setArg(parameterIndex-1, x);
639         } catch (Throwable e) {
640             throw new SQLException(e);
641         }
642     }
643     override void setShort(int parameterIndex, short x) {
644         checkClosed();
645         lock();
646         scope(exit) unlock();
647         checkIndex(parameterIndex);
648         try {
649             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
650             p.setArg(parameterIndex-1, x);
651         } catch (Throwable e) {
652             throw new SQLException(e);
653         }
654     }
655     override void setUshort(int parameterIndex, ushort x) {
656         checkClosed();
657         lock();
658         scope(exit) unlock();
659         checkIndex(parameterIndex);
660         try {
661             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
662             p.setArg(parameterIndex-1, x);
663         } catch (Throwable e) {
664             throw new SQLException(e);
665         }
666     }
667     override void setByte(int parameterIndex, byte x) {
668         checkClosed();
669         lock();
670         scope(exit) unlock();
671         checkIndex(parameterIndex);
672         try {
673             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
674             p.setArg(parameterIndex-1, x);
675         } catch (Throwable e) {
676             throw new SQLException(e);
677         }
678     }
679     override void setUbyte(int parameterIndex, ubyte x) {
680         checkClosed();
681         lock();
682         scope(exit) unlock();
683         checkIndex(parameterIndex);
684         try {
685             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
686             p.setArg(parameterIndex-1, x);
687         } catch (Throwable e) {
688             throw new SQLException(e);
689         }
690     }
691     override void setBytes(int parameterIndex, byte[] x) {
692         checkClosed();
693         lock();
694         scope(exit) unlock();
695         checkIndex(parameterIndex);
696         try {
697             if (x.ptr is null) {
698                 setNull(parameterIndex);
699             } else {
700                 Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
701                 p.setArg(parameterIndex-1, x);
702             }
703         } catch (Throwable e) {
704             throw new SQLException(e);
705         }
706     }
707     override void setUbytes(int parameterIndex, ubyte[] x) {
708         checkClosed();
709         lock();
710         scope(exit) unlock();
711         checkIndex(parameterIndex);
712         try {
713             if (x.ptr is null) {
714                 setNull(parameterIndex);
715             } else {
716                 Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
717                 p.setArg(parameterIndex-1, x);
718             }
719         } catch (Throwable e) {
720             throw new SQLException(e);
721         }
722     }
723     override void setString(int parameterIndex, string x) {
724         checkClosed();
725         lock();
726         scope(exit) unlock();
727         checkIndex(parameterIndex);
728         try {
729             if (x.ptr is null) {
730                 setNull(parameterIndex);
731             } else {
732                 Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
733                 p.setArg(parameterIndex-1, x);
734             }
735         } catch (Throwable e) {
736             throw new SQLException(e);
737         }
738     }
739 	override void setDateTime(int parameterIndex, DateTime x) {
740 		checkClosed();
741 		lock();
742 		scope(exit) unlock();
743 		checkIndex(parameterIndex);
744         try {
745 		    Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
746 		    p.setArg(parameterIndex-1, x);
747         } catch (Throwable e) {
748             throw new SQLException(e);
749         }
750 	}
751 	override void setDate(int parameterIndex, Date x) {
752 		checkClosed();
753 		lock();
754 		scope(exit) unlock();
755 		checkIndex(parameterIndex);
756         try {
757     		Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
758     		p.setArg(parameterIndex-1, x);
759         } catch (Throwable e) {
760             throw new SQLException(e);
761         }
762 	}
763 	override void setTime(int parameterIndex, TimeOfDay x) {
764 		checkClosed();
765 		lock();
766 		scope(exit) unlock();
767 		checkIndex(parameterIndex);
768         try {
769 		    Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
770 		    p.setArg(parameterIndex-1, x);
771         } catch (Throwable e) {
772             throw new SQLException(e);
773         }
774 	}
775 	override void setVariant(int parameterIndex, Variant x) {
776         checkClosed();
777         lock();
778         scope(exit) unlock();
779         checkIndex(parameterIndex);
780         try {
781             if (x == null) {
782                 setNull(parameterIndex);
783             } else {
784                 Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
785                 p.setArg(parameterIndex-1, x);
786             }
787         } catch (Throwable e) {
788             throw new SQLException(e);
789         }
790     }
791     override void setNull(int parameterIndex) {
792         checkClosed();
793         lock();
794         scope(exit) unlock();
795         checkIndex(parameterIndex);
796         try {
797             Prepared p = prepare(conn.getConnection(), to!string(cmd.sql));
798             p.setNullArg(parameterIndex-1);
799         } catch (Throwable e) {
800             throw new SQLException(e);
801         }
802     }
803     override void setNull(int parameterIndex, int sqlType) {
804         checkClosed();
805         lock();
806         scope(exit) unlock();
807         try {
808             setNull(parameterIndex);
809         } catch (Throwable e) {
810             throw new SQLException(e);
811         }
812     }
813 }
814 
815 class MySQLResultSet : ResultSetImpl {
816     private MySQLStatement stmt;
817     private mysql.connection.ResultSet rs;
818     ResultSetMetaData metadata;
819     private bool closed;
820     private int currentRowIndex;
821     private int rowCount;
822     private int[string] columnMap;
823     private bool lastIsNull;
824     private int columnCount;
825 
826     Variant getValue(int columnIndex) {
827 		checkClosed();
828         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
829         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
830         lastIsNull = rs[currentRowIndex].isNull(columnIndex - 1);
831 		Variant res;
832 		if (!lastIsNull)
833 		    res = rs[currentRowIndex][columnIndex - 1];
834         return res;
835     }
836 
837 	void checkClosed() {
838 		if (closed)
839 			throw new SQLException("Result set is already closed");
840 	}
841 
842 public:
843 
844     void lock() {
845         stmt.lock();
846     }
847 
848     void unlock() {
849         stmt.unlock();
850     }
851 
852     this(MySQLStatement stmt, mysql.connection.ResultSet resultSet, ResultSetMetaData metadata) {
853         this.stmt = stmt;
854         this.rs = resultSet;
855         this.metadata = metadata;
856         try {
857             closed = false;
858             rowCount = cast(int)rs.length;
859             currentRowIndex = -1;
860 			foreach(key, val; rs.colNameIndicies)
861 				columnMap[key] = cast(int)val;
862     		columnCount = cast(int)rs.colNames.length;
863         } catch (Throwable e) {
864             throw new SQLException(e);
865         }
866     }
867 
868 	void onStatementClosed() {
869 		closed = true;
870 	}
871     string decodeTextBlob(ubyte[] data) {
872         char[] res = new char[data.length];
873         foreach (i, ch; data) {
874             res[i] = cast(char)ch;
875         }
876         return to!string(res);
877     }
878 
879     // ResultSet interface implementation
880 
881     //Retrieves the number, types and properties of this ResultSet object's columns
882     override ResultSetMetaData getMetaData() {
883         checkClosed();
884         lock();
885         scope(exit) unlock();
886         return metadata;
887     }
888 
889     override void close() {
890         checkClosed();
891         lock();
892         scope(exit) unlock();
893         stmt.closeResultSet();
894        	closed = true;
895     }
896     override bool first() {
897 		checkClosed();
898         lock();
899         scope(exit) unlock();
900         currentRowIndex = 0;
901         return currentRowIndex >= 0 && currentRowIndex < rowCount;
902     }
903     override bool isFirst() {
904 		checkClosed();
905         lock();
906         scope(exit) unlock();
907         return rowCount > 0 && currentRowIndex == 0;
908     }
909     override bool isLast() {
910 		checkClosed();
911         lock();
912         scope(exit) unlock();
913         return rowCount > 0 && currentRowIndex == rowCount - 1;
914     }
915     override bool next() {
916 		checkClosed();
917         lock();
918         scope(exit) unlock();
919         if (currentRowIndex + 1 >= rowCount)
920             return false;
921         currentRowIndex++;
922         return true;
923     }
924     
925     override int findColumn(string columnName) {
926 		checkClosed();
927         lock();
928         scope(exit) unlock();
929         int * p = (columnName in columnMap);
930         if (!p)
931             throw new SQLException("Column " ~ columnName ~ " not found");
932         return *p + 1;
933     }
934 
935     override bool getBoolean(int columnIndex) {
936         checkClosed();
937         lock();
938         scope(exit) unlock();
939         Variant v = getValue(columnIndex);
940         if (lastIsNull)
941             return false;
942         if (v.convertsTo!(bool))
943             return v.get!(bool);
944         if (v.convertsTo!(int))
945             return v.get!(int) != 0;
946         if (v.convertsTo!(long))
947             return v.get!(long) != 0;
948         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to boolean");
949     }
950     override ubyte getUbyte(int columnIndex) {
951         checkClosed();
952         lock();
953         scope(exit) unlock();
954         Variant v = getValue(columnIndex);
955         if (lastIsNull)
956             return 0;
957         if (v.convertsTo!(ubyte))
958             return v.get!(ubyte);
959         if (v.convertsTo!(long))
960             return to!ubyte(v.get!(long));
961         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte");
962     }
963     override byte getByte(int columnIndex) {
964         checkClosed();
965         lock();
966         scope(exit) unlock();
967         Variant v = getValue(columnIndex);
968         if (lastIsNull)
969             return 0;
970         if (v.convertsTo!(byte))
971             return v.get!(byte);
972         if (v.convertsTo!(long))
973             return to!byte(v.get!(long));
974         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte");
975     }
976     override short getShort(int columnIndex) {
977         checkClosed();
978         lock();
979         scope(exit) unlock();
980         Variant v = getValue(columnIndex);
981         if (lastIsNull)
982             return 0;
983         if (v.convertsTo!(short))
984             return v.get!(short);
985         if (v.convertsTo!(long))
986             return to!short(v.get!(long));
987         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to short");
988     }
989     override ushort getUshort(int columnIndex) {
990         checkClosed();
991         lock();
992         scope(exit) unlock();
993         Variant v = getValue(columnIndex);
994         if (lastIsNull)
995             return 0;
996         if (v.convertsTo!(ushort))
997             return v.get!(ushort);
998         if (v.convertsTo!(long))
999             return to!ushort(v.get!(long));
1000         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ushort");
1001     }
1002     override int getInt(int columnIndex) {
1003         checkClosed();
1004         lock();
1005         scope(exit) unlock();
1006         Variant v = getValue(columnIndex);
1007         if (lastIsNull)
1008             return 0;
1009         if (v.convertsTo!(int))
1010             return v.get!(int);
1011         if (v.convertsTo!(long))
1012             return to!int(v.get!(long));
1013         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to int");
1014     }
1015     override uint getUint(int columnIndex) {
1016         checkClosed();
1017         lock();
1018         scope(exit) unlock();
1019         Variant v = getValue(columnIndex);
1020         if (lastIsNull)
1021             return 0;
1022         if (v.convertsTo!(uint))
1023             return v.get!(uint);
1024         if (v.convertsTo!(ulong))
1025             return to!int(v.get!(ulong));
1026         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to uint");
1027     }
1028     override long getLong(int columnIndex) {
1029         checkClosed();
1030         lock();
1031         scope(exit) unlock();
1032         Variant v = getValue(columnIndex);
1033         if (lastIsNull)
1034             return 0;
1035         if (v.convertsTo!(long))
1036             return v.get!(long);
1037         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to long");
1038     }
1039     override ulong getUlong(int columnIndex) {
1040         checkClosed();
1041         lock();
1042         scope(exit) unlock();
1043         Variant v = getValue(columnIndex);
1044         if (lastIsNull)
1045             return 0;
1046         if (v.convertsTo!(ulong))
1047             return v.get!(ulong);
1048         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ulong");
1049     }
1050     override double getDouble(int columnIndex) {
1051         checkClosed();
1052         lock();
1053         scope(exit) unlock();
1054         Variant v = getValue(columnIndex);
1055         if (lastIsNull)
1056             return 0;
1057         if (v.convertsTo!(double))
1058             return v.get!(double);
1059         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to double");
1060     }
1061     override float getFloat(int columnIndex) {
1062         checkClosed();
1063         lock();
1064         scope(exit) unlock();
1065         Variant v = getValue(columnIndex);
1066         if (lastIsNull)
1067             return 0;
1068         if (v.convertsTo!(float))
1069             return v.get!(float);
1070         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to float");
1071     }
1072     override byte[] getBytes(int columnIndex) {
1073         checkClosed();
1074         lock();
1075         scope(exit) unlock();
1076         Variant v = getValue(columnIndex);
1077         if (lastIsNull)
1078             return null;
1079         if (v.convertsTo!(byte[])) {
1080             return v.get!(byte[]);
1081         }
1082         throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to byte[]");
1083     }
1084 	override ubyte[] getUbytes(int columnIndex) {
1085 		checkClosed();
1086 		lock();
1087 		scope(exit) unlock();
1088 		Variant v = getValue(columnIndex);
1089 		if (lastIsNull)
1090 			return null;
1091 		if (v.convertsTo!(ubyte[])) {
1092 			return v.get!(ubyte[]);
1093 		}
1094 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to ubyte[]");
1095 	}
1096 	override string getString(int columnIndex) {
1097         checkClosed();
1098         lock();
1099         scope(exit) unlock();
1100         Variant v = getValue(columnIndex);
1101         if (lastIsNull)
1102             return null;
1103 		if (v.convertsTo!(ubyte[])) {
1104 			// assume blob encoding is utf-8
1105 			// TODO: check field encoding
1106             return decodeTextBlob(v.get!(ubyte[]));
1107 		}
1108         return v.toString();
1109     }
1110 	override std.datetime.DateTime getDateTime(int columnIndex) {
1111 		checkClosed();
1112 		lock();
1113 		scope(exit) unlock();
1114 		Variant v = getValue(columnIndex);
1115 		if (lastIsNull)
1116 			return DateTime();
1117 		if (v.convertsTo!(DateTime)) {
1118 			return v.get!DateTime();
1119 		}
1120 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to DateTime");
1121 	}
1122 	override std.datetime.Date getDate(int columnIndex) {
1123 		checkClosed();
1124 		lock();
1125 		scope(exit) unlock();
1126 		Variant v = getValue(columnIndex);
1127 		if (lastIsNull)
1128 			return Date();
1129 		if (v.convertsTo!(Date)) {
1130 			return v.get!Date();
1131 		}
1132 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to Date");
1133 	}
1134 	override std.datetime.TimeOfDay getTime(int columnIndex) {
1135 		checkClosed();
1136 		lock();
1137 		scope(exit) unlock();
1138 		Variant v = getValue(columnIndex);
1139 		if (lastIsNull)
1140 			return TimeOfDay();
1141 		if (v.convertsTo!(TimeOfDay)) {
1142 			return v.get!TimeOfDay();
1143 		}
1144 		throw new SQLException("Cannot convert field " ~ to!string(columnIndex) ~ " to TimeOfDay");
1145 	}
1146 
1147     override Variant getVariant(int columnIndex) {
1148         checkClosed();
1149         lock();
1150         scope(exit) unlock();
1151         Variant v = getValue(columnIndex);
1152         if (lastIsNull) {
1153             Variant vnull = null;
1154             return vnull;
1155         }
1156         return v;
1157     }
1158     override bool wasNull() {
1159         checkClosed();
1160         lock();
1161         scope(exit) unlock();
1162         return lastIsNull;
1163     }
1164     override bool isNull(int columnIndex) {
1165         checkClosed();
1166         lock();
1167         scope(exit) unlock();
1168         enforceEx!SQLException(columnIndex >= 1 && columnIndex <= columnCount, "Column index out of bounds: " ~ to!string(columnIndex));
1169         enforceEx!SQLException(currentRowIndex >= 0 && currentRowIndex < rowCount, "No current row in result set");
1170         return rs[currentRowIndex].isNull(columnIndex - 1);
1171     }
1172 
1173     //Retrieves the Statement object that produced this ResultSet object.
1174     override Statement getStatement() {
1175         checkClosed();
1176         lock();
1177         scope(exit) unlock();
1178         return stmt;
1179     }
1180 
1181     //Retrieves the current row number
1182     override int getRow() {
1183         checkClosed();
1184         lock();
1185         scope(exit) unlock();
1186         if (currentRowIndex <0 || currentRowIndex >= rowCount)
1187             return 0;
1188         return currentRowIndex + 1;
1189     }
1190 
1191     //Retrieves the fetch size for this ResultSet object.
1192     override int getFetchSize() {
1193         checkClosed();
1194         lock();
1195         scope(exit) unlock();
1196         return rowCount;
1197     }
1198 }
1199 
1200 // sample URL:
1201 // mysql://localhost:3306/DatabaseName
1202 class MySQLDriver : Driver {
1203     // helper function
1204     public static string generateUrl(string host, ushort port, string dbname) {
1205         return "mysql://" ~ host ~ ":" ~ to!string(port) ~ "/" ~ dbname;
1206     }
1207 	public static string[string] setUserAndPassword(string username, string password) {
1208 		string[string] params;
1209         params["user"] = username;
1210         params["password"] = password;
1211 		return params;
1212     }
1213     override ddbc.core.Connection connect(string url, string[string] params) {
1214         //writeln("MySQLDriver.connect " ~ url);
1215         return new MySQLConnection(url, params);
1216     }
1217 }
1218 
1219 unittest {
1220     static if (MYSQL_TESTS_ENABLED) {
1221 
1222         DataSource ds = createUnitTestMySQLDataSource();
1223 
1224         auto conn = ds.getConnection();
1225         scope(exit) conn.close();
1226         auto stmt = conn.createStatement();
1227         scope(exit) stmt.close();
1228 
1229         assert(stmt.executeUpdate("DROP TABLE IF EXISTS ddbct1") == 0);
1230         assert(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS ddbct1 (id bigint not null primary key AUTO_INCREMENT, name varchar(250), comment mediumtext, ts datetime)") == 0);
1231         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=1, name='name1', comment='comment for line 1', ts='20130202123025'") == 1);
1232         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=2, name='name2', comment='comment for line 2 - can be very long'") == 1);
1233         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=3, name='name3', comment='this is line 3'") == 1);
1234         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=4, name='name4', comment=NULL") == 1);
1235         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=5, name=NULL, comment=''") == 1);
1236         assert(stmt.executeUpdate("INSERT INTO ddbct1 SET id=6, name='', comment=NULL") == 1);
1237         assert(stmt.executeUpdate("UPDATE ddbct1 SET name=concat(name, '_x') WHERE id IN (3, 4)") == 2);
1238         
1239         PreparedStatement ps = conn.prepareStatement("UPDATE ddbct1 SET name=? WHERE id=?");
1240         ps.setString(1, null);
1241         ps.setLong(2, 3);
1242         assert(ps.executeUpdate() == 1);
1243         
1244         auto rs = stmt.executeQuery("SELECT id, name name_alias, comment, ts FROM ddbct1 ORDER BY id");
1245 
1246         // testing result set meta data
1247         ResultSetMetaData meta = rs.getMetaData();
1248         assert(meta.getColumnCount() == 4);
1249         assert(meta.getColumnName(1) == "id");
1250         assert(meta.getColumnLabel(1) == "id");
1251         assert(meta.isNullable(1) == false);
1252         assert(meta.isNullable(2) == true);
1253         assert(meta.isNullable(3) == true);
1254         assert(meta.getColumnName(2) == "name");
1255         assert(meta.getColumnLabel(2) == "name_alias");
1256         assert(meta.getColumnName(3) == "comment");
1257 
1258         int rowCount = rs.getFetchSize();
1259         assert(rowCount == 6);
1260         int index = 1;
1261         while (rs.next()) {
1262             assert(!rs.isNull(1));
1263             ubyte[] bytes = rs.getUbytes(3);
1264             int rowIndex = rs.getRow();
1265             assert(rowIndex == index);
1266             long id = rs.getLong(1);
1267             assert(id == index);
1268             //writeln("field2 = '" ~ rs.getString(2) ~ "'");
1269             //writeln("field3 = '" ~ rs.getString(3) ~ "'");
1270             //writeln("wasNull = " ~ to!string(rs.wasNull()));
1271 			if (id == 1) {
1272 				DateTime ts = rs.getDateTime(4);
1273 				assert(ts == DateTime(2013,02,02,12,30,25));
1274 			}
1275 			if (id == 4) {
1276                 assert(rs.getString(2) == "name4_x");
1277                 assert(rs.isNull(3));
1278             }
1279             if (id == 5) {
1280                 assert(rs.isNull(2));
1281                 assert(!rs.isNull(3));
1282             }
1283             if (id == 6) {
1284                 assert(!rs.isNull(2));
1285                 assert(rs.isNull(3));
1286             }
1287             //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)) ~ "\t[" ~ to!string(bytes.length) ~ "]");
1288             index++;
1289         }
1290         
1291         PreparedStatement ps2 = conn.prepareStatement("SELECT id, name, comment FROM ddbct1 WHERE id >= ?");
1292 		scope(exit) ps2.close();
1293         ps2.setLong(1, 3);
1294         rs = ps2.executeQuery();
1295         while (rs.next()) {
1296             //writeln(to!string(rs.getLong(1)) ~ "\t" ~ rs.getString(2) ~ "\t" ~ strNull(rs.getString(3)));
1297             index++;
1298         }
1299 
1300 		// checking last insert ID for prepared statement
1301 		PreparedStatement ps3 = conn.prepareStatement("INSERT INTO ddbct1 (name) values ('New String 1')");
1302 		scope(exit) ps3.close();
1303 		Variant newId;
1304 		assert(ps3.executeUpdate(newId) == 1);
1305 		//writeln("Generated insert id = " ~ newId.toString());
1306 		assert(newId.get!ulong > 0);
1307 
1308 		// checking last insert ID for normal statement
1309 		Statement stmt4 = conn.createStatement();
1310 		scope(exit) stmt4.close();
1311 		Variant newId2;
1312 		assert(stmt.executeUpdate("INSERT INTO ddbct1 (name) values ('New String 2')", newId2) == 1);
1313 		//writeln("Generated insert id = " ~ newId2.toString());
1314 		assert(newId2.get!ulong > 0);
1315 
1316 	}
1317 }
1318 
1319 __gshared static this() {
1320     // register MySQLDriver
1321     import ddbc.common;
1322     DriverFactory.registerDriverFactory("mysql", delegate() { return new MySQLDriver(); });
1323 }
1324 
1325 
1326 } else { // version(USE_MYSQL)
1327     version(unittest) {
1328         immutable bool MYSQL_TESTS_ENABLED = false;
1329     }
1330 }