1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.base.impl.SocketConnectionBase;
19 
20 import hunt.database.base.impl.command;
21 import hunt.database.base.impl.Connection;
22 import hunt.database.base.impl.Notice;
23 import hunt.database.base.impl.Notification;
24 import hunt.database.base.impl.PreparedStatement;
25 import hunt.database.base.impl.PreparedStatementCache;
26 import hunt.database.base.impl.StringLongSequence;
27 
28 import hunt.collection.ArrayDeque;
29 import hunt.collection.Deque;
30 import hunt.Exceptions;
31 import hunt.logging;
32 import hunt.net.AbstractConnection;
33 import hunt.net.Connection;
34 import hunt.net.Exceptions;
35 import hunt.Object;
36 import hunt.util.TypeUtils;
37 
38 import std.container.dlist;
39 import std.conv;
40 import std.range;
41 
42 /**
43  * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
44  */
45 abstract class SocketConnectionBase : DbConnection {
46 
47     enum Status {
48         CLOSED, CONNECTED, CLOSING
49     }
50 
51     protected PreparedStatementCache psCache;
52     private int preparedStatementCacheSqlLimit;
53     private StringLongSequence psSeq; // = new StringLongSequence();
54     // private ArrayDeque<CommandBase<?>> pending = new ArrayDeque<>();
55     private DList!(ICommand) pending;
56     
57     private int inflight;
58     private Holder holder;
59     private int pipeliningLimit;
60 
61     protected AbstractConnection _socket;
62     protected Status status = Status.CONNECTED;
63 
64     this(AbstractConnection socket,
65                         bool cachePreparedStatements,
66                         int preparedStatementCacheSize,
67                         int preparedStatementCacheSqlLimit,
68                         int pipeliningLimit) {
69         this._socket = socket;
70         this.psSeq = new StringLongSequence();
71         this.pipeliningLimit = pipeliningLimit;
72         this.psCache = cachePreparedStatements ? new PreparedStatementCache(preparedStatementCacheSize, this) : null;
73         this.preparedStatementCacheSqlLimit = preparedStatementCacheSqlLimit;
74     }
75 
76     void initialization() {
77 
78         // ConnectionEventHandlerAdapter adapter = new ConnectionEventHandlerAdapter();
79         // adapter.onClosed(&handleClosed);
80         // adapter.onException(&handleException);
81         // adapter.onMessageReceived((Connection conn, Object msg) {
82         //     version(HUNT_DB_DEBUG) tracef("A message received. %s", typeid(msg));
83         //     try {
84         //         handleMessage(conn, msg);
85         //     } catch (Throwable e) {
86         //         handleException(conn, e);
87         //     }
88         // });
89 
90         // _socket.setHandler(adapter);
91     }
92 
93     AbstractConnection socket() {
94         return _socket;
95     }
96 
97     bool isSsl() {
98         return _socket.isSecured();
99     }
100 
101     bool isConnected() {
102         return status == Status.CONNECTED;
103     }
104 
105     override
106     void initHolder(Holder holder) {
107         this.holder = holder;
108     }
109 
110     override
111     int getProcessId() {
112         return _socket.getId();
113     }
114 
115     override
116     int getSecretKey() {
117         throw new UnsupportedOperationException();
118     }
119 
120     override
121     void close() {
122 
123         version(HUNT_DB_DEBUG) infof("A socket closing... status: %s", status);
124 
125         if (status == Status.CONNECTED) {
126             if(holder !is null) {
127                 holder.handleClosing();
128                 holder = null;
129             }
130             status = Status.CLOSING;
131             _socket.close();
132             // // Append directly since schedule checks the status and won't enqueue the command
133             // pending.add(CloseConnectionCommand.INSTANCE);
134             // checkPending();            
135         }
136         // if (Vertx.currentContext() == context) {
137         //     if (status == Status.CONNECTED) {
138         //         status = Status.CLOSING;
139         //         // Append directly since schedule checks the status and won't enqueue the command
140         //         pending.add(CloseConnectionCommand.INSTANCE);
141         //         checkPending();
142         //     }
143         // } else {
144         //     context.runOnContext(v -> close(holder));
145         // }
146     }
147 
148     // void onClosing(DbConnectionHandler handler) {
149     //     if(_closingHandler !is null) {
150     //         warning("The handler can't be reset.");
151     //         return;
152     //     }
153     //     _closingHandler = handler;
154     // }
155 
156     void schedule(ICommand cmd) {
157         if (!cmd.handlerExist()) {
158             version(HUNT_DEBUG) warningf(typeid(cast(Object)cmd).toString());
159             throw new IllegalArgumentException("No handler exists in command." ~ 
160                 TypeUtils.getSimpleName(typeid(cast(Object)cmd)));
161         }
162 
163         // Special handling for cache
164         PreparedStatementCache psCache = this.psCache;
165         PrepareStatementCommand psCmd = cast(PrepareStatementCommand) cmd;
166         if (psCache !is null && psCmd !is null) {
167             if (psCmd.sql().length > preparedStatementCacheSqlLimit) {
168                 // do not cache the statements
169                 return;
170             }
171             CachedPreparedStatement cached = psCache.get(psCmd.sql());
172             if (cached !is null) {
173                 psCmd.cached = cached;
174                 ResponseHandler!(PreparedStatement) handler = psCmd.handler;
175                 cached.get(handler);
176                 return;
177             }
178 
179             if (psCache.size() >= psCache.getCapacity() && !psCache.isReady()) {
180                 // only if the prepared statement is ready then it can be evicted
181                 version(HUNT_DB_DEBUG) info("do nothing");
182             } else {
183                 psCmd._statement = psSeq.next();
184                 psCmd.cached = cached = new CachedPreparedStatement();
185                 psCache.put(psCmd.sql(), cached);
186                 ResponseHandler!(PreparedStatement) a = psCmd.handler;
187                 (cast(CachedPreparedStatement) cached).get(a);
188                 psCmd.handler = (CommandResponse!(PreparedStatement) r) { cached.handle(r); };
189             }
190         }
191 
192         //
193         if (status == Status.CONNECTED) {
194             pending.insertBack(cmd);
195             checkPending();
196         } else {
197             cmd.fail(new IOException("Connection not open, the status is " ~ status.to!string()));
198         }
199     }
200 
201 
202     private void checkPending() {
203         if (inflight < pipeliningLimit) {
204             ICommand cmd;
205             while (inflight < pipeliningLimit && (cmd = pollPending()) !is null) {
206                 inflight++;
207                 version(HUNT_DB_DEBUG_MORE) {
208                     // tracef("chekcing %s ... ", typeid(cast(Object)cmd));
209                 } else version(HUNT_DB_DEBUG) {
210                     // trace("chekcing... ");
211                 } 
212                 _socket.encode(cast(Object)cmd);
213             }
214         }
215     }
216 
217     private ICommand pollPending() {
218         if(pending.empty())
219             return null;
220         ICommand c = pending.front;
221         pending.removeFront();
222         return c;
223 
224     }
225 
226     void handleMessage(Connection conn, Object msg) {
227         version(HUNT_DB_DEBUG_MORE) tracef("handling a message: %s", typeid(msg));
228 
229         ICommandResponse resp = cast(ICommandResponse) msg;
230         if (resp !is null) {
231             inflight--;
232             checkPending();
233             resp.notifyCommandResponse();
234             version(HUNT_DB_DEBUG_MORE) tracef("inflight=%d", inflight);
235             return;
236         } 
237 
238         Notification n = cast(Notification) msg;
239         if (n !is null) {
240             handleNotification(n);
241             return;
242         }
243 
244         Notice notice = cast(Notice) msg;
245         if (notice !is null) {
246             handleNotice(notice);
247         }
248 
249         version(HUNT_DB_DEBUG) warningf("Unhandled message: %s", typeid(msg));
250     }
251 
252     private void handleNotification(Notification response) {
253         if (holder !is null) {
254             holder.handleNotification(response.getProcessId(), response.getChannel(), response.getPayload());
255         }
256     }
257 
258     private void handleNotice(Notice notice) {
259         notice.log();
260     }
261 
262     void handleClosed(Connection conn) {
263         handleClose(cast(Throwable)null);
264     }
265 
266     void handleException(Connection c, Throwable t) {
267         DecoderException err = cast(DecoderException) t;
268         if (err !is null) {
269             t = err.next;
270         }
271         handleClose(t);
272     }
273 
274     private void handleClose(Throwable t) {
275         version(HUNT_DB_DEBUG) {
276             infof("Connection closed. Throwable: %s", t is null);
277         }
278 
279         if (status == Status.CLOSED) {
280             version(HUNT_DB_DEBUG) warning("The closed connection has been handled already.");
281             return;
282         }
283 
284         status = Status.CLOSED;
285         if (t !is null) {
286             synchronized (this) {
287                 if (holder !is null) {
288                     holder.handleException(t);
289                 }
290             }
291         }
292 
293         version(HUNT_DB_DEBUG) {
294             if(holder !is null) {
295                 tracef("pending: %d, holder: %s", pending[].walkLength(), typeid(cast(Object)holder));
296             }
297         }
298 
299         Throwable cause = t is null ? new Exception("closed") : t;
300         ICommand cmd;
301         while ((cmd = pollPending()) !is null) {
302             ICommand c = cmd;
303             c.fail(cause);
304         }
305 
306         if (holder !is null) {
307             holder.handleClosed();
308         }
309     }
310 
311     override string toString() {
312         import std.format;
313         if(_socket is null) {
314             return format("DbConnection: unknown");
315         } else {
316             return format("DbConnection %d, local: %s", 
317                 _socket.getId(), _socket.getLocalAddress());
318         }
319     }
320 }
321