/*
 * Copyright (C) 2019, HuntLabs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

module hunt.database.base.impl.SocketConnectionBase;

import hunt.database.base.impl.command;
import hunt.database.base.impl.Connection;
import hunt.database.base.impl.Notice;
import hunt.database.base.impl.Notification;
import hunt.database.base.impl.PreparedStatement;
import hunt.database.base.impl.PreparedStatementCache;
import hunt.database.base.impl.StringLongSequence;

import hunt.collection.ArrayDeque;
import hunt.collection.Deque;
import hunt.Exceptions;
import hunt.logging;
import hunt.net.AbstractConnection;
import hunt.net.Connection;
import hunt.net.Exceptions;
import hunt.Object;
import hunt.util.TypeUtils;

import std.container.dlist;
import std.conv;
import std.range;

/**
 * Base class for database connections that sit on top of a network socket.
 *
 * Commands handed to `schedule` are queued in `pending` and written to the
 * socket while fewer than `pipeliningLimit` commands are in flight. Each
 * `ICommandResponse` received through `handleMessage` frees one in-flight slot.
 * When enabled, prepared statements are shared through a `PreparedStatementCache`.
 *
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
abstract class SocketConnectionBase : DbConnection {

    /** Lifecycle state of the connection. */
    enum Status {
        CLOSED, CONNECTED, CLOSING
    }

    /// Prepared statement cache; null when caching was disabled in the constructor.
    protected PreparedStatementCache psCache;
    /// Statements whose SQL text is longer than this are never cached.
    private int preparedStatementCacheSqlLimit;
    /// Source of the statement identifiers assigned to newly cached statements.
    private StringLongSequence psSeq;
    /// Commands accepted by `schedule` that have not been written to the socket yet.
    private DList!(ICommand) pending;

    /// Number of commands written to the socket whose response has not arrived yet.
    private int inflight;
    /// Receiver of closing/closed/exception/notification events; may be null.
    private Holder holder;
    /// Upper bound for `inflight`.
    private int pipeliningLimit;

    protected AbstractConnection _socket;
    protected Status status = Status.CONNECTED;

    /**
     * Params:
     *   socket = the underlying network connection
     *   cachePreparedStatements = whether a prepared statement cache is created
     *   preparedStatementCacheSize = size passed to the cache when it is created
     *   preparedStatementCacheSqlLimit = longest SQL text that is still cached
     *   pipeliningLimit = maximum number of commands in flight at once
     */
    this(AbstractConnection socket,
            bool cachePreparedStatements,
            int preparedStatementCacheSize,
            int preparedStatementCacheSqlLimit,
            int pipeliningLimit) {
        this._socket = socket;
        this.psSeq = new StringLongSequence();
        this.pipeliningLimit = pipeliningLimit;
        this.psCache = cachePreparedStatements
            ? new PreparedStatementCache(preparedStatementCacheSize, this) : null;
        this.preparedStatementCacheSqlLimit = preparedStatementCacheSqlLimit;
    }

    /**
     * Currently a no-op: the event-handler wiring below is disabled, so
     * `handleMessage`, `handleClosed` and `handleException` have to be invoked
     * by code outside this method.
     */
    void initialization() {

        // ConnectionEventHandlerAdapter adapter = new ConnectionEventHandlerAdapter();
        // adapter.onClosed(&handleClosed);
        // adapter.onException(&handleException);
        // adapter.onMessageReceived((Connection conn, Object msg) {
        //     version(HUNT_DB_DEBUG) tracef("A message received. %s", typeid(msg));
        //     try {
        //         handleMessage(conn, msg);
        //     } catch (Throwable e) {
        //         handleException(conn, e);
        //     }
        // });

        // _socket.setHandler(adapter);
    }

    /** Returns: the underlying network connection. */
    AbstractConnection socket() {
        return _socket;
    }

    /** Returns: whatever the socket reports from `isSecured()`. */
    bool isSsl() {
        return _socket.isSecured();
    }

    /** Returns: true while the status is `Status.CONNECTED`. */
    bool isConnected() {
        return status == Status.CONNECTED;
    }

    /** Registers the holder that receives this connection's events. */
    override
    void initHolder(Holder holder) {
        this.holder = holder;
    }

    /** Returns: the identifier of the underlying socket. */
    override
    int getProcessId() {
        return _socket.getId();
    }

    /** Not supported by this base class; always throws. */
    override
    int getSecretKey() {
        throw new UnsupportedOperationException();
    }

    /**
     * Starts closing the connection. Only has an effect while the status is
     * CONNECTED: the holder (if any) is told the connection is closing and is
     * then detached, the status becomes CLOSING and the socket is closed.
     */
    override
    void close() {

        version(HUNT_DB_DEBUG) infof("A socket closing... status: %s", status);

        if (status == Status.CONNECTED) {
            if (holder !is null) {
                holder.handleClosing();
                // Detached here, so the holder gets no further events from this connection.
                holder = null;
            }
            status = Status.CLOSING;
            _socket.close();
            // // Append directly since schedule checks the status and won't enqueue the command
            // pending.add(CloseConnectionCommand.INSTANCE);
            // checkPending();
        }
        // if (Vertx.currentContext() == context) {
        //     if (status == Status.CONNECTED) {
        //         status = Status.CLOSING;
        //         // Append directly since schedule checks the status and won't enqueue the command
        //         pending.add(CloseConnectionCommand.INSTANCE);
        //         checkPending();
        //     }
        // } else {
        //     context.runOnContext(v -> close(holder));
        // }
    }

    // void onClosing(DbConnectionHandler handler) {
    //     if(_closingHandler !is null) {
    //         warning("The handler can't be reset.");
    //         return;
    //     }
    //     _closingHandler = handler;
    // }

    /**
     * Accepts a command for execution.
     *
     * A `PrepareStatementCommand` is first matched against the statement cache
     * (when caching is enabled and its SQL is within the cache SQL limit):
     * a cache hit is answered from the cached entry without touching the socket.
     * Every other command is queued while the connection is CONNECTED and
     * failed with an `IOException` otherwise.
     *
     * Params:
     *   cmd = the command to run; it must already carry a handler
     *
     * Throws: IllegalArgumentException if `cmd` has no handler.
     */
    void schedule(ICommand cmd) {
        if (!cmd.handlerExist()) {
            version(HUNT_DEBUG) warningf(typeid(cast(Object)cmd).toString());
            throw new IllegalArgumentException("No handler exists in command." ~
                TypeUtils.getSimpleName(typeid(cast(Object)cmd)));
        }

        // Special handling for cache
        PreparedStatementCache cache = this.psCache;
        PrepareStatementCommand psCmd = cast(PrepareStatementCommand) cmd;

        // Statements longer than the limit bypass the cache but must still be
        // scheduled below; returning early here would drop the command and its
        // handler would never be invoked.
        if (cache !is null && psCmd !is null
                && psCmd.sql().length <= preparedStatementCacheSqlLimit) {
            CachedPreparedStatement cached = cache.get(psCmd.sql());
            if (cached !is null) {
                // Cache hit: the cached entry answers the handler.
                psCmd.cached = cached;
                ResponseHandler!(PreparedStatement) handler = psCmd.handler;
                cached.get(handler);
                return;
            }

            if (cache.size() >= cache.getCapacity() && !cache.isReady()) {
                // only if the prepared statement is ready then it can be evicted
                version(HUNT_DB_DEBUG) info("do nothing");
            } else {
                psCmd._statement = psSeq.next();
                psCmd.cached = cached = new CachedPreparedStatement();
                cache.put(psCmd.sql(), cached);
                ResponseHandler!(PreparedStatement) a = psCmd.handler;
                (cast(CachedPreparedStatement) cached).get(a);
                // Route the server response through the cache entry, which was
                // handed the original handler just above.
                psCmd.handler = (CommandResponse!(PreparedStatement) r) { cached.handle(r); };
            }
        }

        if (status == Status.CONNECTED) {
            pending.insertBack(cmd);
            checkPending();
        } else {
            cmd.fail(new IOException("Connection not open, the status is " ~ status.to!string()));
        }
    }

    /** Writes queued commands to the socket until the pipelining limit is reached. */
    private void checkPending() {
        if (inflight < pipeliningLimit) {
            ICommand cmd;
            while (inflight < pipeliningLimit && (cmd = pollPending()) !is null) {
                inflight++;
                version(HUNT_DB_DEBUG_MORE) {
                    // tracef("checking %s ... ", typeid(cast(Object)cmd));
                } else version(HUNT_DB_DEBUG) {
                    // trace("checking... ");
                }
                _socket.encode(cast(Object)cmd);
            }
        }
    }

    /** Removes and returns the oldest queued command, or null when the queue is empty. */
    private ICommand pollPending() {
        if (pending.empty())
            return null;
        ICommand c = pending.front;
        pending.removeFront();
        return c;
    }

    /**
     * Dispatches a decoded message by its runtime type:
     * command responses free an in-flight slot and notify their command,
     * notifications are forwarded to the holder, notices are logged.
     */
    void handleMessage(Connection conn, Object msg) {
        version(HUNT_DB_DEBUG_MORE) tracef("handling a message: %s", typeid(msg));

        ICommandResponse resp = cast(ICommandResponse) msg;
        if (resp !is null) {
            inflight--;
            checkPending();
            resp.notifyCommandResponse();
            version(HUNT_DB_DEBUG_MORE) tracef("inflight=%d", inflight);
            return;
        }

        Notification n = cast(Notification) msg;
        if (n !is null) {
            handleNotification(n);
            return;
        }

        Notice notice = cast(Notice) msg;
        if (notice !is null) {
            handleNotice(notice);
            // Handled: do not fall through to the "Unhandled message" warning.
            return;
        }

        version(HUNT_DB_DEBUG) warningf("Unhandled message: %s", typeid(msg));
    }

    /** Forwards a notification to the holder, if one is attached. */
    private void handleNotification(Notification response) {
        if (holder !is null) {
            holder.handleNotification(response.getProcessId(),
                response.getChannel(), response.getPayload());
        }
    }

    /** Logs a notice. */
    private void handleNotice(Notice notice) {
        notice.log();
    }

    /** Socket-closed callback: finishes the close without an error. */
    void handleClosed(Connection conn) {
        handleClose(cast(Throwable)null);
    }

    /**
     * Socket-error callback. A `DecoderException` is unwrapped to its chained
     * cause when it has one; the resulting error then closes the connection.
     */
    void handleException(Connection c, Throwable t) {
        DecoderException err = cast(DecoderException) t;
        // Unwrap only when a chained cause exists; otherwise keep the
        // DecoderException itself so the failure is not mistaken for a clean close.
        if (err !is null && err.next !is null) {
            t = err.next;
        }
        handleClose(t);
    }

    /**
     * Marks the connection CLOSED (once), reports `t` to the holder when it is
     * not null, fails every command still queued in `pending` and finally
     * tells the holder that the connection is closed.
     */
    private void handleClose(Throwable t) {
        version(HUNT_DB_DEBUG) {
            infof("Connection closed. Throwable: %s", t is null);
        }

        if (status == Status.CLOSED) {
            version(HUNT_DB_DEBUG) warning("The closed connection has been handled already.");
            return;
        }

        status = Status.CLOSED;
        if (t !is null) {
            synchronized (this) {
                if (holder !is null) {
                    holder.handleException(t);
                }
            }
        }

        version(HUNT_DB_DEBUG) {
            if (holder !is null) {
                tracef("pending: %d, holder: %s", pending[].walkLength(), typeid(cast(Object)holder));
            }
        }

        // Queued commands never reached the socket; fail them with the close cause.
        Throwable cause = t is null ? new Exception("closed") : t;
        ICommand cmd;
        while ((cmd = pollPending()) !is null) {
            cmd.fail(cause);
        }

        if (holder !is null) {
            holder.handleClosed();
        }
    }

    /** Returns: a short description containing the socket id and local address. */
    override string toString() {
        import std.format;
        if (_socket is null) {
            return format("DbConnection: unknown");
        } else {
            return format("DbConnection %d, local: %s",
                _socket.getId(), _socket.getLocalAddress());
        }
    }
}