1 /* 2 * Copyright (C) 2019, HuntLabs 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 */ 17 module hunt.database.driver.postgresql.impl.PostgreSQLConnectionImpl; 18 19 import hunt.database.driver.postgresql.impl.PostgreSQLConnectionFactory; 20 import hunt.database.driver.postgresql.impl.PostgreSQLSocketConnection; 21 22 import hunt.database.driver.postgresql.PostgreSQLConnectOptions; 23 import hunt.database.driver.postgresql.PostgreSQLConnection; 24 import hunt.database.driver.postgresql.PostgreSQLNotification; 25 import hunt.database.driver.postgresql.PgUtil; 26 27 import hunt.database.base.AsyncResult; 28 import hunt.database.base.Common; 29 import hunt.database.base.impl.Connection; 30 import hunt.database.base.impl.command.CommandResponse; 31 import hunt.database.base.impl.command.PrepareStatementCommand; 32 import hunt.database.base.impl.NamedQueryDesc; 33 import hunt.database.base.impl.SqlConnectionImpl; 34 import hunt.database.base.SqlResult; 35 import hunt.database.base.RowSet; 36 import hunt.database.base.Row; 37 import hunt.database.base.SqlClient; 38 import hunt.database.base.Tuple; 39 40 import hunt.collection.List; 41 import hunt.concurrency.Future; 42 import hunt.concurrency.FuturePromise; 43 import hunt.Exceptions; 44 import hunt.logging; 45 import hunt.util.StringBuilder; 46 47 48 alias PgNamedQueryDesc = NamedQueryDesc!("$", true); 49 50 /** 51 * 52 */ 53 class PgConnectionImpl : SqlConnectionImpl!(PgConnectionImpl), PgConnection { 54 55 private PgConnectionFactory factory; 56 private PgNotificationHandler _notificationHandler; 57 private PgSocketConnection _socketConn; 58 59 this(PgConnectionFactory factory, DbConnection conn) { 60 super(conn); 61 _socketConn = cast(PgSocketConnection)conn; 62 63 this.factory = factory; 64 } 65 66 override PgConnectionImpl query(string sql, RowSetHandler handler) { 67 return super.query(sql, handler); 68 } 69 70 override PgConnectionImpl preparedQuery(string sql, RowSetHandler handler) { 71 return super.preparedQuery(sql, handler); 72 } 73 74 override PgConnectionImpl preparedQuery(string sql, Tuple arguments, RowSetHandler handler) { 75 return super.preparedQuery(sql, arguments, handler); 76 } 77 78 override PgConnectionImpl preparedBatch(string sql, List!(Tuple) batch, RowSetHandler handler) { 79 return super.preparedBatch(sql, batch, handler); 80 } 81 82 override 83 PgConnection notificationHandler(PgNotificationHandler handler) { 84 _notificationHandler = handler; 85 return this; 86 } 87 88 override 89 void handleNotification(int processId, string channel, string payload) { 90 PgNotificationHandler handler = _notificationHandler; 91 if (handler !is null) { 92 handler(new PgNotification().setProcessId(processId).setChannel(channel).setPayload(payload)); 93 } 94 } 95 96 override 97 int processId() { 98 return conn.getProcessId(); 99 } 100 101 override 102 int secretKey() { 103 return conn.getSecretKey(); 104 } 105 106 override 107 PgConnection cancelRequest(hunt.database.base.Common.VoidHandler handler) { 108 implementationMissing(false); 109 // Context current = Vertx.currentContext(); 110 // if (current == context) { 111 // factory.connect(ar -> { 112 // if (ar.succeeded()) { 113 // PgSocketConnection conn = ar.result(); 114 // conn.sendCancelRequestMessage(this.processId(), this.secretKey(), handler); 115 // } else { 116 // handler.handle(Future.failedFuture(ar.cause())); 117 // } 118 // }); 119 // } else { 120 // context.runOnContext(v -> cancelRequest(handler)); 121 // } 122 return this; 123 } 124 125 // override protected AbstractNamedQueryDesc getNamedQueryDesc(string sql) { 126 // return new PgNamedQueryDesc(sql); 127 // } 128 129 Future!NamedQuery prepareNamedQueryAsync(string sql) { 130 version(HUNT_DB_DEBUG) trace(sql); 131 auto f = new FuturePromise!NamedQuery(); 132 AbstractNamedQueryDesc queryDesc = new PgNamedQueryDesc(sql); 133 134 scheduleThen!(PreparedStatement)(new PrepareStatementCommand(queryDesc.getSql()), 135 (CommandResponse!PreparedStatement ar) { 136 if (ar.succeeded()) { 137 NamedQueryImpl queryImpl = new PgNamedQueryImpl(conn, ar.result(), queryDesc); 138 f.succeeded(queryImpl); 139 } else { 140 f.failed(ar.cause()); 141 } 142 } 143 ); 144 145 return f; 146 } 147 148 NamedQuery prepareNamedQuery(string sql) { 149 auto f = prepareNamedQueryAsync(sql); 150 version(HUNT_DEBUG) warning("try to get a result"); 151 return f.get(awaittingTimeout); 152 } 153 154 string escapeIdentifier(string identifier) { 155 return PgUtil.escapeIdentifier(null, identifier).toString(); 156 } 157 158 string escapeLiteral(string literal) { 159 scope StringBuilder sb = new StringBuilder((cast(int)literal.length + 10) / 10 * 11); // Add 10% for escaping. 160 PgUtil.escapeLiteral(sb, literal, _socketConn.getStandardConformingStrings()); 161 return sb.toString(); 162 } 163 164 165 /* ----------------------------- Static Metholds ---------------------------- */ 166 167 static void connect(PgConnectOptions options, AsyncResultHandler!(PgConnection) handler) { 168 PgConnectionFactory client = new PgConnectionFactory(options); 169 version(HUNT_DB_DEBUG) trace("connecting ..."); 170 client.connectAndInit( (AsyncResult!(DbConnection) ar) { 171 version(HUNT_DB_DEBUG) info("connection result: ", ar.succeeded()); 172 if (ar.succeeded()) { 173 DbConnection conn = ar.result(); 174 PgConnectionImpl p = new PgConnectionImpl(client, conn); 175 conn.initHolder(p); 176 if(handler !is null) { 177 handler(succeededResult!(PgConnection)(p)); 178 } 179 } else if(handler !is null) { 180 handler(failedResult!(PgConnection)(ar.cause())); 181 } 182 }); 183 } 184 185 } 186 187 188 189 import hunt.database.base.impl.NamedQueryDesc; 190 import hunt.database.base.impl.NamedQueryImpl; 191 import hunt.database.base.impl.PreparedQueryImpl; 192 import hunt.database.base.impl.RowDesc; 193 194 import hunt.database.base.impl.ArrayTuple; 195 import hunt.database.base.impl.Connection; 196 import hunt.database.base.impl.PreparedStatement; 197 198 import hunt.database.base.PreparedQuery; 199 import hunt.database.base.RowSet; 200 import std.variant; 201 202 class PgNamedQueryImpl : NamedQueryImpl { 203 204 this(DbConnection conn, PreparedStatement ps, AbstractNamedQueryDesc queryDesc) { 205 super(conn, ps, queryDesc); 206 } 207 208 void setParameter(string name, Variant value) { 209 version(HUNT_DEBUG) { 210 auto itemPtr = name in _parameters; 211 if(itemPtr !is null) { 212 warning("% will be overwrited with %s", name, value.toString()); 213 } 214 } 215 216 217 // TODO: Tasks pending completion -@zhangxueping at 2019-10-01T13:35:23+08:00 218 // validate the type of parameter 219 // hunt.database.driver.mysql.impl.codec.ColumnDefinition; 220 221 // RowDesc rowDesc = getPreparedStatement().rowDesc(); 222 // warning(rowDesc.toString()); 223 224 _parameters[name] = value; 225 } 226 227 }