1 /*
2  * Copyright (C) 2018 Julien Viet
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 module hunt.database.driver.postgresql.impl.codec.PgEncoder;
18 
19 import hunt.database.driver.postgresql.impl.codec.Bind;
20 import hunt.database.driver.postgresql.impl.codec.CloseConnectionCommandCodec;
21 import hunt.database.driver.postgresql.impl.codec.ClosePortalCommandCodec;
22 import hunt.database.driver.postgresql.impl.codec.CloseStatementCommandCodec;
23 import hunt.database.driver.postgresql.impl.codec.DataTypeCodec;
24 import hunt.database.driver.postgresql.impl.codec.DataFormat;
25 import hunt.database.driver.postgresql.impl.codec.DataType;
26 import hunt.database.driver.postgresql.impl.codec.DataTypeDesc;
27 import hunt.database.driver.postgresql.impl.codec.Describe;
28 import hunt.database.driver.postgresql.impl.codec.ExtendedBatchQueryCommandCodec;
29 import hunt.database.driver.postgresql.impl.codec.ExtendedQueryCommandCodec;
30 import hunt.database.driver.postgresql.impl.codec.InitCommandCodec;
31 import hunt.database.driver.postgresql.impl.codec.PgColumnDesc;
32 import hunt.database.driver.postgresql.impl.codec.CommandCodec;
33 import hunt.database.driver.postgresql.impl.codec.PgDecoder;
34 import hunt.database.driver.postgresql.impl.codec.PrepareStatementCommandCodec;
35 import hunt.database.driver.postgresql.impl.codec.Query;
36 import hunt.database.driver.postgresql.impl.codec.QueryCommandBaseCodec;
37 import hunt.database.driver.postgresql.impl.codec.Parse;
38 import hunt.database.driver.postgresql.impl.codec.PasswordMessage;
39 import hunt.database.driver.postgresql.impl.codec.Response;
40 import hunt.database.driver.postgresql.impl.codec.SimpleQueryCommandCodec;
41 import hunt.database.driver.postgresql.impl.codec.StartupMessage;
42 import hunt.database.base.Util;
43 
44 import hunt.database.base.AsyncResult;
45 import hunt.database.base.Exceptions;
46 import hunt.database.base.impl.Connection;
47 import hunt.database.base.impl.ParamDesc;
48 import hunt.database.base.impl.RowDesc;
49 import hunt.database.base.impl.TxStatus;
50 import hunt.database.base.impl.command;
51 import hunt.database.base.RowSet;
52 
53 
54 import hunt.collection.ArrayDeque;
55 import hunt.io.ByteBuffer;
56 import hunt.collection.List;
57 import hunt.collection.Map;
58 import hunt.Exceptions;
59 import hunt.logging;
60 import hunt.net.buffer;
61 import hunt.net.codec.Encoder;
62 import hunt.net.Connection;
63 import hunt.text.Charset;
64 
65 alias writeCString = Util.writeCString;
66 
67 import std.container.dlist;
68 import std.range;
69 import std.variant;
70 
71 /**
72  * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
73  * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
74  */
75 final class PgEncoder : EncoderChain {
76 
    // Frontend message types for {@link pgclient.impl.codec.encoder.MessageEncoder}
    // Each constant is the one-byte type tag that the matching write* method
    // of this class emits as the first byte of its message.

    private enum byte PASSWORD_MESSAGE = 'p'; // written by writePasswordMessage
    private enum byte QUERY = 'Q';            // written by writeQuery
    private enum byte TERMINATE = 'X';        // written by writeTerminate
    private enum byte PARSE = 'P';            // written by writeParse
    private enum byte BIND = 'B';             // written by writeBind
    private enum byte DESCRIBE = 'D';         // written by writeDescribe
    private enum byte EXECUTE = 'E';          // written by writeExecute
    private enum byte CLOSE = 'C';            // written by writeClosePortal
    private enum byte SYNC = 'S';             // written by writeSync

    // Declaration from the Java original, kept for reference:
    // private ArrayDeque<CommandCodec<?, ?>> inflight;
    // Pointer to the queue handed to the constructor by reference. encode()
    // appends each new codec to the back; the completion handler installed by
    // encode() removes the front entry.
    private DList!(CommandCodecBase) *inflight;
    // Connection passed to the most recent encode() call; flush() writes to it.
    private Connection ctx;
    // Pending outgoing bytes. Allocated lazily by ensureBuffer() and detached
    // (set back to null) by flush().
    private ByteBuf outBuffer;
    // Decoder supplied at construction; only stored in the code of this class.
    private PgDecoder dec;
94 
95     this(PgDecoder dec, ref DList!(CommandCodecBase) inflight) {
96         this.inflight = &inflight;
97         this.dec = dec;
98     }
99 
100     override void encode(Object message, Connection connection) {
101         ctx = connection;
102 
103         ICommand cmd = cast(ICommand)message;
104         if(cmd is null) {
105             warningf("The message is not a ICommand: %s", typeid(message));
106         }
107 
108         version(HUNT_DB_DEBUG_MORE) 
109         tracef("encoding a message: %s", typeid(message));
110 
111         CommandCodecBase cmdCodec = wrap(cmd);
112 
113         cmdCodec.completionHandler = (ICommandResponse resp) {
114             version(HUNT_DB_DEBUG) {
115                 tracef("message encoding completed");
116                 // CommandCodecBase c = inflight.front();
117                 // assert(cmdCodec is c);
118                 if(resp.failed()) {
119                     Throwable th = resp.cause();
120                     warningf("Response error: %s", th.msg);
121                 }
122             }
123             version(HUNT_DB_DEBUG_MORE)  tracef("%s", typeid(cast(Object)resp));
124             inflight.removeFront();
125 
126             if(!resp.isCommandAttatched()) {
127                 // infof("No command attatched for %s", typeid(cast(Object)cmdCodec));
128                 resp.attachCommand(cmdCodec.getCommand());
129             }
130 
131             NetConnectionHandler handler = ctx.getHandler();
132             if(handler !is null) handler.messageReceived(ctx, cast(Object)resp);
133         };
134 
135         inflight.insertBack(cmdCodec);
136         cmdCodec.encode(this);
137         flush();
138 	}
139 
140     private CommandCodecBase wrap(ICommand cmd) {
141         InitCommand initCommand = cast(InitCommand) cmd;
142         if (initCommand !is null) {
143             return new InitCommandCodec(initCommand);
144         }
145 
146         SimpleQueryCommand!(RowSet) simpleCommand = cast(SimpleQueryCommand!(RowSet))cmd;
147         if(simpleCommand !is null) {
148             return new SimpleQueryCommandCodec!RowSet(simpleCommand);
149         }
150 
151         PrepareStatementCommand prepareCommand = cast(PrepareStatementCommand)cmd;
152         if(prepareCommand !is null) {
153             return new PrepareStatementCommandCodec(prepareCommand);
154         }
155 
156         ExtendedQueryCommand!RowSet extendedCommand = cast(ExtendedQueryCommand!RowSet)cmd;
157         if(extendedCommand !is null) {
158             return new ExtendedQueryCommandCodec!RowSet(extendedCommand);
159         }
160 
161         ExtendedBatchQueryCommand!RowSet batchQueryCommand = cast(ExtendedBatchQueryCommand!RowSet)cmd;
162         if(batchQueryCommand !is null) {
163             return new ExtendedBatchQueryCommandCodec!RowSet(batchQueryCommand);
164         }
165 
166         CloseConnectionCommand connCommand = cast(CloseConnectionCommand)cmd;
167         if(connCommand !is null) {
168             return CloseConnectionCommandCodec.INSTANCE();
169         }
170 
171         CloseCursorCommand cursorCommand = cast(CloseCursorCommand)cmd;
172         if(cursorCommand !is null) {
173             return new ClosePortalCommandCodec(cursorCommand);
174         }
175 
176         CloseStatementCommand statementCommand = cast(CloseStatementCommand)cmd;
177         if(statementCommand !is null) {
178             return new CloseStatementCommandCodec(statementCommand);
179         }
180 
181         throw new AssertionError();
182     }
183 
184     // override
185     // void handlerAdded(Connection ctx) {
186     //     // TODO: Tasks pending completion -@zxp at 8/22/2019, 5:50:54 PM
187     //     // 
188     //     this.ctx = ctx;
189     // }
190 
191     void flush() {
192         version(HUNT_DB_DEBUG_MORE) trace("flushing ...");
193 
194         if(ctx is null) {
195             warning("ctx is null");
196             return ;
197         }
198 
199         if (outBuffer !is null) {
200             ByteBuf buff = outBuffer;
201             outBuffer = null;
202             byte[] avaliableData = buff.getReadableBytes();
203             // version(HUNT_DB_DEBUG_MORE) {
204             //     tracef("buffer: %s", buff.toString());
205             //     // tracef("buffer data: %s", cast(string)avaliableData);
206             // }
207             ctx.write(cast(const(ubyte)[])avaliableData);
208         }
209     }
210 
211     /**
212      * This message immediately closes the connection. On receipt of this message,
213      * the backend closes the connection and terminates.
214      */
215     void writeTerminate() {
216         ensureBuffer();
217         outBuffer.writeByte(TERMINATE);
218         outBuffer.writeInt(4);
219     }
220 
221     /**
222      * <p>
223      * The purpose of this message is to provide a resynchronization point for error recovery.
224      * When an error is detected while processing any extended-query message, the backend issues {@link ErrorResponse},
225      * then reads and discards messages until this message is reached, then issues {@link ReadyForQuery} and returns to normal
226      * message processing.
227      * <p>
228      * Note that no skipping occurs if an error is detected while processing this message which ensures that there is one
229      * and only one {@link ReadyForQuery} sent for each of this message.
230      * <p>
231      * Note this message does not cause a transaction block opened with BEGIN to be closed. It is possible to detect this
232      * situation in {@link ReadyForQuery#txStatus()} that includes {@link TxStatus} information.
233      */
234     void writeSync() {
235         ensureBuffer();
236         outBuffer.writeByte(SYNC);
237         outBuffer.writeInt(4);
238     }
239 
240     /**
241      * <p>
242      * The message closes an existing prepared statement or portal and releases resources.
243      * Note that closing a prepared statement implicitly closes any open portals that were constructed from that statement.
244      * <p>
245      * The response is either {@link CloseComplete} or {@link ErrorResponse}
246      *
247      * @param portal
248      */
249     void writeClosePortal(string portal) {
250         ensureBuffer();
251         int pos = outBuffer.writerIndex();
252         outBuffer.writeByte(CLOSE);
253         outBuffer.writeInt(0);
254         outBuffer.writeByte('P'); // 'S' to close a prepared statement or 'P' to close a portal
255         Util.writeCStringUTF8(outBuffer, portal);
256         outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
257     }
258 
259     void writeStartupMessage(StartupMessage msg) {
260         ensureBuffer();
261 
262         int pos = outBuffer.writerIndex();
263 
264         outBuffer.writeInt(0);
265         // protocol version
266         outBuffer.writeShort(3);
267         outBuffer.writeShort(0);
268 
269         writeCString(outBuffer, StartupMessage.BUFF_USER);
270         Util.writeCStringUTF8(outBuffer, msg.username);
271         writeCString(outBuffer, StartupMessage.BUFF_DATABASE);
272         Util.writeCStringUTF8(outBuffer, msg.database);
273         foreach (MapEntry!(string, string) property ; msg.properties) {
274             writeCString(outBuffer, property.getKey(), StandardCharsets.UTF_8);
275             writeCString(outBuffer, property.getValue(), StandardCharsets.UTF_8);
276         }
277 
278         outBuffer.writeByte(0);
279         outBuffer.setInt(pos, outBuffer.writerIndex() - pos);
280     }
281 
282     void writePasswordMessage(PasswordMessage msg) {
283         ensureBuffer();
284         int pos = outBuffer.writerIndex();
285         outBuffer.writeByte(PASSWORD_MESSAGE);
286         outBuffer.writeInt(0);
287         Util.writeCStringUTF8(outBuffer, msg.hash);
288         outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos- 1);
289     }
290 
291     /**
292      * <p>
293      * This message includes an SQL command (or commands) expressed as a text string.
294      * <p>
295      * The possible response messages from the backend are
296      * {@link CommandComplete}, {@link RowDesc}, {@link DataRow}, {@link EmptyQueryResponse}, {@link ErrorResponse},
297      * {@link ReadyForQuery} and {@link NoticeResponse}
298      */
299     void writeQuery(Query query) {
300         ensureBuffer();
301         int pos = outBuffer.writerIndex();
302         outBuffer.writeByte(QUERY);
303         outBuffer.writeInt(0);
304         Util.writeCStringUTF8(outBuffer, query.sql);
305         outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
306     }
307 
308     /**
309      * <p>
310      * The message that using "statement" variant specifies the name of an existing prepared statement.
311      * <p>
312      * The response is a {@link ParamDesc} message describing the parameters needed by the statement,
313      * followed by a {@link RowDesc} message describing the rows that will be returned when the statement is eventually
314      * executed or a {@link NoData} message if the statement will not return rows.
315      * {@link ErrorResponse} is issued if there is no such prepared statement.
316      * <p>
317      * Note that since {@link Bind} has not yet been issued, the formats to be used for returned columns are not yet known to
318      * the backend; the format code fields in the {@link RowDesc} message will be zeroes in this case.
319      * <p>
320      * The message that using "portal" variant specifies the name of an existing portal.
321      * <p>
322      * The response is a {@link RowDesc} message describing the rows that will be returned by executing the portal;
323      * or a {@link NoData} message if the portal does not contain a query that will return rows; or {@link ErrorResponse}
324      * if there is no such portal.
325      */
326     void writeDescribe(Describe describe) {
327         ensureBuffer();
328         int pos = outBuffer.writerIndex();
329         outBuffer.writeByte(DESCRIBE);
330         outBuffer.writeInt(0);
331         if (describe.statement != 0) {
332             outBuffer.writeByte('S');
333             outBuffer.writeLong(describe.statement);
334         } else if (describe.portal !is null) {
335             outBuffer.writeByte('P');
336             Util.writeCStringUTF8(outBuffer, describe.portal);
337         } else {
338             outBuffer.writeByte('S');
339             Util.writeCStringUTF8(outBuffer, "");
340         }
341         outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos- 1);
342     }
343 
344     /**
345      * <p>
346      * The message contains a textual SQL query string.
347      * <p>
348      * The response is either {@link ParseComplete} or {@link ErrorResponse}
349      */
350     void writeParse(Parse parse) {
351         ensureBuffer();
352         int pos = outBuffer.writerIndex();
353         outBuffer.writeByte(PARSE);
354         outBuffer.writeInt(0);
355         if (parse.statement == 0) {
356             outBuffer.writeByte(0);
357         } else {
358             outBuffer.writeLong(parse.statement);
359         }
360         Util.writeCStringUTF8(outBuffer, parse.query);
361         // no parameter data types (OIDs)
362         // if(paramDataTypes is null) {
363         outBuffer.writeShort(0);
364         // } else {
365         //   // Parameter data types (OIDs)
366         //   outBuffer.writeShort(paramDataTypes.length);
367         //   for (int paramDataType : paramDataTypes) {
368         //     outBuffer.writeInt(paramDataType);
369         //   }
370         // }
371         outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
372     }
373 
374     /**
375      * The message specifies the portal and a maximum row count (zero meaning "fetch all rows") of the result.
376      * <p>
377      * The row count of the result is only meaningful for portals containing commands that return row sets;
378      * in other cases the command is always executed to completion, and the row count of the result is ignored.
379      * <p>
380      * The possible responses to this message are the same as {@link Query} message, except that
381      * it doesn't cause {@link ReadyForQuery} or {@link RowDesc} to be issued.
382      * <p>
383      * If Execute terminates before completing the execution of a portal, it will send a {@link PortalSuspended} message;
384      * the appearance of this message tells the frontend that another Execute should be issued against the same portal to
385      * complete the operation. The {@link CommandComplete} message indicating completion of the source SQL command
386      * is not sent until the portal's execution is completed. Therefore, This message is always terminated by
387      * the appearance of exactly one of these messages: {@link CommandComplete},
388      * {@link EmptyQueryResponse}, {@link ErrorResponse} or {@link PortalSuspended}.
389      *
390      * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
391      */
392     void writeExecute(string portal, int rowCount) {
393         ensureBuffer();
394         int pos = outBuffer.writerIndex();
395         outBuffer.writeByte(EXECUTE);
396         outBuffer.writeInt(0);
397         if (portal !is null) {
398             outBuffer.writeCharSequence(portal, StandardCharsets.UTF_8);
399         }
400         outBuffer.writeByte(0);
401         outBuffer.writeInt(rowCount); // Zero denotes "no limit" maybe for ReadStream!(Row)
402         outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
403     }
404 
405     /**
406      * <p>
407      * The message gives the name of the prepared statement, the name of portal,
408      * and the values to use for any parameter values present in the prepared statement.
409      * The supplied parameter set must match those needed by the prepared statement.
410      * <p>
411      * The response is either {@link BindComplete} or {@link ErrorResponse}.
412      */
413     void writeBind(Bind bind, string portal, List!(Variant) paramValues) {
414         ensureBuffer();
415         int pos = outBuffer.writerIndex();
416         outBuffer.writeByte(BIND);
417         outBuffer.writeInt(0);
418         if (portal !is null) {
419             outBuffer.writeCharSequence(portal, StandardCharsets.UTF_8);
420         }
421         outBuffer.writeByte(0);
422         if (bind.statement == 0) {
423             outBuffer.writeByte(0);
424         } else {
425             outBuffer.writeLong(bind.statement);
426         }
427         int paramLen = paramValues.size();
428         outBuffer.writeShort(paramLen);
429         // Parameter formats
430         for (int c = 0;c < paramLen;c++) {
431             // for now each format is Binary
432             outBuffer.writeShort(bind.paramTypes[c].supportsBinary ? 1 : 0);
433         }
434         outBuffer.writeShort(paramLen);
435         for (int c = 0;c < paramLen;c++) {
436             Variant param = paramValues.get(c);
437             if (!param.hasValue() || param == null) {
438                 // NULL value
439                 outBuffer.writeInt(-1);
440             } else {
441                 DataTypeDesc dataType = bind.paramTypes[c];
442                 version(HUNT_DB_DEBUG_MORE) {
443                     tracef("dataType: %s, param: type=%s, value=%s", 
444                         dataType, param.type, param.toString());
445                 }
446 
447                 if (dataType.supportsBinary) {
448                     int idx = outBuffer.writerIndex();
449                     outBuffer.writeInt(0); 
450                     try {
451                         DataTypeCodec.encodeBinary(cast(DataType)dataType.id, param, outBuffer);
452                     } catch(Throwable ex) {
453                         version(HUNT_DB_DEBUG_MORE) warning(ex);
454                         else warning(ex.msg);
455                         throw new DatabaseException(ex.msg);
456                     }
457                     outBuffer.setInt(idx, outBuffer.writerIndex() - idx - 4);
458                 } else {
459                     DataTypeCodec.encodeText(cast(DataType)dataType.id, param, outBuffer);
460                 }
461             }
462         }
463 
464         // MAKE resultColumsn non null to avoid null check
465 
466         // Result columns are all in Binary format
467         if (bind.resultColumns.length > 0) {
468             outBuffer.writeShort(cast(int)bind.resultColumns.length);
469             foreach (PgColumnDesc resultColumn; bind.resultColumns) {
470                 outBuffer.writeShort(resultColumn.dataType.supportsBinary ? 1 : 0);
471             }
472         } else {
473             outBuffer.writeShort(1);
474             outBuffer.writeShort(1);
475         }
476         outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
477     }
478 
479     private void ensureBuffer() {
480         if (outBuffer is null) {
481             outBuffer = Unpooled.buffer(_bufferSize);
482         }
483     }
484 }