1 /* 2 * Copyright (C) 2019, HuntLabs 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 */ 17 18 module hunt.database.base.impl.SqlClientBase; 19 20 import hunt.database.base.impl.ArrayTuple; 21 import hunt.database.base.impl.PreparedStatement; 22 import hunt.database.base.impl.RowSetImpl; 23 import hunt.database.base.impl.SqlResultBuilder; 24 25 import hunt.database.base.impl.command.CommandBase; 26 import hunt.database.base.impl.command.CommandResponse; 27 import hunt.database.base.impl.command.CommandScheduler; 28 import hunt.database.base.impl.command.ExtendedBatchQueryCommand; 29 import hunt.database.base.impl.command.ExtendedQueryCommand; 30 import hunt.database.base.impl.command.PrepareStatementCommand; 31 import hunt.database.base.impl.command.SimpleQueryCommand; 32 33 import hunt.database.base.AsyncResult; 34 import hunt.database.base.Exceptions; 35 import hunt.database.base.RowSet; 36 import hunt.database.base.Row; 37 import hunt.database.base.SqlClient; 38 import hunt.database.base.SqlResult; 39 import hunt.database.base.Tuple; 40 41 import hunt.collection.List; 42 import hunt.concurrency.Future; 43 import hunt.concurrency.FuturePromise; 44 import hunt.Exceptions; 45 import hunt.Functions; 46 import hunt.logging; 47 import hunt.net.AbstractConnection; 48 49 import core.time; 50 import std.variant; 51 52 53 /** 54 * 55 */ 56 abstract class SqlClientBase(C) : SqlClient, CommandScheduler { // if(is(C : SqlClient)) 57 58 private Duration _awaittingTimeout = 10.seconds; 59 60 Duration awaittingTimeout() { 61 return _awaittingTimeout; 62 } 63 64 void awaittingTimeout(Duration value) { 65 _awaittingTimeout = value; 66 } 67 68 override 69 C query(string sql, RowSetHandler handler) { 70 return query!(RowSet, RowSetImpl, RowSet)(sql, false, RowSetImpl.FACTORY, handler); 71 } 72 73 Future!RowSet queryAsync(string sql) { 74 auto f = new FuturePromise!RowSet("query"); 75 76 auto b = new SqlResultBuilder!(RowSet, RowSetImpl, RowSet)(RowSetImpl.FACTORY, (RowSetAsyncResult ar) { 77 if (ar.succeeded()) { f.succeeded(ar.result());} 78 else { f.failed(ar.cause()); } 79 }); 80 81 scheduleThen!(bool)(new SimpleQueryCommand!(RowSet)(sql, false, b), 82 (CommandResponse!bool r) { b.handle(r); } 83 ); 84 85 return f; 86 } 87 88 RowSet query(string sql) { 89 auto f = queryAsync(sql); 90 try { 91 version(HUNT_DB_DEBUG) tracef("try to get a query result in %s", _awaittingTimeout); 92 return f.get(_awaittingTimeout); 93 } catch(Exception ex) { 94 warning(ex.msg); 95 version(HUNT_DEBUG) warning(ex); 96 throw new DatabaseException(ex.msg); 97 } 98 } 99 100 private C query(R1, R2, R3)(string sql, bool singleton, 101 Function!(R1, R2) factory, 102 AsyncResultHandler!(R3) handler) { 103 104 SqlResultBuilder!(R1, R2, R3) b = new SqlResultBuilder!(R1, R2, R3)(factory, handler); 105 scheduleThen!(bool)(new SimpleQueryCommand!(R1)(sql, singleton, b), 106 (CommandResponse!bool r) { b.handle(r); } 107 ); 108 109 return cast(C) this; 110 } 111 112 override 113 C preparedQuery(string sql, Tuple arguments, RowSetHandler handler) { 114 return preparedQuery!(RowSet, RowSetImpl, RowSet)(sql, arguments, false, RowSetImpl.FACTORY, handler); 115 } 116 117 118 // <R1, R2 extends SqlResultBase!(R1, R2), R3 extends SqlResult!(R1)> 119 private C preparedQuery(R1, R2, R3)( 120 string sql, 121 Tuple arguments, 122 bool singleton, 123 Function!(R1, R2) factory, 124 AsyncResultHandler!(R3) handler) { 125 126 scheduleThen!(PreparedStatement)(new PrepareStatementCommand(sql), 127 (CommandResponse!PreparedStatement cr) { 128 if (cr.succeeded()) { 129 PreparedStatement ps = cr.result(); 130 string msg = ps.prepare(cast(List!(Variant)) arguments); 131 if (msg !is null) { 132 version(HUNT_DEBUG) warning(msg); 133 if(handler !is null) 134 handler(failedResult!(R3)(new DatabaseException(msg))); 135 } else { 136 SqlResultBuilder!(R1, R2, R3) b = new SqlResultBuilder!(R1, R2, R3)(factory, handler); 137 138 CommandScheduler sc = cr.scheduler; 139 version(HUNT_DB_DEBUG) { 140 if(sc !is null) { 141 trace(typeid(cast(Object)sc)); 142 } 143 } 144 SqlClientBase!(C) client = cast(SqlClientBase!(C))sc; 145 assert(client is this); 146 147 scheduleThen!(bool)(new ExtendedQueryCommand!(R1)(ps, arguments, singleton, b), 148 (CommandResponse!bool r) { b.handle(r); } 149 ); 150 } 151 } else { 152 version(HUNT_DB_DEBUG) { 153 warning(cr.cause()); 154 } 155 handler(failedResult!(R3)(cr.cause())); 156 } 157 } 158 ); 159 160 return cast(C) this; 161 } 162 163 override 164 C preparedQuery(string sql, RowSetHandler handler) { 165 return preparedQuery(sql, ArrayTuple.EMPTY, handler); 166 } 167 168 // override 169 // <R> C preparedQuery(string sql, Collector<Row, ?, R> collector, Handler!(AsyncResult!(SqlResult!(R))) handler) { 170 // return preparedQuery(sql, ArrayTuple.EMPTY, collector, handler); 171 // } 172 173 override 174 C preparedBatch(string sql, List!(Tuple) batch, RowSetHandler handler) { 175 // return preparedBatch(sql, batch, false, RowSetImpl.FACTORY, RowSetImpl.COLLECTOR, handler); 176 implementationMissing(false); 177 return null; 178 } 179 180 // override 181 // <R> C preparedBatch(string sql, List!(Tuple) batch, Collector<Row, ?, R> collector, Handler!(AsyncResult!(SqlResult!(R))) handler) { 182 // return preparedBatch(sql, batch, true, SqlResultImpl::new, collector, handler); 183 // } 184 185 // private <R1, R2 extends SqlResultBase!(R1, R2), R3 extends SqlResult!(R1)> C preparedBatch( 186 // string sql, 187 // List!(Tuple) batch, 188 // boolean singleton, 189 // Function!(R1, R2) factory, 190 // Collector<Row, ?, R1> collector, 191 // Handler!(AsyncResult!(R3)) handler) { 192 // schedule(new PrepareStatementCommand(sql), cr -> { 193 // if (cr.succeeded()) { 194 // PreparedStatement ps = cr.result(); 195 // for (Tuple args : batch) { 196 // string msg = ps.prepare((List!(Object)) args); 197 // if (msg !is null) { 198 // handler.handle(Future.failedFuture(msg)); 199 // return; 200 // } 201 // } 202 // SqlResultBuilder!(R1, R2, R3) b = new SqlResultBuilder<>(factory, handler); 203 // cr.scheduler.schedule(new ExtendedBatchQueryCommand<>( 204 // ps, 205 // batch, 206 // singleton, 207 // collector, 208 // b), b); 209 // } else { 210 // handler.handle(Future.failedFuture(cr.cause())); 211 // } 212 // }); 213 // return (C) this; 214 // } 215 216 protected void scheduleThen(R)(CommandBase!(R) cmd, ResponseHandler!R handler) { 217 cmd.handler = (cr) { 218 // Tx might be gone ??? 219 cr.scheduler = this; 220 handler(cr); 221 }; 222 schedule(cmd); 223 } 224 225 void schedule(ICommand cmd) { 226 throw new NotImplementedException(); 227 } 228 }