1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 module hunt.database.driver.postgresql.impl.PostgreSQLConnectionImpl;
18 
19 import hunt.database.driver.postgresql.impl.PostgreSQLConnectionFactory;
20 import hunt.database.driver.postgresql.impl.PostgreSQLSocketConnection;
21 
22 import hunt.database.driver.postgresql.PostgreSQLConnectOptions;
23 import hunt.database.driver.postgresql.PostgreSQLConnection;
24 import hunt.database.driver.postgresql.PostgreSQLNotification;
25 import hunt.database.driver.postgresql.PgUtil;
26 
27 import hunt.database.base.AsyncResult;
28 import hunt.database.base.Common;
29 import hunt.database.base.impl.Connection;
30 import hunt.database.base.impl.command.CommandResponse;
31 import hunt.database.base.impl.command.PrepareStatementCommand;
32 import hunt.database.base.impl.NamedQueryDesc;
33 import hunt.database.base.impl.SqlConnectionImpl;
34 import hunt.database.base.SqlResult;
35 import hunt.database.base.RowSet;
36 import hunt.database.base.Row;
37 import hunt.database.base.SqlClient;
38 import hunt.database.base.Tuple;
39 
40 import hunt.collection.List;
41 import hunt.concurrency.Future;
42 import hunt.concurrency.FuturePromise;
43 import hunt.Exceptions;
44 import hunt.logging;
45 import hunt.util.StringBuilder;
46 
47 
48 alias PgNamedQueryDesc = NamedQueryDesc!("$", true);
49 
50 /**
51  * 
52  */
53 class PgConnectionImpl : SqlConnectionImpl!(PgConnectionImpl), PgConnection  {
54 
55     private PgConnectionFactory factory;
56     private PgNotificationHandler _notificationHandler;
57     private PgSocketConnection _socketConn;
58 
59     this(PgConnectionFactory factory, DbConnection conn) {
60         super(conn);
61         _socketConn = cast(PgSocketConnection)conn;
62 
63         this.factory = factory;
64     }
65 
66     override PgConnectionImpl query(string sql, RowSetHandler handler) {
67         return super.query(sql, handler);
68     }
69 
70     override PgConnectionImpl preparedQuery(string sql, RowSetHandler handler) {
71         return super.preparedQuery(sql, handler);
72     }
73 
74     override PgConnectionImpl preparedQuery(string sql, Tuple arguments, RowSetHandler handler) {
75         return super.preparedQuery(sql, arguments, handler);
76     }
77 
78     override PgConnectionImpl preparedBatch(string sql, List!(Tuple) batch, RowSetHandler handler) {
79         return super.preparedBatch(sql, batch, handler);
80     }
81 
82     override
83     PgConnection notificationHandler(PgNotificationHandler handler) {
84         _notificationHandler = handler;
85         return this;
86     }
87 
88     override
89     void handleNotification(int processId, string channel, string payload) {
90         PgNotificationHandler handler = _notificationHandler;
91         if (handler !is null) {
92             handler(new PgNotification().setProcessId(processId).setChannel(channel).setPayload(payload));
93         }
94     }
95 
96     override
97     int processId() {
98         return conn.getProcessId();
99     }
100 
101     override
102     int secretKey() {
103         return conn.getSecretKey();
104     }
105 
106     override
107     PgConnection cancelRequest(hunt.database.base.Common.VoidHandler handler) {
108         implementationMissing(false);
109         // Context current = Vertx.currentContext();
110         // if (current == context) {
111         //     factory.connect(ar -> {
112         //         if (ar.succeeded()) {
113         //             PgSocketConnection conn = ar.result();
114         //             conn.sendCancelRequestMessage(this.processId(), this.secretKey(), handler);
115         //         } else {
116         //             handler.handle(Future.failedFuture(ar.cause()));
117         //         }
118         //     });
119         // } else {
120         //     context.runOnContext(v -> cancelRequest(handler));
121         // }
122         return this;
123     }
124 
125     // override protected AbstractNamedQueryDesc getNamedQueryDesc(string sql) {
126     //     return new PgNamedQueryDesc(sql);
127     // }
128 
129     Future!NamedQuery prepareNamedQueryAsync(string sql) {
130         version(HUNT_DB_DEBUG) trace(sql);
131         auto f = new FuturePromise!NamedQuery();
132         AbstractNamedQueryDesc queryDesc = new PgNamedQueryDesc(sql);
133 
134         scheduleThen!(PreparedStatement)(new PrepareStatementCommand(queryDesc.getSql()), 
135             (CommandResponse!PreparedStatement ar) {
136                 if (ar.succeeded()) {
137                     NamedQueryImpl queryImpl = new PgNamedQueryImpl(conn, ar.result(), queryDesc);
138                     f.succeeded(queryImpl);
139                 } else {
140                     f.failed(ar.cause()); 
141                 }
142             }
143         );
144         
145         return f;
146     }
147 
148     NamedQuery prepareNamedQuery(string sql) {
149         auto f = prepareNamedQueryAsync(sql);
150         version(HUNT_DEBUG) warning("try to get a result");
151         return f.get(awaittingTimeout);
152     }       
153 
154     string escapeIdentifier(string identifier) {
155         return PgUtil.escapeIdentifier(null, identifier).toString();
156     }
157 
158     string escapeLiteral(string literal) {
159         scope StringBuilder sb = new StringBuilder((cast(int)literal.length + 10) / 10 * 11); // Add 10% for escaping.
160         PgUtil.escapeLiteral(sb, literal, _socketConn.getStandardConformingStrings());
161         return sb.toString();
162     } 
163 
164 
165     /* ----------------------------- Static Metholds ---------------------------- */
166 
167     static void connect(PgConnectOptions options, AsyncResultHandler!(PgConnection) handler) {
168         PgConnectionFactory client = new PgConnectionFactory(options);
169         version(HUNT_DB_DEBUG) trace("connecting ...");
170         client.connectAndInit( (AsyncResult!(DbConnection) ar) {
171             version(HUNT_DB_DEBUG) info("connection result: ", ar.succeeded());
172             if (ar.succeeded()) {
173                 DbConnection conn = ar.result();
174                 PgConnectionImpl p = new PgConnectionImpl(client, conn);
175                 conn.initHolder(p);
176                 if(handler !is null) {
177                     handler(succeededResult!(PgConnection)(p));
178                 }
179             } else if(handler !is null) {
180                 handler(failedResult!(PgConnection)(ar.cause()));
181             }
182         });
183     }
184 
185 }
186 
187 
188 
189 import hunt.database.base.impl.NamedQueryDesc;
190 import hunt.database.base.impl.NamedQueryImpl;
191 import hunt.database.base.impl.PreparedQueryImpl;
192 import hunt.database.base.impl.RowDesc;
193 
194 import hunt.database.base.impl.ArrayTuple;
195 import hunt.database.base.impl.Connection;
196 import hunt.database.base.impl.PreparedStatement;
197 
198 import hunt.database.base.PreparedQuery;
199 import hunt.database.base.RowSet;
200 import std.variant;
201 
202 class PgNamedQueryImpl : NamedQueryImpl {
203 
204     this(DbConnection conn, PreparedStatement ps, AbstractNamedQueryDesc queryDesc) {
205         super(conn, ps, queryDesc);
206     }
207 
208     void setParameter(string name, Variant value) {
209         version(HUNT_DEBUG) {
210             auto itemPtr = name in _parameters;
211             if(itemPtr !is null) {
212                 warning("% will be overwrited with %s", name, value.toString());
213             }
214         }
215 
216 
217         // TODO: Tasks pending completion -@zhangxueping at 2019-10-01T13:35:23+08:00
218         // validate the type of parameter
219         // hunt.database.driver.mysql.impl.codec.ColumnDefinition;
220 
221         // RowDesc rowDesc = getPreparedStatement().rowDesc();
222         // warning(rowDesc.toString());
223 
224         _parameters[name] = value;
225     }
226 
227 }