1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.driver.postgresql.impl.PostgreSQLConnectionFactory;
19 
20 import hunt.database.driver.postgresql.impl.codec.PgCodec;
21 import hunt.database.driver.postgresql.impl.PostgreSQLSocketConnection;
22 import hunt.database.driver.postgresql.PostgreSQLConnectOptions;
23 import hunt.database.driver.postgresql.SslMode;
24 
25 import hunt.database.base.AsyncResult;
26 import hunt.database.base.Common;
27 import hunt.database.base.impl.Connection;
28 import hunt.database.base.impl.command.CommandResponse;
29 
30 import hunt.collection.ArrayList;
31 import hunt.collection.HashMap;
32 import hunt.collection.Map;
33 import hunt.Exceptions;
34 import hunt.io.BufferUtils;
35 import hunt.io.channel;
36 import hunt.logging;
37 import hunt.Object;
38 
39 import hunt.net.AbstractConnection;
40 import hunt.net.Connection;
41 import hunt.net.NetClient;
42 import hunt.net.NetClientOptions;
43 import hunt.net.NetUtil;
44 
45 
46 /**
47  * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
48  */
49 class PgConnectionFactory {
50 
51     // private ArrayList!NetClient clients;
52     private NetClientOptions _netClientOptions;
53     private bool registerCloseHook;
54     private string host;
55     private int port;
56     private SslMode sslMode;
57     // private TrustOptions trustOptions;
58     private string hostnameVerificationAlgorithm;
59     private string database;
60     private string username;
61     private string password;
62     private Map!(string, string) properties;
63     private bool cachePreparedStatements;
64     private int preparedStatementCacheSize;
65     private int preparedStatementCacheSqlLimit;
66     private int pipeliningLimit;
67     private bool isUsingDomainSocket;
68     // private Closeable hook;
69 
70     this(PgConnectOptions options) {
71         // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-13T10:17:49+08:00
72         // remove clients
73         // clients = new ArrayList!NetClient(50);
74         _netClientOptions = new NetClientOptions(options);
75 
76         // Make sure ssl=false as we will use STARTLS
77         _netClientOptions.setSsl(false);
78 
79         this.sslMode = options.getSslMode();
80         this.hostnameVerificationAlgorithm = _netClientOptions.getHostnameVerificationAlgorithm();
81         // this.trustOptions = netClientOptions.getTrustOptions();
82         this.host = options.getHost();
83         this.port = options.getPort();
84         this.database = options.getDatabase();
85         this.username = options.getUser();
86         this.password = options.getPassword();
87         this.properties = new HashMap!(string, string)(options.getProperties());
88         this.cachePreparedStatements = options.getCachePreparedStatements();
89         this.pipeliningLimit = options.getPipeliningLimit();
90         this.preparedStatementCacheSize = options.getPreparedStatementCacheMaxSize();
91         this.preparedStatementCacheSqlLimit = options.getPreparedStatementCacheSqlLimit();
92         this.isUsingDomainSocket = options.isUsingDomainSocket();
93     }
94 
95     // // Called by hook
96     // private void close(AsyncVoidHandler completionHandler) {
97     //     close();
98     //     if(completionHandler !is null) {
99     //         completionHandler(null);
100     //     }
101     // }
102 
103     // void close() {
104     //     foreach(NetClient client; clients) {
105     //         if(client !is null) {
106     //         // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-12T10:26:30+08:00
107     //         // crashed here
108     //             // warning(typeid(cast(Object)client));
109     //             client.close();
110     //         }
111     //     }
112     // }
113 
114     void connectAndInit(AsyncResultHandler!(DbConnection) completionHandler) {
115         connect( (ar) {
116             if (ar.succeeded()) {
117                 PgSocketConnection conn = ar.result();
118                 conn.initialization();
119                 conn.sendStartupMessage(username, password, database, properties, 
120                     (r) { 
121                         if(completionHandler !is null) completionHandler(r); 
122                     }
123                 );
124             } else if(completionHandler !is null) {
125                 completionHandler(failedResponse!(DbConnection)(ar.cause()));
126             } else {
127                 warning("do nothing");
128             }
129         });
130     }
131 
132     void connect(AsyncResultHandler!(PgSocketConnection) handler) {
133         doConnect(false, handler);
134         // switch (sslMode) {
135         //     case DISABLE:
136         //         doConnect(false, handler);
137         //         break;
138         //     case ALLOW:
139         //         doConnect(false, ar -> {
140         //             if (ar.succeeded()) {
141         //                 handler.handle(Future.succeededFuture(ar.result()));
142         //             } else {
143         //                 doConnect(true, handler);
144         //             }
145         //         });
146         //         break;
147         //     case PREFER:
148         //         doConnect(true, ar -> {
149         //             if (ar.succeeded()) {
150         //                 handler.handle(Future.succeededFuture(ar.result()));
151         //             } else {
152         //                 doConnect(false, handler);
153         //             }
154         //         });
155         //         break;
156         //     case VERIFY_FULL:
157         //         if (hostnameVerificationAlgorithm is null || hostnameVerificationAlgorithm.isEmpty()) {
158         //             handler.handle(Future.failedFuture(new IllegalArgumentException("Host verification algorithm must be specified under verify-full sslmode")));
159         //             return;
160         //         }
161         //     case VERIFY_CA:
162         //         if (trustOptions is null) {
163         //             handler.handle(Future.failedFuture(new IllegalArgumentException("Trust options must be specified under verify-full or verify-ca sslmode")));
164         //             return;
165         //         }
166         //     case REQUIRE:
167         //         doConnect(true, handler);
168         //         break;
169         //     default:
170         //         throw new IllegalArgumentException("Unsupported SSL mode");
171         // }
172     }
173 
174     private void doConnect(bool ssl, AsyncResultHandler!(PgSocketConnection) handler) {
175 
176         version(HUNT_DB_DEBUG) tracef("Creating a DB connection in %s...", _netClientOptions.getConnectTimeout);
177 
178         auto client = NetUtil.createNetClient(_netClientOptions);
179 
180         client.setHandler(new class NetConnectionHandler {
181 
182                 PgSocketConnection pgConn;
183 
184                 override void connectionOpened(Connection connection) {
185                     version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress());
186                     AbstractConnection ac = cast(AbstractConnection)connection;
187                     ac.setState(ConnectionState.Opened);
188 
189                     pgConn = newSocketConnection(ac);
190                     if(handler !is null) {
191                         try {
192                             handler(succeededResult(pgConn));
193                         } catch(Throwable ex) {
194                             version(HUNT_DB_DEBUG_MORE) warning(ex);
195                             handler(failedResult!(PgSocketConnection)(ex));
196                         }
197                     }
198                 }
199 
200                 override void connectionClosed(Connection connection) {
201                     version(HUNT_DB_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress());
202                     if(pgConn !is null)
203                         pgConn.handleClosed(connection);
204                 }
205 
206                 override DataHandleStatus messageReceived(Connection connection, Object message) {
207                     DataHandleStatus resultStatus = DataHandleStatus.Done;
208                     version(HUNT_DB_DEBUG_MORE) tracef("message type: %s", typeid(message).name);
209                     try {
210                         // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-01-26T14:46:35+08:00
211                         // 
212                         pgConn.handleMessage(connection, message);
213                     } catch(Throwable t) {
214                         exceptionCaught(connection, t);
215                     }
216 
217                     return resultStatus;
218                 }
219 
220                 override void exceptionCaught(Connection connection, Throwable t) {
221                     version(HUNT_DEBUG) warning(t.msg);
222                     version(HUNT_DB_DEBUG) warning(t);
223                     if(pgConn !is null) {
224                         pgConn.handleException(connection, t);
225                     }
226                     if(handler !is null) {
227                         handler(failedResult!(PgSocketConnection)(t));
228                     }
229                 }
230 
231                 override void failedOpeningConnection(int connectionId, Throwable t) {
232                     warning(t);
233 
234                     handler(failedResult!(PgSocketConnection)(t));
235                     client.close(); 
236                 }
237 
238                 override void failedAcceptingConnection(int connectionId, Throwable t) {
239                     warning(t);
240                     handler(failedResult!(PgSocketConnection)(t));
241                 }
242             });        
243 
244 
245         version(HUNT_DB_DEBUG) {
246             trace("Setting PostgreSQL codec");
247         }
248         client.setCodec(new PgCodec());
249 
250         try {
251             client.connect(host, port);
252             // clients.add(client);
253         } catch (Throwable e) {
254             // Client is closed
255             version(HUNT_DEBUG) {
256                 warning(e.message);
257             } else {
258                 warning(e);
259             }
260             
261             if(handler !is null) {
262                 handler(failedResult!PgSocketConnection(e));
263             }
264         }
265     }
266 
267     private PgSocketConnection newSocketConnection(AbstractConnection socket) {
268         return new PgSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, 
269                 preparedStatementCacheSqlLimit, pipeliningLimit);
270     }
271 }