1 module hunt.database.driver.mysql.impl.MySQLConnectionFactory;
2 
3 import hunt.database.driver.mysql.impl.codec.MySQLCodec;
4 import hunt.database.driver.mysql.impl.MySQLSocketConnection;
5 import hunt.database.driver.mysql.MySQLConnectOptions;
6 
7 import hunt.database.base.AsyncResult;
8 import hunt.database.base.Common;
9 import hunt.database.base.impl.Connection;
10 import hunt.database.base.impl.command.CommandResponse;
11 
12 import hunt.collection.ArrayList;
13 import hunt.collection.HashMap;
14 import hunt.collection.Map;
15 import hunt.Exceptions;
16 import hunt.io.BufferUtils;
17 import hunt.io.channel;
18 import hunt.logging;
19 import hunt.Object;
20 
21 import hunt.net.AbstractConnection;
22 import hunt.net.Connection;
23 import hunt.net.NetClient;
24 import hunt.net.NetClientOptions;
25 import hunt.net.NetUtil;
26 
27 import core.thread;
28 
29 /**
30  * 
31  */
32 class MySQLConnectionFactory {
33     // private ArrayList!NetClient clients;
34     private NetClientOptions _netClientOptions;
35     private string host;
36     private int port;
37     private string username;
38     private string password;
39     private string database;
40     private Map!(string, string) properties;
41     private bool ssl = false;
42     private bool cachePreparedStatements;
43     private int preparedStatementCacheSize;
44     private int preparedStatementCacheSqlLimit;
45 
46     this(MySQLConnectOptions options) {
47         // clients = new ArrayList!NetClient(50);
48 
49         _netClientOptions = new NetClientOptions(options);
50 
51         this.host = options.getHost();
52         this.port = options.getPort();
53         this.username = options.getUser();
54         this.password = options.getPassword();
55         this.database = options.getDatabase();
56         this.properties = new HashMap!(string, string)(options.getProperties());
57         properties.put("collation", options.getCollation());
58         this.cachePreparedStatements = options.getCachePreparedStatements();
59         this.preparedStatementCacheSize = options.getPreparedStatementCacheMaxSize();
60         this.preparedStatementCacheSqlLimit = options.getPreparedStatementCacheSqlLimit();
61     }
62 
63     // // Called by hook
64     // private void close(AsyncVoidHandler completionHandler) {
65     //     close();
66     //     if(completionHandler !is null) {
67     //         completionHandler(cast(VoidAsyncResult)null);
68     //     }
69     // }
70 
71     // void close() {
72     //     foreach(client; clients)
73     //         client.close();
74 
75     //     clients.clear();
76     // }
77 
78     void connect(AsyncResultHandler!(DbConnection) completionHandler) {
79         // Promise!(NetSocket) promise = Promise.promise();
80         // promise.future().setHandler(ar1 -> {
81         //     if (ar1.succeeded()) {
82         //         NetSocketInternal socket = (NetSocketInternal) ar1.result();
83         //         MySQLSocketConnection conn = new MySQLSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlLimit, context);
84         //         conn.init();
85         //         conn.sendStartupMessage(username, password, database, properties, handler);
86         //     } else {
87         //         handler.handle(Future.failedFuture(ar1.cause()));
88         //     }
89         // });
90         // netClient.connect(port, host, promise);
91 
92         doConnect(false, (ar) {
93             if (ar.succeeded()) {
94                 MySQLSocketConnection conn = ar.result();
95                 conn.initialization();
96                 import hunt.collection.AbstractMap;
97                 // Map!(string, string) p = (cast(AbstractMap!(string, string))properties).clone();
98                 auto p = cast(Map!(string, string))properties.clone();
99                 conn.sendStartupMessage(username, password, database, p, 
100                     (r) { 
101                         if(completionHandler !is null) completionHandler(r); 
102                     }
103                 );
104             } else if(completionHandler !is null) {
105                 completionHandler(failedResponse!(DbConnection)(ar.cause()));
106             }
107         });        
108     }
109 
110 
111     private void doConnect(bool ssl, AsyncResultHandler!(MySQLSocketConnection) handler) {
112 
113         NetClient netClient = NetUtil.createNetClient(_netClientOptions);
114 
115         netClient.setHandler(new class NetConnectionHandler {
116 
117                 MySQLSocketConnection myConn;
118 
119                 override void connectionOpened(Connection connection) {
120                     version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress());
121 // FIXME: Needing refactor or cleanup -@zhangxueping at 2019-09-25T11:26:40+08:00
122 // 
123                     if(myConn is null) {
124                         myConn = newSocketConnection(cast(AbstractConnection)connection);
125                         if(handler !is null) 
126                             handler(succeededResult(myConn));
127                     } else {
128                         warning("MySQLSocketConnection has been opened already.");
129                     }
130                 }
131 
132                 override void connectionClosed(Connection connection) {
133                     version(HUNT_DEBUG) infof("The DB connection closed, remote: %s", connection.getRemoteAddress());
134                     if(myConn !is null)
135                         myConn.handleClosed(connection);
136                     
137                     // 
138                     // synchronized(this.outer) {
139                     //     clients.remove(netClient);
140                     // }
141                     // destroy(netClient);
142                     // version(HUNT_DB_DEBUG) {
143                     //     infof("Remaining clients: %d, threads: %d", 
144                     //         clients.size(), Thread.getAll().length);
145                     // }
146                 }
147 
148                 override DataHandleStatus messageReceived(Connection connection, Object message) {
149                     DataHandleStatus resultStatus = DataHandleStatus.Done;
150                     version(HUNT_DB_DEBUG_MORE) tracef("message type: %s", typeid(message).name);
151                     if(myConn is null) {
152                         // warningf("Waiting for the MySQLSocketConnection get ready");
153                         version(HUNT_DEBUG) warningf("MySQLSocketConnection is not ready.");
154                         
155                         // import std.stdio;
156                         while(myConn is null) {
157                             // warningf("Waiting for the MySQLSocketConnection get ready...");
158                             // write(".");
159                         }
160                         version(HUNT_DEBUG) warningf("MySQLSocketConnection is ready.");
161                     }
162 
163                     try {
164                         // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-01-26T14:45:48+08:00
165                         // 
166                         myConn.handleMessage(connection, message);
167                     } catch(Throwable t) {
168                         exceptionCaught(connection, t);
169                     }
170 
171                     return resultStatus;
172                 }
173 
174                 override void exceptionCaught(Connection connection, Throwable t) {
175                     version(HUNT_DB_DEBUG) warning(t);
176                     else version(HUNT_DEBUG) warning(t.msg);
177                     
178                     if(myConn !is null) {
179                         myConn.handleException(connection, t);
180                     }
181                     if(handler !is null)
182                         handler(failedResult!(MySQLSocketConnection)(t));
183                     
184                     // synchronized(this.outer) {
185                     //     clients.remove(netClient);
186                     // }
187                     destroy(netClient);
188                     // version(HUNT_DB_DEBUG) {
189                     //     infof("Remaining clients: %d, threads: %d", 
190                     //         clients.size(), Thread.getAll().length);
191                     // }
192                 }
193 
194                 override void failedOpeningConnection(int connectionId, Throwable t) {
195                     warning(t);
196 
197                     handler(failedResult!(MySQLSocketConnection)(t));
198                     netClient.close(); 
199                 }
200 
201                 override void failedAcceptingConnection(int connectionId, Throwable t) {
202                     warning(t);
203                     handler(failedResult!(MySQLSocketConnection)(t));
204                 }
205             });
206 
207         version(HUNT_DEBUG) {
208             trace("Setting MySQL codec");
209         }
210         netClient.setCodec(new MySQLCodec());     
211 
212         try {
213             netClient.connect(host, port);
214             // clients.add(netClient);
215         } catch (Throwable e) {
216             // Client is closed
217             version(HUNT_DEBUG) {
218                 warning(e.message);
219             } else {
220                 warning(e);
221             }
222             
223             if(handler !is null)
224                 handler(failedResult!MySQLSocketConnection(e));
225         }
226     }
227 
228     private MySQLSocketConnection newSocketConnection(AbstractConnection socket) {
229         return new MySQLSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, 
230                 preparedStatementCacheSqlLimit);
231     }    
232 }