1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.base.impl.PoolBase;
19 
20 import hunt.database.base.impl.Connection;
21 import hunt.database.base.impl.SqlClientBase;
22 import hunt.database.base.impl.SqlConnectionImpl;
23 
24 import hunt.database.base.Exceptions;
25 import hunt.database.base.PoolOptions;
26 import hunt.database.base.Pool;
27 import hunt.database.base.SqlConnection;
28 import hunt.database.base.Transaction;
29 import hunt.database.base.impl.command.CommandBase;
30 import hunt.database.base.impl.command.CommandResponse;
31 import hunt.database.base.impl.command.CommandScheduler;
32 import hunt.database.base.AsyncResult;
33 
34 import hunt.concurrency.Future;
35 import hunt.concurrency.FuturePromise;
36 import hunt.Exceptions;
37 import hunt.Functions;
38 import hunt.logging;
39 import hunt.util.pool;
40 
41 import core.atomic;
42 import core.time;
43 
44 import std.conv;
45 
46 
// Two different types named PoolOptions are in scope (one from
// hunt.database.base.PoolOptions, one from hunt.util.pool), so each gets an
// unambiguous alias built from its fully qualified name.

// Pool options of the database layer.
alias DbPoolOptions = hunt.database.base.PoolOptions.PoolOptions;
// Options of the generic object pool in hunt.util.pool.
alias ObjectPoolOptions = hunt.util.pool.PoolOptions;

// The generic object pool instantiated for DbConnection objects.
alias DbConnectionPool = ObjectPool!DbConnection;
51 
/**
 * ObjectFactory implementation that creates, validates and disposes of the
 * DbConnection objects managed by a DbConnectionPool.
 *
 * New connections are opened through the asynchronous `connector` delegate,
 * which must be assigned before makeObject() is called; makeObject() then
 * blocks on the result for at most the awaiting timeout from the pool options.
 */
class DbConnectionFactory : ObjectFactory!(DbConnection) {

    /// Asynchronous connect routine; invoked with a completion handler.
    private Consumer!(AsyncDbConnectionHandler) connector;
    /// Number of makeObject() calls so far; used to label each promise.
    private shared int counter = 0;
    /// Database pool options (source of the awaiting timeout).
    private DbPoolOptions _options;

    this(DbPoolOptions options) {
        _options = options;
    }

    /**
     * Opens a new DB connection and waits for the outcome.
     *
     * Returns: the connection delivered by the connector.
     *
     * The wait is bounded by `_options.awaittingTimeout`; a failure reported
     * by the connector is forwarded to the promise via `failed(...)`.
     */
    override DbConnection makeObject() {
        int sequence = atomicOp!("+=")(counter, 1);
        string label = "DbFactory-" ~ to!string(sequence);
        version(HUNT_DEBUG) {
            tracef("Making a DB connection for %s", label);
        }

        FuturePromise!DbConnection pending = new FuturePromise!DbConnection(label);

        connector( (DbConnectionAsyncResult outcome) {

            // Failure path first: log (debug builds only) and fail the promise.
            if (!outcome.succeeded()) {
                version(HUNT_DEBUG) {
                    warning(outcome.cause());
                }

                version(HUNT_DB_DEBUG) warning(outcome);

                pending.failed(outcome.cause());
                return;
            }

            DbConnection created = outcome.result();
            version(HUNT_DEBUG) {
                infof("A new DB connection %d created", created.getProcessId());
            }
            pending.succeeded(created);
        });

        // Block until the handler above has completed the promise or the timeout elapses.
        DbConnection made = pending.get(_options.awaittingTimeout);

        version(HUNT_DEBUG) {
            infof("DB connection making finished for %s", label);
        }

        return made;
    }

    /**
     * Closes a connection that the pool is discarding.
     * A null argument is only logged.
     */
    override void destroyObject(DbConnection p) {
        if(p !is null) {
            version(HUNT_DB_DEBUG) {
                tracef("Connection [%d] disconnected: %s", p.getProcessId(), p.isConnected());
            }
            p.close();
        } else {
            warning("The connection is null");
        }
    }

    /**
     * Tells whether a connection is usable: it must be non-null and still
     * report itself as connected.
     */
    override bool isValid(DbConnection p) {
        return p !is null && p.isConnected();
    }
}
121 
/**
 * Base class of pooled SQL clients. It owns a DbConnectionPool whose
 * connections are created through the abstract connect() method and hands
 * them out wrapped as SqlConnection objects.
 *
 * Todo :
 *
 * - handle timeout when acquiring a connection
 * - for per statement pooling, have several physical connection and use the less busy one to avoid head of line blocking effect
 *
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
 */
abstract class PoolBase(P) : SqlClientBase!(P), Pool { //  extends PoolBase!(P)

    private DbConnectionPool pool;
    private DbPoolOptions _options;
    private shared int _promiseCounter = 0;

    /**
     * Builds the underlying object pool from the given database pool options.
     *
     * Throws: IllegalArgumentException if the configured max size is below 1.
     */
    this(DbPoolOptions options) {
        this._options = options;
        int maxSize = options.getMaxSize();
        if (maxSize < 1) {
            throw new IllegalArgumentException("Pool max size must be > 0");
        }

        ObjectPoolOptions opOptions = new ObjectPoolOptions();
        opOptions.name = "DbPool";
        opOptions.size = maxSize;
        opOptions.maxWaitQueueSize = options.getMaxWaitQueueSize();

        // The factory opens physical connections through this pool's connect().
        DbConnectionFactory factory = new DbConnectionFactory(options);
        factory.connector = &connect;

        pool = new DbConnectionPool(factory, opOptions);
    }

    /**
     * Opens one physical connection and reports the outcome to
     * completionHandler. Implemented by the concrete pool.
     */
    abstract void connect(AsyncDbConnectionHandler completionHandler);


    /// Atomically increments the promise counter and returns the new value.
    int nextPromiseId() {
        return atomicOp!("+=")(_promiseCounter, 1);
    }

    /**
     * Borrows a connection from the pool.
     *
     * In regular builds a missing or disconnected connection is closed and
     * the borrow is retried up to `_options.retry()` times. With
     * HUNT_DB_DEBUG there is a single attempt and only null is rejected.
     *
     * Returns: a non-null connection; in regular builds it also reported
     *          itself as connected when it was checked.
     * Throws: DatabaseException when no usable connection could be obtained.
     */
    SqlConnection getConnection() {
        // pool.logStatus();
        Duration dur = _options.awaittingTimeout();
        SqlConnection conn = tryToBorrowConnection(dur);

        version(HUNT_DB_DEBUG) {
            if(conn is null) {
                throw new DatabaseException("Can't get a valid DB connection.");
            }
        } else {
            size_t times = 0;
            while(times < _options.retry() && ((conn is null) || !conn.isConnected())) {
                times++;
                warningf("Try to get a connection again, times: %d.", times);

                // Destroy the broken connection
                if(conn !is null) {
                    conn.close();
                }

                conn = tryToBorrowConnection(dur);
            }

            // Decide on the state of the connection itself, not on the number
            // of attempts: the last retry may well have succeeded, and with
            // retry() == 0 the loop above never runs at all.
            if(conn is null || !conn.isConnected()) {
                if(conn !is null) {
                    // Closing runs the close handler, which gives the slot back to the pool.
                    conn.close();
                }
                throw new DatabaseException("Can't get a working DB connection.");
            }
        }
        return conn;
    }

    /**
     * Makes one attempt to borrow a pooled connection and wrap it.
     *
     * Params:
     *   dur = how long to wait for the pool.
     *
     * Returns: the wrapped connection, or null when borrowing or wrapping
     *          failed. Failures are logged, never propagated.
     */
    private SqlConnection tryToBorrowConnection(Duration dur) {
        SqlConnection conn = null;
        DbConnection dbConn = null;

        try {
            // https://github.com/eclipse-vertx/vertx-sql-client/issues/463
            version(HUNT_DB_DEBUG) tracef("try to get a connection in %s", dur);
            dbConn = pool.borrow(dur);
            if(dbConn is null) {
                // Nothing was borrowed, so there is nothing to wrap or return.
                return null;
            }
            version(HUNT_DB_DEBUG) tracef("Got a DB connection (id=%d)", dbConn.getProcessId());

            conn = wrap(dbConn);
            dbConn.initHolder(cast(DbConnection.Holder)conn);

            conn.closeHandler(() {
                version(HUNT_DB_DEBUG) {
                    tracef("Returning a DB connection (id=%d), %s", 
                        dbConn.getProcessId(), (cast(Object)dbConn).toString());
                }
                dbConn.initHolder(null);
                // The borrowed object must be returned to the pool,
                // whether or not it is still connected.
                pool.returnObject(dbConn);
            });

        } catch(Throwable ex) { 
            debug {
                warning(ex.msg);
                infof("Failed to borrow. %s", pool.toString());
            }

            version(HUNT_DB_DEBUG) {
                warning(ex);
            }

            // The failure happened after a successful borrow (while wrapping
            // or wiring the handlers): hand the connection back so the pool
            // does not lose the slot for good.
            if(dbConn !is null) {
                try {
                    dbConn.initHolder(null);
                    pool.returnObject(dbConn);
                } catch(Exception returnEx) {
                    warning(returnEx.msg);
                }
            }
            conn = null;
        }

        return conn;
    }

    /**
     * Borrows a connection and starts a transaction on it.
     *
     * Throws: DatabaseException when no connection could be obtained.
     */
    Transaction begin() {
        SqlConnection conn = getConnection();
        return conn.begin(true);
    }


    /// Wraps a pooled physical connection into the SqlConnection handed to callers.
    protected abstract SqlConnection wrap(DbConnection conn);

    /// Closes the underlying object pool.
    protected void doClose() {
        pool.close();
    }

    override
    void close() {
        doClose();
    }

    // NOTE(review): reports the pool's *active* count under the name
    // "available" - confirm whether the idle count was intended.
    int available() { return cast(int)pool.getNumActive(); }
    
    /// Number of callers reported by the pool as waiting.
    int waiters() { return cast(int)pool.getNumWaiters(); }

    // NOTE(review): maxSize() and size() both return pool.size() - confirm
    // that the two are meant to be identical.
    int maxSize() { return cast(int)pool.size(); }
    
    int size() { return cast(int)pool.size(); }

    override string toString() {
        return pool.toString();
    }
}