1 /* 2 * Copyright (C) 2019, HuntLabs 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 */ 17 18 module hunt.database.driver.postgresql.impl.PostgreSQLConnectionFactory; 19 20 import hunt.database.driver.postgresql.impl.codec.PgCodec; 21 import hunt.database.driver.postgresql.impl.PostgreSQLSocketConnection; 22 import hunt.database.driver.postgresql.PostgreSQLConnectOptions; 23 import hunt.database.driver.postgresql.SslMode; 24 25 import hunt.database.base.AsyncResult; 26 import hunt.database.base.Common; 27 import hunt.database.base.impl.Connection; 28 import hunt.database.base.impl.command.CommandResponse; 29 30 import hunt.collection.ArrayList; 31 import hunt.collection.HashMap; 32 import hunt.collection.Map; 33 import hunt.Exceptions; 34 import hunt.io.BufferUtils; 35 import hunt.io.channel; 36 import hunt.logging; 37 import hunt.Object; 38 39 import hunt.net.AbstractConnection; 40 import hunt.net.Connection; 41 import hunt.net.NetClient; 42 import hunt.net.NetClientOptions; 43 import hunt.net.NetUtil; 44 45 46 /** 47 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a> 48 */ 49 class PgConnectionFactory { 50 51 // private ArrayList!NetClient clients; 52 private NetClientOptions _netClientOptions; 53 private bool registerCloseHook; 54 private string host; 55 private int port; 56 private SslMode sslMode; 57 // private TrustOptions trustOptions; 58 private string hostnameVerificationAlgorithm; 59 private string database; 60 private string username; 61 private string password; 62 private Map!(string, string) properties; 63 private bool cachePreparedStatements; 64 private int preparedStatementCacheSize; 65 private int preparedStatementCacheSqlLimit; 66 private int pipeliningLimit; 67 private bool isUsingDomainSocket; 68 // private Closeable hook; 69 70 this(PgConnectOptions options) { 71 // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-13T10:17:49+08:00 72 // remove clients 73 // clients = new ArrayList!NetClient(50); 74 _netClientOptions = new NetClientOptions(options); 75 76 // Make sure ssl=false as we will use STARTLS 77 _netClientOptions.setSsl(false); 78 79 this.sslMode = options.getSslMode(); 80 this.hostnameVerificationAlgorithm = _netClientOptions.getHostnameVerificationAlgorithm(); 81 // this.trustOptions = netClientOptions.getTrustOptions(); 82 this.host = options.getHost(); 83 this.port = options.getPort(); 84 this.database = options.getDatabase(); 85 this.username = options.getUser(); 86 this.password = options.getPassword(); 87 this.properties = new HashMap!(string, string)(options.getProperties()); 88 this.cachePreparedStatements = options.getCachePreparedStatements(); 89 this.pipeliningLimit = options.getPipeliningLimit(); 90 this.preparedStatementCacheSize = options.getPreparedStatementCacheMaxSize(); 91 this.preparedStatementCacheSqlLimit = options.getPreparedStatementCacheSqlLimit(); 92 this.isUsingDomainSocket = options.isUsingDomainSocket(); 93 } 94 95 // // Called by hook 96 // private void close(AsyncVoidHandler completionHandler) { 97 // close(); 98 // if(completionHandler !is null) { 99 // completionHandler(null); 100 // } 101 // } 102 103 // void close() { 104 // foreach(NetClient client; clients) { 105 // if(client !is null) { 106 // // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-12T10:26:30+08:00 107 // // crashed here 108 // // warning(typeid(cast(Object)client)); 109 // client.close(); 110 // } 111 // } 112 // } 113 114 void connectAndInit(AsyncResultHandler!(DbConnection) completionHandler) { 115 connect( (ar) { 116 if (ar.succeeded()) { 117 PgSocketConnection conn = ar.result(); 118 conn.initialization(); 119 conn.sendStartupMessage(username, password, database, properties, 120 (r) { 121 if(completionHandler !is null) completionHandler(r); 122 } 123 ); 124 } else if(completionHandler !is null) { 125 completionHandler(failedResponse!(DbConnection)(ar.cause())); 126 } else { 127 warning("do nothing"); 128 } 129 }); 130 } 131 132 void connect(AsyncResultHandler!(PgSocketConnection) handler) { 133 doConnect(false, handler); 134 // switch (sslMode) { 135 // case DISABLE: 136 // doConnect(false, handler); 137 // break; 138 // case ALLOW: 139 // doConnect(false, ar -> { 140 // if (ar.succeeded()) { 141 // handler.handle(Future.succeededFuture(ar.result())); 142 // } else { 143 // doConnect(true, handler); 144 // } 145 // }); 146 // break; 147 // case PREFER: 148 // doConnect(true, ar -> { 149 // if (ar.succeeded()) { 150 // handler.handle(Future.succeededFuture(ar.result())); 151 // } else { 152 // doConnect(false, handler); 153 // } 154 // }); 155 // break; 156 // case VERIFY_FULL: 157 // if (hostnameVerificationAlgorithm is null || hostnameVerificationAlgorithm.isEmpty()) { 158 // handler.handle(Future.failedFuture(new IllegalArgumentException("Host verification algorithm must be specified under verify-full sslmode"))); 159 // return; 160 // } 161 // case VERIFY_CA: 162 // if (trustOptions is null) { 163 // handler.handle(Future.failedFuture(new IllegalArgumentException("Trust options must be specified under verify-full or verify-ca sslmode"))); 164 // return; 165 // } 166 // case REQUIRE: 167 // doConnect(true, handler); 168 // break; 169 // default: 170 // throw new IllegalArgumentException("Unsupported SSL mode"); 171 // } 172 } 173 174 private void doConnect(bool ssl, AsyncResultHandler!(PgSocketConnection) handler) { 175 176 version(HUNT_DB_DEBUG) tracef("Creating a DB connection in %s...", _netClientOptions.getConnectTimeout); 177 178 auto client = NetUtil.createNetClient(_netClientOptions); 179 180 client.setHandler(new class NetConnectionHandler { 181 182 PgSocketConnection pgConn; 183 184 override void connectionOpened(Connection connection) { 185 version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress()); 186 AbstractConnection ac = cast(AbstractConnection)connection; 187 ac.setState(ConnectionState.Opened); 188 189 pgConn = newSocketConnection(ac); 190 if(handler !is null) { 191 try { 192 handler(succeededResult(pgConn)); 193 } catch(Throwable ex) { 194 version(HUNT_DB_DEBUG_MORE) warning(ex); 195 handler(failedResult!(PgSocketConnection)(ex)); 196 } 197 } 198 } 199 200 override void connectionClosed(Connection connection) { 201 version(HUNT_DB_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress()); 202 if(pgConn !is null) 203 pgConn.handleClosed(connection); 204 } 205 206 override DataHandleStatus messageReceived(Connection connection, Object message) { 207 DataHandleStatus resultStatus = DataHandleStatus.Done; 208 version(HUNT_DB_DEBUG_MORE) tracef("message type: %s", typeid(message).name); 209 try { 210 // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-01-26T14:46:35+08:00 211 // 212 pgConn.handleMessage(connection, message); 213 } catch(Throwable t) { 214 exceptionCaught(connection, t); 215 } 216 217 return resultStatus; 218 } 219 220 override void exceptionCaught(Connection connection, Throwable t) { 221 version(HUNT_DEBUG) warning(t.msg); 222 version(HUNT_DB_DEBUG) warning(t); 223 if(pgConn !is null) { 224 pgConn.handleException(connection, t); 225 } 226 if(handler !is null) { 227 handler(failedResult!(PgSocketConnection)(t)); 228 } 229 } 230 231 override void failedOpeningConnection(int connectionId, Throwable t) { 232 warning(t); 233 234 handler(failedResult!(PgSocketConnection)(t)); 235 client.close(); 236 } 237 238 override void failedAcceptingConnection(int connectionId, Throwable t) { 239 warning(t); 240 handler(failedResult!(PgSocketConnection)(t)); 241 } 242 }); 243 244 245 version(HUNT_DB_DEBUG) { 246 trace("Setting PostgreSQL codec"); 247 } 248 client.setCodec(new PgCodec()); 249 250 try { 251 client.connect(host, port); 252 // clients.add(client); 253 } catch (Throwable e) { 254 // Client is closed 255 version(HUNT_DEBUG) { 256 warning(e.message); 257 } else { 258 warning(e); 259 } 260 261 if(handler !is null) { 262 handler(failedResult!PgSocketConnection(e)); 263 } 264 } 265 } 266 267 private PgSocketConnection newSocketConnection(AbstractConnection socket) { 268 return new PgSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, 269 preparedStatementCacheSqlLimit, pipeliningLimit); 270 } 271 }