1 /* 2 * Copyright (C) 2019, HuntLabs 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 */ 17 module hunt.database.base.impl.TransactionImpl; 18 19 import hunt.database.base.impl.Connection; 20 import hunt.database.base.impl.RowSetImpl; 21 import hunt.database.base.impl.SqlClientBase; 22 import hunt.database.base.impl.SqlConnectionBase; 23 import hunt.database.base.impl.SqlResultBuilder; 24 import hunt.database.base.impl.TxStatus; 25 26 import hunt.database.base.AsyncResult; 27 import hunt.database.base.Common; 28 import hunt.database.base.Exceptions; 29 import hunt.database.base.impl.command.CommandResponse; 30 import hunt.database.base.impl.command.CommandBase; 31 import hunt.database.base.impl.command.QueryCommandBase; 32 import hunt.database.base.impl.command.SimpleQueryCommand; 33 import hunt.database.base.impl.PreparedStatement; 34 import hunt.database.base.PreparedQuery; 35 import hunt.database.base.RowSet; 36 import hunt.database.base.SqlClient; 37 import hunt.database.base.Transaction; 38 39 import hunt.concurrency.Future; 40 import hunt.concurrency.FuturePromise; 41 import hunt.Exceptions; 42 import hunt.logging; 43 import hunt.Object; 44 45 import std.string; 46 import std.container.dlist; 47 48 enum int ST_BEGIN = 0; 49 enum int ST_PENDING = 1; 50 enum int ST_PROCESSING = 2; 51 enum int ST_COMPLETED = 3; 52 53 /** 54 * 55 */ 56 class TransactionImpl : SqlConnectionBase!(TransactionImpl), Transaction { 57 58 private AsyncVoidHandler disposeHandler; 59 // private Deque<CommandBase<?>> pending = new ArrayDeque<>(); 60 private DList!(ICommand) pending; 61 private AsyncVoidHandler failedHandler; 62 private int _status = ST_BEGIN; 63 64 this(DbConnection conn, AsyncVoidHandler disposeHandler) { 65 super(conn); 66 this.disposeHandler = disposeHandler; 67 doSchedule(doQuery("BEGIN", &afterBegin)); 68 } 69 70 int status() { 71 return _status; 72 } 73 74 override Transaction prepare(string sql, PreparedQueryHandler handler) { 75 return super.prepare(sql, handler); 76 } 77 alias prepare = SqlConnectionBase!(TransactionImpl).prepare; 78 79 override TransactionImpl query(string sql, RowSetHandler handler) { 80 return super.query(sql, handler); 81 } 82 83 // override Future!RowSet queryAsync(string sql) { 84 // return super.queryAsync(sql); 85 // } 86 87 // override RowSet query(string sql) { 88 // return super.query(sql); 89 // } 90 91 alias query = SqlClientBase!(TransactionImpl).query; 92 93 private void doSchedule(ICommand cmd) { 94 conn.schedule(cmd); 95 } 96 97 private void afterBegin(AsyncResult!RowSet ar) { 98 // synchronized (this) { 99 if (ar.succeeded()) { 100 _status = ST_PENDING; 101 } else { 102 _status = ST_COMPLETED; 103 } 104 checkPending(); 105 // } 106 } 107 108 private bool isComplete(ICommand cmd) { 109 IQueryCommand queryCmd = cast(IQueryCommand)cmd; 110 if (queryCmd !is null) { 111 string sql = queryCmd.sql().strip().toUpper(); 112 return sql == "COMMIT" || sql == "ROLLBACK"; 113 } 114 return false; 115 } 116 117 118 private void checkPending() { 119 synchronized (this) { 120 doCheckPending(); 121 } 122 } 123 124 private void doCheckPending() { 125 switch (_status) { 126 case ST_BEGIN: 127 break; 128 case ST_PENDING: { 129 ICommand cmd = pollPending(); 130 if (cmd !is null) { 131 if (isComplete(cmd)) { 132 _status = ST_COMPLETED; 133 } else { 134 wrap(cmd); 135 _status = ST_PROCESSING; 136 } 137 doSchedule(cmd); 138 } 139 break; 140 } 141 case ST_PROCESSING: 142 break; 143 case ST_COMPLETED: { 144 if (!pending.empty()) { 145 DatabaseException err = new DatabaseException("Transaction already completed"); 146 ICommand cmd; 147 while ((cmd = pollPending()) !is null) { 148 cmd.fail(err); 149 } 150 } 151 break; 152 } 153 154 default: 155 warning("unhandled _status: %d", _status); 156 break; 157 } 158 } 159 160 private ICommand pollPending() { 161 if(pending.empty()) 162 return null; 163 ICommand r = pending.front(); 164 pending.removeFront(); 165 return r; 166 } 167 168 // override 169 // <R> void schedule(CommandBase!(R) cmd, Handler<? super CommandResponse!(R)> handler) { 170 // cmd.handler = cr -> { 171 // cr.scheduler = this; 172 // handler.handle(cr); 173 // }; 174 // schedule(cmd); 175 // } 176 177 override void schedule(ICommand cmd) { 178 synchronized (this) { 179 pending.insertBack(cmd); 180 } 181 checkPending(); 182 } 183 184 private void wrap(ICommand cmd) { 185 auto rowSetCommand = cast(CommandBase!bool)cmd; 186 if(rowSetCommand !is null) { 187 wrap!(bool)(rowSetCommand); 188 return; 189 } 190 191 auto preparedStatementCommand = cast(CommandBase!PreparedStatement)cmd; 192 if(preparedStatementCommand !is null) { 193 wrap!(PreparedStatement)(preparedStatementCommand); 194 return; 195 } 196 197 version(HUNT_DB_DEBUG) trace(typeid(cast(Object)cmd)); 198 implementationMissing(false); 199 } 200 201 private void wrap(T)(CommandBase!T cmd) { 202 ResponseHandler!T handler = cmd.handler; 203 cmd.handler = (CommandResponse!T ar) { 204 synchronized (this) { 205 _status = ST_PENDING; 206 if (ar.txStatus() == TxStatus.FAILED) { 207 // We won't recover from this so rollback 208 ICommand c; 209 while ((c = pollPending()) !is null) { 210 c.fail(new DatabaseException("rollback exception")); 211 } 212 AsyncVoidHandler h = failedHandler; 213 if (h !is null) { 214 // context.runOnContext(h); 215 warning("running here"); 216 h(null); 217 } 218 schedule(doQuery("ROLLBACK", (ar2) { 219 trace("running here"); 220 disposeHandler(null); 221 handler(ar); 222 })); 223 } else { 224 handler(ar); 225 checkPending(); 226 } 227 } 228 }; 229 } 230 231 void commit() { 232 auto f = new FuturePromise!Void(); 233 while(_status == ST_BEGIN) { 234 version(HUNT_DB_DEBUG_MORE) warning("Waiting for the response for BEGIN"); 235 } 236 237 commit((VoidAsyncResult ar) { 238 if (ar.succeeded()) { 239 f.succeeded(null); 240 } else { 241 f.failed(ar.cause()); 242 } 243 }); 244 245 version(HUNT_DB_DEBUG) warning("try to get a commit result"); 246 import core.time; 247 f.get(awaittingTimeout); 248 } 249 250 void commit(AsyncVoidHandler handler) { 251 version(HUNT_DB_DEBUG) tracef("_status: %d", _status); 252 switch (_status) { 253 case ST_BEGIN: 254 case ST_PENDING: 255 case ST_PROCESSING: 256 schedule(doQuery("COMMIT", (ar) { 257 disposeHandler(null); 258 if (handler !is null) { 259 if (ar.succeeded()) { 260 handler(succeededResult(cast(Void)null)); 261 } else { 262 version(HUNT_DB_DEBUG) warning(ar.cause()); 263 handler(failedResult!Void(ar.cause())); 264 } 265 } 266 })); 267 break; 268 case ST_COMPLETED: 269 if (handler !is null) { 270 handler(failedResult!Void(new DatabaseException("Transaction already completed"))); 271 } 272 break; 273 274 default: 275 break; 276 } 277 } 278 279 void rollback() { 280 try { 281 // rollback(null); 282 auto f = new FuturePromise!Void("rollback"); 283 while(_status == ST_BEGIN) { 284 version(HUNT_DB_DEBUG_MORE) warning("Waiting for the response for BEGIN"); 285 } 286 287 rollback((VoidAsyncResult ar) { 288 if (ar.succeeded()) { 289 f.succeeded(null); 290 } else { 291 f.failed(ar.cause()); 292 } 293 }); 294 295 version(HUNT_DB_DEBUG) trace("try to get a rollback result"); 296 import core.time; 297 f.get(awaittingTimeout); 298 } catch(Throwable t) { 299 errorf("A transaction failed to rollback."); 300 warning(t); 301 } 302 } 303 304 void rollback(AsyncVoidHandler handler) { 305 schedule(doQuery("ROLLBACK", (RowSetAsyncResult ar) { 306 disposeHandler(null); 307 if (handler !is null) { 308 handler(succeededResult(cast(Void)null)); 309 } 310 })); 311 } 312 313 override 314 void close() { 315 rollback(); 316 } 317 318 override 319 Transaction abortHandler(AsyncVoidHandler handler) { 320 failedHandler = handler; 321 return this; 322 } 323 324 private ICommand doQuery(string sql, RowSetHandler handler) { 325 auto b = new SqlResultBuilder!(RowSet, RowSetImpl, RowSet)(RowSetImpl.FACTORY, handler); 326 auto cmd = new SimpleQueryCommand!(RowSet)(sql, false, b); 327 cmd.handler = (r) { b.handle(r); }; 328 return cmd; 329 } 330 }