1 module hunt.database.driver.mysql.impl.MySQLConnectionFactory; 2 3 import hunt.database.driver.mysql.impl.codec.MySQLCodec; 4 import hunt.database.driver.mysql.impl.MySQLSocketConnection; 5 import hunt.database.driver.mysql.MySQLConnectOptions; 6 7 import hunt.database.base.AsyncResult; 8 import hunt.database.base.Common; 9 import hunt.database.base.impl.Connection; 10 import hunt.database.base.impl.command.CommandResponse; 11 12 import hunt.collection.ArrayList; 13 import hunt.collection.HashMap; 14 import hunt.collection.Map; 15 import hunt.Exceptions; 16 import hunt.io.BufferUtils; 17 import hunt.io.channel; 18 import hunt.logging; 19 import hunt.Object; 20 21 import hunt.net.AbstractConnection; 22 import hunt.net.Connection; 23 import hunt.net.NetClient; 24 import hunt.net.NetClientOptions; 25 import hunt.net.NetUtil; 26 27 import core.thread; 28 29 /** 30 * 31 */ 32 class MySQLConnectionFactory { 33 // private ArrayList!NetClient clients; 34 private NetClientOptions _netClientOptions; 35 private string host; 36 private int port; 37 private string username; 38 private string password; 39 private string database; 40 private Map!(string, string) properties; 41 private bool ssl = false; 42 private bool cachePreparedStatements; 43 private int preparedStatementCacheSize; 44 private int preparedStatementCacheSqlLimit; 45 46 this(MySQLConnectOptions options) { 47 // clients = new ArrayList!NetClient(50); 48 49 _netClientOptions = new NetClientOptions(options); 50 51 this.host = options.getHost(); 52 this.port = options.getPort(); 53 this.username = options.getUser(); 54 this.password = options.getPassword(); 55 this.database = options.getDatabase(); 56 this.properties = new HashMap!(string, string)(options.getProperties()); 57 properties.put("collation", options.getCollation()); 58 this.cachePreparedStatements = options.getCachePreparedStatements(); 59 this.preparedStatementCacheSize = options.getPreparedStatementCacheMaxSize(); 60 this.preparedStatementCacheSqlLimit = options.getPreparedStatementCacheSqlLimit(); 61 } 62 63 // // Called by hook 64 // private void close(AsyncVoidHandler completionHandler) { 65 // close(); 66 // if(completionHandler !is null) { 67 // completionHandler(cast(VoidAsyncResult)null); 68 // } 69 // } 70 71 // void close() { 72 // foreach(client; clients) 73 // client.close(); 74 75 // clients.clear(); 76 // } 77 78 void connect(AsyncResultHandler!(DbConnection) completionHandler) { 79 // Promise!(NetSocket) promise = Promise.promise(); 80 // promise.future().setHandler(ar1 -> { 81 // if (ar1.succeeded()) { 82 // NetSocketInternal socket = (NetSocketInternal) ar1.result(); 83 // MySQLSocketConnection conn = new MySQLSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlLimit, context); 84 // conn.init(); 85 // conn.sendStartupMessage(username, password, database, properties, handler); 86 // } else { 87 // handler.handle(Future.failedFuture(ar1.cause())); 88 // } 89 // }); 90 // netClient.connect(port, host, promise); 91 92 doConnect(false, (ar) { 93 if (ar.succeeded()) { 94 MySQLSocketConnection conn = ar.result(); 95 conn.initialization(); 96 import hunt.collection.AbstractMap; 97 // Map!(string, string) p = (cast(AbstractMap!(string, string))properties).clone(); 98 auto p = cast(Map!(string, string))properties.clone(); 99 conn.sendStartupMessage(username, password, database, p, 100 (r) { 101 if(completionHandler !is null) completionHandler(r); 102 } 103 ); 104 } else if(completionHandler !is null) { 105 completionHandler(failedResponse!(DbConnection)(ar.cause())); 106 } 107 }); 108 } 109 110 111 private void doConnect(bool ssl, AsyncResultHandler!(MySQLSocketConnection) handler) { 112 113 NetClient netClient = NetUtil.createNetClient(_netClientOptions); 114 115 netClient.setHandler(new class NetConnectionHandler { 116 117 MySQLSocketConnection myConn; 118 119 override void connectionOpened(Connection connection) { 120 version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress()); 121 // FIXME: Needing refactor or cleanup -@zhangxueping at 2019-09-25T11:26:40+08:00 122 // 123 if(myConn is null) { 124 myConn = newSocketConnection(cast(AbstractConnection)connection); 125 if(handler !is null) 126 handler(succeededResult(myConn)); 127 } else { 128 warning("MySQLSocketConnection has been opened already."); 129 } 130 } 131 132 override void connectionClosed(Connection connection) { 133 version(HUNT_DEBUG) infof("The DB connection closed, remote: %s", connection.getRemoteAddress()); 134 if(myConn !is null) 135 myConn.handleClosed(connection); 136 137 // 138 // synchronized(this.outer) { 139 // clients.remove(netClient); 140 // } 141 // destroy(netClient); 142 // version(HUNT_DB_DEBUG) { 143 // infof("Remaining clients: %d, threads: %d", 144 // clients.size(), Thread.getAll().length); 145 // } 146 } 147 148 override DataHandleStatus messageReceived(Connection connection, Object message) { 149 DataHandleStatus resultStatus = DataHandleStatus.Done; 150 version(HUNT_DB_DEBUG_MORE) tracef("message type: %s", typeid(message).name); 151 if(myConn is null) { 152 // warningf("Waiting for the MySQLSocketConnection get ready"); 153 version(HUNT_DEBUG) warningf("MySQLSocketConnection is not ready."); 154 155 // import std.stdio; 156 while(myConn is null) { 157 // warningf("Waiting for the MySQLSocketConnection get ready..."); 158 // write("."); 159 } 160 version(HUNT_DEBUG) warningf("MySQLSocketConnection is ready."); 161 } 162 163 try { 164 // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-01-26T14:45:48+08:00 165 // 166 myConn.handleMessage(connection, message); 167 } catch(Throwable t) { 168 exceptionCaught(connection, t); 169 } 170 171 return resultStatus; 172 } 173 174 override void exceptionCaught(Connection connection, Throwable t) { 175 version(HUNT_DB_DEBUG) warning(t); 176 else version(HUNT_DEBUG) warning(t.msg); 177 178 if(myConn !is null) { 179 myConn.handleException(connection, t); 180 } 181 if(handler !is null) 182 handler(failedResult!(MySQLSocketConnection)(t)); 183 184 // synchronized(this.outer) { 185 // clients.remove(netClient); 186 // } 187 destroy(netClient); 188 // version(HUNT_DB_DEBUG) { 189 // infof("Remaining clients: %d, threads: %d", 190 // clients.size(), Thread.getAll().length); 191 // } 192 } 193 194 override void failedOpeningConnection(int connectionId, Throwable t) { 195 warning(t); 196 197 handler(failedResult!(MySQLSocketConnection)(t)); 198 netClient.close(); 199 } 200 201 override void failedAcceptingConnection(int connectionId, Throwable t) { 202 warning(t); 203 handler(failedResult!(MySQLSocketConnection)(t)); 204 } 205 }); 206 207 version(HUNT_DEBUG) { 208 trace("Setting MySQL codec"); 209 } 210 netClient.setCodec(new MySQLCodec()); 211 212 try { 213 netClient.connect(host, port); 214 // clients.add(netClient); 215 } catch (Throwable e) { 216 // Client is closed 217 version(HUNT_DEBUG) { 218 warning(e.message); 219 } else { 220 warning(e); 221 } 222 223 if(handler !is null) 224 handler(failedResult!MySQLSocketConnection(e)); 225 } 226 } 227 228 private MySQLSocketConnection newSocketConnection(AbstractConnection socket) { 229 return new MySQLSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, 230 preparedStatementCacheSqlLimit); 231 } 232 }