1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.base.impl.SqlConnectionImpl;
19 
20 import hunt.database.base.impl.Connection;
21 
22 import hunt.database.base.AsyncResult;
23 import hunt.database.base.Common;
24 import hunt.database.base.SqlConnection;
25 import hunt.database.base.impl.command.CommandResponse;
26 import hunt.database.base.impl.command.CommandBase;
27 import hunt.database.base.impl.NamedQueryDesc;
28 import hunt.database.base.impl.SqlConnectionBase;
29 import hunt.database.base.impl.TransactionImpl;
30 import hunt.database.base.PreparedQuery;
31 import hunt.database.base.RowSet;
32 import hunt.database.base.Transaction;
33 import hunt.database.base.Tuple;
34 
35 import hunt.concurrency.Future;
36 import hunt.Exceptions;
37 import hunt.logging;
38 import hunt.net.AbstractConnection;
39 import hunt.Object;
40 
41 import hunt.collection.List;
42 
43 /**
44  * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
45  */
46 abstract class SqlConnectionImpl(C) : SqlConnectionBase!(C), SqlConnection, DbConnection.Holder //, 
47 { // if(is(C : SqlConnectionImpl))
48 
49     private ExceptionHandler _exceptionHandler;
50     private VoidHandler _closeHandler;
51     private TransactionImpl tx;
52     private bool _isClosed = false;
53 
54     this(DbConnection conn) {
55         super(conn);
56     }
57 
58     override C query(string sql, RowSetHandler handler) {
59         return super.query(sql, handler);
60     }
61 
62     override C prepare(string sql, PreparedQueryHandler handler) {
63         return super.prepare(sql, handler);
64     }
65 
66     override Future!PreparedQuery prepareAsync(string sql) {
67         return super.prepareAsync(sql);
68     }
69 
70     override PreparedQuery prepare(string sql) {
71         return super.prepare(sql);
72     }
73 
74     override C preparedQuery(string sql, RowSetHandler handler) {
75         return super.preparedQuery(sql, handler);
76     }
77 
78     override C preparedQuery(string sql, Tuple arguments, RowSetHandler handler) {
79         return super.preparedQuery(sql, arguments, handler);
80     }
81 
82     // override C preparedBatch(string sql, List!(Tuple) batch, RowSetHandler handler) {
83     //     return super.preparedBatch(sql, batch, handler);
84     // }
85 
86     // override protected AbstractNamedQueryDesc getNamedQueryDesc(string sql) {
87     //     return super.getNamedQueryDesc(sql);
88     // }
89 
90     // override Future!(NamedQuery) prepareNamedQueryAsync(string sql) {
91     //     return super.prepareNamedQueryAsync(sql);
92     // }
93 
94     // override NamedQuery prepareNamedQuery(string sql) {
95     //     return super.prepareNamedQuery(sql);
96     // }
97 
98     /** 
99      * Handle on connection closing
100      * Params:
101      *   conn = 
102      */
103     void handleClosing() {
104         version(HUNT_DB_DEBUG) { 
105             tracef("The db connection %d closing.", conn.getProcessId());
106         }
107         // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-22T11:37:35+08:00
108         // Not thread-safe
109 
110         // _isClosed = true;
111 
112         // Make sure that the binded transaction is completed.
113         if (tx !is null && tx.status() != ST_COMPLETED) {
114             warningf("A transaction is forced to rollback on connection (id=%d)", conn.getProcessId());
115             tx.rollback();
116         }        
117     }
118 
119     /** 
120      * Handle on connection closed
121      */
122     void handleClosed() {
123         if(_isClosed) {
124             version(HUNT_DEBUG) { 
125                 warningf("The db connection %d has been closed already.", conn.getProcessId());
126             }
127             return;
128         }
129         _isClosed = true;
130 
131         version(HUNT_DB_DEBUG) { 
132             warningf("The db connection %d closed.", conn.getProcessId());
133         }
134 
135         VoidHandler handler = _closeHandler;
136         if (handler !is null) {
137             version (HUNT_DB_DEBUG) {
138                 infof("Closing a SQL connection %d with handler...", conn.getProcessId());
139             }
140             handler();
141         }
142     }
143 
144     // override
145     // void schedule(R)(CommandBase!(R) cmd, ResponseHandler!R handler) {
146     //     cmd.handler = (cr) {
147     //         // Tx might be gone ???
148     //         cr.scheduler = this;
149     //         handler(cr);
150     //     };
151     //     schedule(cmd);
152     // }
153 
154     override void schedule(ICommand cmd) {
155         if (tx !is null) {
156             tx.schedule(cmd);
157         } else {
158             conn.schedule(cmd);
159         }
160     }
161 
162     void handleException(Throwable err) {
163         EventHandler!(Throwable) handler = _exceptionHandler;
164         if (handler !is null) {
165             handler(err);
166         } else {
167             version (HUNT_DB_DEBUG_MORE) {
168                 warning(err);
169             } else {
170                 warning(err.msg);
171             }
172         }
173     }
174 
175     override bool isSSL() {
176         return conn.isSsl();
177     }
178 
179     override bool isConnected() {
180         return conn.isConnected();
181     }
182 
183     override C closeHandler(VoidHandler handler) {
184         _closeHandler = handler;
185         import hunt.Functions;
186         
187         
188         return cast(C) this;
189     }
190 
191     override C exceptionHandler(ExceptionHandler handler) {
192         _exceptionHandler = handler;
193         return cast(C) this;
194     }
195 
196     override Transaction begin() {
197         return begin(false);
198     }
199 
200     Transaction begin(bool closeOnEnd) {
201         if (tx !is null) {
202             throw new IllegalStateException();
203         }
204         tx = new TransactionImpl(conn, (v) {
205             tx = null;
206             if (closeOnEnd) {
207                 close();
208             }
209         });
210         return tx;
211     }
212 
213     abstract void handleNotification(int processId, string channel, string payload);
214 
215     override void close() {
216         version (HUNT_DB_DEBUG)
217             infof("Closing a SQL connection %d...", conn.getProcessId());
218 
219         if(_isClosed) {
220             warningf("The connection %d has been closed already.", conn.getProcessId());
221             return;
222         }
223 
224         handleClosing();
225 
226         VoidHandler handler = _closeHandler;
227         if(handler !is null) {
228             version (HUNT_DB_DEBUG) {
229                 infof("Closing a SQL connection %d with handler...", conn.getProcessId());
230             }
231             handler();
232         } else {
233             version (HUNT_DB_DEBUG)
234                 infof("Closing a DB connection in SQL connection %d...", conn.getProcessId());
235             conn.close();
236 
237             // if (tx !is null) {
238             //     tx.rollback((ar) { conn.close(this); });
239             //     tx = null;
240             // } else {
241             //     conn.close(this);
242             // }
243         }
244     }
245 }