1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.driver.postgresql.impl.PostgreSQLSocketConnection;
19 
20 import hunt.database.driver.postgresql.impl.codec.PgCodec;
21 
22 import hunt.database.base.AsyncResult;
23 import hunt.database.base.Common;
24 import hunt.database.base.impl.Connection;
25 import hunt.database.base.impl.SocketConnectionBase;
26 import hunt.database.base.impl.command.CommandResponse;
27 import hunt.database.base.impl.command.InitCommand;
28 
29 import hunt.io.ByteBuffer;
30 import hunt.io.BufferUtils;
31 import hunt.collection.Map;
32 import hunt.Exceptions;
33 import hunt.logging;
34 import hunt.net.AbstractConnection;
35 import hunt.net.Exceptions;
36 import hunt.util.Common;
37 
/**
 * Socket-level connection to a PostgreSQL server.
 *
 * Adds the PostgreSQL-specific pieces on top of $(D SocketConnectionBase):
 * scheduling the startup command, sending a cancel request, exposing the
 * backend process id / secret key, and tracking the
 * `standard_conforming_strings` setting.
 *
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
class PgSocketConnection : SocketConnectionBase {

    /// Total length in bytes of a CancelRequest message (four 32-bit integers).
    private enum int CANCEL_REQUEST_LENGTH = 16;

    /// Cancel request code of the PostgreSQL wire protocol: (1234 << 16) | 5678.
    private enum int CANCEL_REQUEST_CODE = 80877102;

    // private PgCodec codec;

    /// Value returned by getProcessId(). Not assigned anywhere in this class;
    /// presumably set by the startup handling code -- TODO confirm.
    int processId;

    /// Value returned by getSecretKey(). Not assigned anywhere in this class;
    /// presumably set by the startup handling code -- TODO confirm.
    int secretKey;

    // default value for server versions that don't report standard_conforming_strings
    private bool _standardConformingStrings = false;

    /**
     * Creates the connection; every argument is forwarded unchanged to the
     * base class constructor.
     *
     * Params:
     *   socket = underlying network connection
     *   cachePreparedStatements = forwarded to SocketConnectionBase
     *   preparedStatementCacheSize = forwarded to SocketConnectionBase
     *   preparedStatementCacheSqlLimit = forwarded to SocketConnectionBase
     *   pipeliningLimit = forwarded to SocketConnectionBase
     */
    this(AbstractConnection socket,
            bool cachePreparedStatements,
            int preparedStatementCacheSize,
            int preparedStatementCacheSqlLimit,
            int pipeliningLimit) {

        super(socket, cachePreparedStatements, preparedStatementCacheSize,
                preparedStatementCacheSqlLimit, pipeliningLimit);
    }

    // Codec installation is currently disabled; kept for reference.
    // override
    // void initialization() {
    //     codec = new PgCodec();
    //     version(HUNT_DEBUG) {
    //         trace("Setting DB codec");
    //     }
    //     socket().setCodec(codec);
    //     super.initialization();
    // }

    /**
     * Builds an InitCommand for this connection and schedules it.
     *
     * Params:
     *   username = user name passed to the InitCommand
     *   password = password passed to the InitCommand
     *   database = database name passed to the InitCommand
     *   properties = additional properties passed to the InitCommand
     *   completionHandler = installed as the command's handler
     */
    void sendStartupMessage(string username, string password, string database, Map!(string, string) properties,
            ResponseHandler!(DbConnection) completionHandler) {
        InitCommand cmd = new InitCommand(this, username, password, database, properties);
        cmd.handler = completionHandler;
        version(HUNT_DEBUG) {
            trace("Sending InitCommand");
        }
        schedule(cmd);
    }

    /**
     * Writes a CancelRequest message to the socket.
     *
     * Once the write succeeds the connection is closed (if it is still in the
     * CONNECTED state) and `handler.succeeded()` is invoked; a failed write is
     * reported through `handler.failed(x)`.
     *
     * Params:
     *   processId = process id written into the message
     *   secretKey = secret key written into the message
     *   handler = notified about the outcome of the write; may be null, in
     *             which case the outcome is not reported
     */
    void sendCancelRequestMessage(int processId, int secretKey, Callback handler) {
        // Message layout: length, cancel request code, process id, secret key.
        ByteBuffer buffer = BufferUtils.allocate(CANCEL_REQUEST_LENGTH);
        buffer.putInt(CANCEL_REQUEST_LENGTH);
        buffer.putInt(CANCEL_REQUEST_CODE);
        buffer.putInt(processId);
        buffer.putInt(secretKey);
        // NOTE(review): the buffer is handed over without flip(); confirm that
        // socket.write() / BufferUtils.allocate() expect it in this state.

        socket.write(buffer, new class Callback {

            void succeeded() {
                // directly close this connection
                if (status == Status.CONNECTED) {
                    status = Status.CLOSING;
                    socket.close();
                }
                // Guard against a missing handler so the I/O callback cannot
                // fail with a null dereference.
                if (handler !is null) {
                    handler.succeeded();
                }
            }

            void failed(Exception x) {
                if (handler !is null) {
                    handler.failed(x);
                }
            }

            bool isNonBlocking() {
                return true;
            }
        });
    }

    /// Returns: the current value of the `processId` field.
    override
    int getProcessId() {
        return processId;
    }

    /// Returns: the current value of the `secretKey` field.
    override
    int getSecretKey() {
        return secretKey;
    }

    /// Records the `standard_conforming_strings` setting for this connection.
    void setStandardConformingStrings(bool value) {
        _standardConformingStrings = value;
    }

    /// Returns: the recorded `standard_conforming_strings` setting
    /// (false until setStandardConformingStrings is called).
    bool getStandardConformingStrings() {
        return _standardConformingStrings;
    }

    /**
     * Upgrades the connection to SSL -- NOT IMPLEMENTED YET.
     *
     * The missing implementation is reported through `implementationMissing`
     * and the completion handler is then failed, so that callers are always
     * notified instead of waiting for a callback that never arrives.
     *
     * Params:
     *   completionHandler = receives the failure notification; may be null
     */
    void upgradeToSSLConnection(Callback completionHandler) {
        // Reference implementation (Vert.x / Netty) still to be ported:
        //
        // ChannelPipeline pipeline = socket.channelHandlerContext().pipeline();
        // Promise!(Void) upgradePromise = Promise.promise();
        // upgradePromise.future().setHandler(ar->{
        //     if (ar.succeeded()) {
        //         completionHandler.handle(Future.succeededFuture());
        //     } else {
        //         Throwable cause = ar.cause();
        //         if (cause instanceof DecoderException) {
        //             DecoderException err = (DecoderException) cause;
        //             cause = err.getCause();
        //         }
        //         completionHandler.handle(Future.failedFuture(cause));
        //     }
        // });
        // pipeline.addBefore("handler", "initiate-ssl-handler", new InitiateSslHandler(this, upgradePromise));
        // TODO: Tasks pending completion -@zxp at 8/14/2019, 11:42:27 AM
        //
        // Presumably only logs when called with `false` -- confirm in hunt.Exceptions.
        implementationMissing(false);

        // Without this notification the caller would never learn that the
        // upgrade did not take place.
        if (completionHandler !is null) {
            completionHandler.failed(
                new Exception("SSL upgrade is not implemented for PostgreSQL connections"));
        }
    }

}