1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 module hunt.database.base.impl.TransactionImpl;
18 
19 import hunt.database.base.impl.Connection;
20 import hunt.database.base.impl.RowSetImpl;
21 import hunt.database.base.impl.SqlClientBase;
22 import hunt.database.base.impl.SqlConnectionBase;
23 import hunt.database.base.impl.SqlResultBuilder;
24 import hunt.database.base.impl.TxStatus;
25 
26 import hunt.database.base.AsyncResult;
27 import hunt.database.base.Common;
28 import hunt.database.base.Exceptions;
29 import hunt.database.base.impl.command.CommandResponse;
30 import hunt.database.base.impl.command.CommandBase;
31 import hunt.database.base.impl.command.QueryCommandBase;
32 import hunt.database.base.impl.command.SimpleQueryCommand;
33 import hunt.database.base.impl.PreparedStatement;
34 import hunt.database.base.PreparedQuery;
35 import hunt.database.base.RowSet;
36 import hunt.database.base.SqlClient;
37 import hunt.database.base.Transaction;
38 
39 import hunt.concurrency.Future;
40 import hunt.concurrency.FuturePromise;
41 import hunt.Exceptions;
42 import hunt.logging;
43 import hunt.Object;
44 
45 import std.string;
46 import std.container.dlist;
47 
48 enum int ST_BEGIN = 0;
49 enum int ST_PENDING = 1;
50 enum int ST_PROCESSING = 2;
51 enum int ST_COMPLETED = 3;
52 
53 /**
54  * 
55  */
56 class TransactionImpl : SqlConnectionBase!(TransactionImpl), Transaction {
57 
58     private AsyncVoidHandler disposeHandler;
59     // private Deque<CommandBase<?>> pending = new ArrayDeque<>();
60     private DList!(ICommand) pending;
61     private AsyncVoidHandler failedHandler;
62     private int _status = ST_BEGIN;
63 
64     this(DbConnection conn, AsyncVoidHandler disposeHandler) { 
65         super(conn); 
66         this.disposeHandler = disposeHandler;
67         doSchedule(doQuery("BEGIN", &afterBegin));
68     }
69 
70     int status() {
71         return _status;
72     }
73 
74     override Transaction prepare(string sql, PreparedQueryHandler handler) {
75         return super.prepare(sql, handler);
76     }
77     alias prepare = SqlConnectionBase!(TransactionImpl).prepare;
78 
79     override TransactionImpl query(string sql, RowSetHandler handler) {
80         return super.query(sql, handler);
81     }
82 
83     // override Future!RowSet queryAsync(string sql) {
84     //     return super.queryAsync(sql);
85     // }
86 
87     // override RowSet query(string sql) {
88     //     return super.query(sql);
89     // }
90     
91     alias query = SqlClientBase!(TransactionImpl).query;
92 
93     private void doSchedule(ICommand cmd) {
94         conn.schedule(cmd);
95     }
96 
97     private void afterBegin(AsyncResult!RowSet ar) {
98         // synchronized (this) {
99             if (ar.succeeded()) {
100                 _status = ST_PENDING;
101             } else {
102                 _status = ST_COMPLETED;
103             }
104             checkPending();
105         // }
106     }
107 
108     private bool isComplete(ICommand cmd) {
109         IQueryCommand queryCmd = cast(IQueryCommand)cmd;
110         if (queryCmd !is null) {
111             string sql = queryCmd.sql().strip().toUpper();
112             return sql == "COMMIT" || sql == "ROLLBACK";
113         }
114         return false;
115     }
116 
117 
118     private void checkPending() {
119         synchronized (this) {
120             doCheckPending();
121         }
122     }
123 
124     private void doCheckPending() {
125         switch (_status) {
126             case ST_BEGIN:
127                 break;
128             case ST_PENDING: {
129                 ICommand cmd = pollPending();
130                 if (cmd !is null) {
131                     if (isComplete(cmd)) {
132                         _status = ST_COMPLETED;
133                     } else {
134                         wrap(cmd);
135                         _status = ST_PROCESSING;
136                     }
137                     doSchedule(cmd);
138                 }
139                 break;
140             }
141             case ST_PROCESSING:
142                 break;
143             case ST_COMPLETED: {
144                 if (!pending.empty()) {
145                     DatabaseException err = new DatabaseException("Transaction already completed");
146                     ICommand cmd;
147                     while ((cmd = pollPending()) !is null) {
148                         cmd.fail(err);
149                     }
150                 }
151                 break;
152             }
153 
154             default:
155                 warning("unhandled _status: %d", _status);
156                 break;
157         }
158     }
159 
160     private ICommand pollPending() {
161         if(pending.empty())
162             return null;
163         ICommand r = pending.front();
164         pending.removeFront();
165         return r;
166     }
167 
168     // override
169     // <R> void schedule(CommandBase!(R) cmd, Handler<? super CommandResponse!(R)> handler) {
170     //     cmd.handler = cr -> {
171     //         cr.scheduler = this;
172     //         handler.handle(cr);
173     //     };
174     //     schedule(cmd);
175     // }
176 
177     override void schedule(ICommand cmd) {
178         synchronized (this) {
179             pending.insertBack(cmd);
180         }
181         checkPending();
182     }
183 
184     private void wrap(ICommand cmd) {
185         auto rowSetCommand = cast(CommandBase!bool)cmd;
186         if(rowSetCommand !is null) {
187             wrap!(bool)(rowSetCommand);
188             return;
189         } 
190         
191         auto preparedStatementCommand = cast(CommandBase!PreparedStatement)cmd;
192         if(preparedStatementCommand !is null) { 
193             wrap!(PreparedStatement)(preparedStatementCommand);
194             return;
195         }
196 
197         version(HUNT_DB_DEBUG) trace(typeid(cast(Object)cmd));
198         implementationMissing(false);
199     }
200 
201     private void wrap(T)(CommandBase!T cmd) {
202         ResponseHandler!T handler = cmd.handler;
203         cmd.handler = (CommandResponse!T ar) {
204             synchronized (this) {
205                 _status = ST_PENDING;
206                 if (ar.txStatus() == TxStatus.FAILED) {
207                     // We won't recover from this so rollback
208                     ICommand c;
209                     while ((c = pollPending()) !is null) {
210                         c.fail(new DatabaseException("rollback exception"));
211                     }
212                     AsyncVoidHandler h = failedHandler;
213                     if (h !is null) {
214                         // context.runOnContext(h);
215                         warning("running here");
216                         h(null);
217                     }
218                     schedule(doQuery("ROLLBACK", (ar2) {
219                         trace("running here");
220                         disposeHandler(null);
221                         handler(ar);
222                     }));
223                 } else {
224                     handler(ar);
225                     checkPending();
226                 }
227             }
228         };
229     }
230 
231     void commit() {
232         auto f = new FuturePromise!Void();
233         while(_status == ST_BEGIN) {
234             version(HUNT_DB_DEBUG_MORE) warning("Waiting for the response for BEGIN");
235         }
236 
237         commit((VoidAsyncResult ar) {
238             if (ar.succeeded()) { 
239                 f.succeeded(null);
240             } else {
241                 f.failed(ar.cause()); 
242             }
243         });
244 
245         version(HUNT_DB_DEBUG) warning("try to get a commit result");
246         import core.time;
247         f.get(awaittingTimeout);
248     }
249 
250     void commit(AsyncVoidHandler handler) {
251         version(HUNT_DB_DEBUG) tracef("_status: %d", _status);
252         switch (_status) {
253             case ST_BEGIN:
254             case ST_PENDING:
255             case ST_PROCESSING:
256                 schedule(doQuery("COMMIT", (ar) {
257                     disposeHandler(null);
258                     if (handler !is null) {
259                         if (ar.succeeded()) {
260                             handler(succeededResult(cast(Void)null));
261                         } else {
262                             version(HUNT_DB_DEBUG) warning(ar.cause());
263                             handler(failedResult!Void(ar.cause()));
264                         }
265                     }
266                 }));
267                 break;
268             case ST_COMPLETED:
269                 if (handler !is null) {
270                     handler(failedResult!Void(new DatabaseException("Transaction already completed")));
271                 }
272                 break;
273 
274             default:
275                 break;
276         }
277     }
278 
279     void rollback() {
280         try {
281             // rollback(null);
282             auto f = new FuturePromise!Void("rollback");
283             while(_status == ST_BEGIN) {
284                 version(HUNT_DB_DEBUG_MORE) warning("Waiting for the response for BEGIN");
285             }
286 
287             rollback((VoidAsyncResult ar) {
288                 if (ar.succeeded()) { 
289                     f.succeeded(null);
290                 } else {
291                     f.failed(ar.cause()); 
292                 }
293             });
294 
295             version(HUNT_DB_DEBUG) trace("try to get a rollback result");
296             import core.time;
297             f.get(awaittingTimeout);
298         } catch(Throwable t) {
299             errorf("A transaction failed to rollback.");
300             warning(t);
301         }
302     }
303 
304     void rollback(AsyncVoidHandler handler) {
305         schedule(doQuery("ROLLBACK", (RowSetAsyncResult ar) {
306             disposeHandler(null);
307             if (handler !is null) {
308                 handler(succeededResult(cast(Void)null));
309             }
310         }));
311     }
312 
313     override
314     void close() {
315         rollback();
316     }
317 
318     override
319     Transaction abortHandler(AsyncVoidHandler handler) {
320         failedHandler = handler;
321         return this;
322     }
323 
324     private ICommand doQuery(string sql, RowSetHandler handler) {
325         auto b = new SqlResultBuilder!(RowSet, RowSetImpl, RowSet)(RowSetImpl.FACTORY, handler);
326         auto cmd = new SimpleQueryCommand!(RowSet)(sql, false, b); 
327         cmd.handler = (r) { b.handle(r); };
328         return cmd;
329     }
330 }