1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.base.impl.SqlClientBase;
19 
20 import hunt.database.base.impl.ArrayTuple;
21 import hunt.database.base.impl.PreparedStatement;
22 import hunt.database.base.impl.RowSetImpl;
23 import hunt.database.base.impl.SqlResultBuilder;
24 
25 import hunt.database.base.impl.command.CommandBase;
26 import hunt.database.base.impl.command.CommandResponse;
27 import hunt.database.base.impl.command.CommandScheduler;
28 import hunt.database.base.impl.command.ExtendedBatchQueryCommand;
29 import hunt.database.base.impl.command.ExtendedQueryCommand;
30 import hunt.database.base.impl.command.PrepareStatementCommand;
31 import hunt.database.base.impl.command.SimpleQueryCommand;
32 
33 import hunt.database.base.AsyncResult;
34 import hunt.database.base.Exceptions;
35 import hunt.database.base.RowSet;
36 import hunt.database.base.Row;
37 import hunt.database.base.SqlClient;
38 import hunt.database.base.SqlResult;
39 import hunt.database.base.Tuple;
40 
41 import hunt.collection.List;
42 import hunt.concurrency.Future;
43 import hunt.concurrency.FuturePromise;
44 import hunt.Exceptions;
45 import hunt.Functions;
46 import hunt.logging;
47 import hunt.net.AbstractConnection;
48 
49 import core.time;
50 import std.variant;
51 
52 
53 /**
54  * 
55  */
56 abstract class SqlClientBase(C) : SqlClient, CommandScheduler  { // if(is(C : SqlClient))
57 
58     private Duration _awaittingTimeout = 10.seconds;
59 
60     Duration awaittingTimeout() {
61         return _awaittingTimeout;
62     }
63 
64     void awaittingTimeout(Duration value) {
65         _awaittingTimeout = value;
66     }
67 
68     override
69     C query(string sql, RowSetHandler handler) {
70         return query!(RowSet, RowSetImpl, RowSet)(sql, false, RowSetImpl.FACTORY, handler); 
71     }
72 
73     Future!RowSet queryAsync(string sql) {
74         auto f = new FuturePromise!RowSet("query");
75 
76         auto b = new SqlResultBuilder!(RowSet, RowSetImpl, RowSet)(RowSetImpl.FACTORY, (RowSetAsyncResult ar) {
77             if (ar.succeeded()) { f.succeeded(ar.result());}
78             else { f.failed(ar.cause()); }
79         });
80 
81         scheduleThen!(bool)(new SimpleQueryCommand!(RowSet)(sql, false, b), 
82             (CommandResponse!bool r) {  b.handle(r); }
83         );
84 
85         return f;
86     }
87 
88     RowSet query(string sql) {
89         auto f = queryAsync(sql);
90         try {
91             version(HUNT_DB_DEBUG) tracef("try to get a query result in %s", _awaittingTimeout);            
92             return f.get(_awaittingTimeout);
93         } catch(Exception ex) {
94             warning(ex.msg);
95             version(HUNT_DEBUG) warning(ex);
96             throw new DatabaseException(ex.msg);
97         }
98     }
99 
100     private C query(R1, R2, R3)(string sql, bool singleton,
101                 Function!(R1, R2) factory,
102                 AsyncResultHandler!(R3) handler) {
103 
104         SqlResultBuilder!(R1, R2, R3) b = new SqlResultBuilder!(R1, R2, R3)(factory, handler);
105         scheduleThen!(bool)(new SimpleQueryCommand!(R1)(sql, singleton, b), 
106             (CommandResponse!bool r) {  b.handle(r); }
107         );
108 
109         return cast(C) this;
110     }
111 
112     override
113     C preparedQuery(string sql, Tuple arguments, RowSetHandler handler) {
114         return preparedQuery!(RowSet, RowSetImpl, RowSet)(sql, arguments, false, RowSetImpl.FACTORY, handler); 
115     }
116 
117 
118     // <R1, R2 extends SqlResultBase!(R1, R2), R3 extends SqlResult!(R1)> 
119     private C preparedQuery(R1, R2, R3)(
120             string sql,
121             Tuple arguments,
122             bool singleton,
123             Function!(R1, R2) factory,
124             AsyncResultHandler!(R3) handler) {
125 
126         scheduleThen!(PreparedStatement)(new PrepareStatementCommand(sql), 
127             (CommandResponse!PreparedStatement cr) {
128                 if (cr.succeeded()) {
129                     PreparedStatement ps = cr.result();
130                     string msg = ps.prepare(cast(List!(Variant)) arguments);
131                     if (msg !is null) {
132                         version(HUNT_DEBUG) warning(msg);
133                         if(handler !is null)
134                             handler(failedResult!(R3)(new DatabaseException(msg)));
135                     } else {
136                         SqlResultBuilder!(R1, R2, R3) b = new SqlResultBuilder!(R1, R2, R3)(factory, handler);
137 
138                         CommandScheduler sc = cr.scheduler;
139                         version(HUNT_DB_DEBUG) {
140                             if(sc !is null) {
141                                 trace(typeid(cast(Object)sc));
142                             }
143                         }
144                         SqlClientBase!(C) client = cast(SqlClientBase!(C))sc;
145                         assert(client is this);
146 
147                         scheduleThen!(bool)(new ExtendedQueryCommand!(R1)(ps, arguments, singleton, b), 
148                             (CommandResponse!bool r) {  b.handle(r); }
149                         ); 
150                     }
151                 } else {
152                     version(HUNT_DB_DEBUG) {
153                         warning(cr.cause());
154                     }
155                     handler(failedResult!(R3)(cr.cause()));
156                 }
157             }
158         );
159 
160         return cast(C) this;
161     }
162 
163     override
164     C preparedQuery(string sql, RowSetHandler handler) {
165         return preparedQuery(sql, ArrayTuple.EMPTY, handler);
166     }
167 
168     // override
169     // <R> C preparedQuery(string sql, Collector<Row, ?, R> collector, Handler!(AsyncResult!(SqlResult!(R))) handler) {
170     //     return preparedQuery(sql, ArrayTuple.EMPTY, collector, handler);
171     // }
172 
173     override
174     C preparedBatch(string sql, List!(Tuple) batch, RowSetHandler handler) {
175         // return preparedBatch(sql, batch, false, RowSetImpl.FACTORY, RowSetImpl.COLLECTOR, handler);
176         implementationMissing(false);
177         return null;
178     }
179 
180     // override
181     // <R> C preparedBatch(string sql, List!(Tuple) batch, Collector<Row, ?, R> collector, Handler!(AsyncResult!(SqlResult!(R))) handler) {
182     //     return preparedBatch(sql, batch, true, SqlResultImpl::new, collector, handler);
183     // }
184 
185     // private <R1, R2 extends SqlResultBase!(R1, R2), R3 extends SqlResult!(R1)> C preparedBatch(
186     //     string sql,
187     //     List!(Tuple) batch,
188     //     boolean singleton,
189     //     Function!(R1, R2) factory,
190     //     Collector<Row, ?, R1> collector,
191     //     Handler!(AsyncResult!(R3)) handler) {
192     //     schedule(new PrepareStatementCommand(sql), cr -> {
193     //         if (cr.succeeded()) {
194     //             PreparedStatement ps = cr.result();
195     //             for  (Tuple args : batch) {
196     //                 string msg = ps.prepare((List!(Object)) args);
197     //                 if (msg !is null) {
198     //                     handler.handle(Future.failedFuture(msg));
199     //                     return;
200     //                 }
201     //             }
202     //             SqlResultBuilder!(R1, R2, R3) b = new SqlResultBuilder<>(factory, handler);
203     //             cr.scheduler.schedule(new ExtendedBatchQueryCommand<>(
204     //                 ps,
205     //                 batch,
206     //                 singleton,
207     //                 collector,
208     //                 b), b);
209     //         } else {
210     //             handler.handle(Future.failedFuture(cr.cause()));
211     //         }
212     //     });
213     //     return (C) this;
214     // }
215 
216     protected void scheduleThen(R)(CommandBase!(R) cmd, ResponseHandler!R handler) {
217         cmd.handler = (cr) {
218             // Tx might be gone ???
219             cr.scheduler = this;
220             handler(cr);
221         };
222         schedule(cmd);
223     }
224 
225     void schedule(ICommand cmd) {
226         throw new NotImplementedException();
227     }
228 }