1 /* 2 * Copyright (C) 2019, HuntLabs 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 */ 17 18 module hunt.database.base.impl.PoolBase; 19 20 import hunt.database.base.impl.Connection; 21 import hunt.database.base.impl.SqlClientBase; 22 import hunt.database.base.impl.SqlConnectionImpl; 23 24 import hunt.database.base.Exceptions; 25 import hunt.database.base.PoolOptions; 26 import hunt.database.base.Pool; 27 import hunt.database.base.SqlConnection; 28 import hunt.database.base.Transaction; 29 import hunt.database.base.impl.command.CommandBase; 30 import hunt.database.base.impl.command.CommandResponse; 31 import hunt.database.base.impl.command.CommandScheduler; 32 import hunt.database.base.AsyncResult; 33 34 import hunt.concurrency.Future; 35 import hunt.concurrency.FuturePromise; 36 import hunt.Exceptions; 37 import hunt.Functions; 38 import hunt.logging; 39 import hunt.util.pool; 40 41 import core.atomic; 42 import core.time; 43 44 import std.conv; 45 46 47 alias DbPoolOptions = hunt.database.base.PoolOptions.PoolOptions; 48 alias ObjectPoolOptions = hunt.util.pool.PoolOptions; 49 50 alias DbConnectionPool = ObjectPool!DbConnection; 51 52 /** 53 * 54 */ 55 class DbConnectionFactory : ObjectFactory!(DbConnection) { 56 57 private Consumer!(AsyncDbConnectionHandler) connector; 58 private shared int counter = 0; 59 private DbPoolOptions _options; 60 61 this(DbPoolOptions options) { 62 _options = options; 63 } 64 65 override DbConnection makeObject() { 66 int c = atomicOp!("+=")(counter, 1); 67 string name = "DbFactory-" ~ c.to!string(); 68 version(HUNT_DEBUG) { 69 tracef("Making a DB connection for %s", name); 70 } 71 72 FuturePromise!DbConnection promise = new FuturePromise!DbConnection(name); 73 74 connector( (DbConnectionAsyncResult ar) { 75 76 if (ar.succeeded()) { 77 DbConnection conn = ar.result(); 78 version(HUNT_DEBUG) { 79 infof("A new DB connection %d created", conn.getProcessId()); 80 } 81 promise.succeeded(conn); 82 } else { 83 version(HUNT_DEBUG) { 84 warning(ar.cause()); 85 } 86 87 version(HUNT_DB_DEBUG) warning(ar); 88 89 promise.failed(ar.cause()); 90 } 91 }); 92 93 DbConnection r = promise.get(_options.awaittingTimeout); 94 95 version(HUNT_DEBUG) { 96 infof("DB connection making finished for %s", name); 97 } 98 99 return r; 100 } 101 102 override void destroyObject(DbConnection p) { 103 if(p is null) { 104 warning("The connection is null"); 105 return; 106 } 107 version(HUNT_DB_DEBUG) { 108 tracef("Connection [%d] disconnected: %s", p.getProcessId(), p.isConnected()); 109 } 110 p.close(); 111 } 112 113 override bool isValid(DbConnection p) { 114 if(p is null) { 115 return false; 116 } else { 117 return p.isConnected(); 118 } 119 } 120 } 121 122 /** 123 * Todo : 124 * 125 * - handle timeout when acquiring a connection 126 * - for per statement pooling, have several physical connection and use the less busy one to avoid head of line blocking effect 127 * 128 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a> 129 * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a> 130 */ 131 abstract class PoolBase(P) : SqlClientBase!(P), Pool { // extends PoolBase!(P) 132 133 private DbConnectionPool pool; 134 private DbPoolOptions _options; 135 private shared int _promiseCounter = 0; 136 137 this(DbPoolOptions options) { 138 this._options = options; 139 int maxSize = options.getMaxSize(); 140 if (maxSize < 1) { 141 throw new IllegalArgumentException("Pool max size must be > 0"); 142 } 143 // this.pool = new ConnectionPool(&this.connect, maxSize, options.getMaxWaitQueueSize()); 144 145 146 ObjectPoolOptions opOptions = new ObjectPoolOptions(); 147 opOptions.name = "DbPool"; 148 opOptions.size = options.getMaxSize(); 149 opOptions.maxWaitQueueSize = options.getMaxWaitQueueSize(); 150 151 DbConnectionFactory factory = new DbConnectionFactory(options); 152 factory.connector = &connect; 153 154 pool = new DbConnectionPool(factory, opOptions); 155 } 156 157 abstract void connect(AsyncDbConnectionHandler completionHandler); 158 159 160 int nextPromiseId() { 161 import core.atomic; 162 int c = atomicOp!("+=")(_promiseCounter, 1); 163 return c; 164 } 165 166 SqlConnection getConnection() { 167 // pool.logStatus(); 168 size_t times = 0; 169 Duration dur = _options.awaittingTimeout(); 170 SqlConnection conn = tryToBorrowConnection(dur); 171 172 version(HUNT_DB_DEBUG) { 173 if(conn is null) { 174 throw new DatabaseException("Can't get a valid DB connection."); 175 } 176 } else { 177 while(times < _options.retry() && ((conn is null) || !conn.isConnected())) { 178 times++; 179 warningf("Try to get a connection again, times: %d.", times); 180 181 // Destory the broken connection 182 if(conn !is null) { 183 conn.close(); 184 } 185 186 conn = tryToBorrowConnection(dur); 187 } 188 189 if(times > 0 && times == _options.retry()) { 190 throw new DatabaseException("Can't get a working DB connection."); 191 } 192 } 193 return conn; 194 } 195 196 private SqlConnection tryToBorrowConnection(Duration dur) { 197 SqlConnection conn = null; 198 199 try { 200 // https://github.com/eclipse-vertx/vertx-sql-client/issues/463 201 version(HUNT_DB_DEBUG) tracef("try to get a connection in %s", dur); 202 DbConnection dbConn = pool.borrow(dur); 203 version(HUNT_DB_DEBUG) tracef("Got a DB connection (id=%d)", dbConn.getProcessId()); 204 205 conn = wrap(dbConn); 206 dbConn.initHolder(cast(DbConnection.Holder)conn); 207 208 conn.closeHandler(() { 209 version(HUNT_DB_DEBUG) { 210 tracef("Returning a DB connection (id=%d), %s", 211 dbConn.getProcessId(), (cast(Object)dbConn).toString()); 212 } 213 dbConn.initHolder(null); 214 // The borrowed object must be returned to the pool 215 pool.returnObject(dbConn); 216 217 // if(dbConn.isConnected()) { 218 // pool.returnObject(dbConn); 219 // } else { 220 // warningf("Dropped a closed db connection %s", (cast(Object)dbConn).toString()); 221 // } 222 }); 223 224 225 } catch(Throwable ex) { 226 debug { 227 warning(ex.msg); 228 infof("Failed to borrow. %s", pool.toString()); 229 } 230 231 version(HUNT_DB_DEBUG) { 232 warning(ex); 233 } 234 } 235 236 return conn; 237 } 238 239 Transaction begin() { 240 SqlConnection conn = getConnection(); 241 return conn.begin(true); 242 } 243 244 245 protected abstract SqlConnection wrap(DbConnection conn); 246 247 protected void doClose() { 248 pool.close(); 249 } 250 251 override 252 void close() { 253 doClose(); 254 } 255 256 int available() { return cast(int)pool.getNumActive(); } 257 258 int waiters() { return cast(int)pool.getNumWaiters(); } 259 260 int maxSize() { return cast(int)pool.size(); } 261 262 int size() { return cast(int)pool.size(); } 263 264 override string toString() { 265 return pool.toString(); 266 } 267 }