1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
 * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.driver.postgresql.impl.codec.PgDecoder;
19 
20 import hunt.database.driver.postgresql.impl.codec.Bind;
21 import hunt.database.driver.postgresql.impl.codec.DataFormat;
22 import hunt.database.driver.postgresql.impl.codec.DataType;
23 import hunt.database.driver.postgresql.impl.codec.DataTypeDesc;
24 import hunt.database.driver.postgresql.impl.codec.Describe;
25 import hunt.database.driver.postgresql.impl.codec.ErrorResponse;
26 import hunt.database.driver.postgresql.impl.codec.InitCommandCodec;
27 import hunt.database.driver.postgresql.impl.codec.NoticeResponse;
28 import hunt.database.driver.postgresql.impl.codec.PgColumnDesc;
29 import hunt.database.driver.postgresql.impl.codec.CommandCodec;
30 import hunt.database.driver.postgresql.impl.codec.PgParamDesc;
31 import hunt.database.driver.postgresql.impl.codec.PgProtocolConstants;
32 import hunt.database.driver.postgresql.impl.codec.PgRowDesc;
33 import hunt.database.driver.postgresql.impl.codec.Parse;
34 import hunt.database.driver.postgresql.impl.codec.PasswordMessage;
35 import hunt.database.driver.postgresql.impl.codec.QueryCommandBaseCodec;
36 import hunt.database.driver.postgresql.impl.codec.Response;
37 import hunt.database.driver.postgresql.impl.codec.StartupMessage;
38 import hunt.database.base.Util;
39 
40 import hunt.database.base.impl.Notification;
41 import hunt.database.base.impl.RowDecoder;
42 import hunt.database.base.impl.TxStatus;
43 
44 import hunt.collection.List;
45 import hunt.collection.Map;
46 import hunt.Exceptions;
47 import hunt.io.ByteBuffer;
48 import hunt.io.BufferUtils;
49 import hunt.io.channel;
50 import hunt.logging;
51 import hunt.net.codec.Decoder;
52 import hunt.net.Connection;
53 import hunt.net.buffer;
54 
55 import std.container.dlist;
56 import std.conv;
57 
58 
59 /**
60  *
61  * Decoder for <a href="https://www.postgresql.org/docs/9.5/static/protocol.html">PostgreSQL protocol</a>
62  *
63  * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
64  */
65 
class PgDecoder : Decoder {

    /// Size of the fixed message header: one type byte plus a four-byte length.
    private enum int HEADER_SIZE = 5;

    /// Smallest legal value of the length field, which counts its own four bytes.
    private enum int MIN_MESSAGE_LENGTH = 4;

    /// Allocator used to create the composite buffer that joins pending and new bytes.
    private ByteBufAllocator alloc;

    /// Pointer to the caller-owned queue of command codecs; decoded messages are
    /// dispatched to the codec at the front of this queue.
    private DList!(CommandCodecBase) *inflight;

    /// Bytes received but not yet consumed; null when nothing is pending.
    private ByteBuf inBuffer;

    /// Reusable parser for the tag carried by CommandComplete messages.
    private CommandCompleteProcessor processor;

    /**
     * Creates a decoder that dispatches to the codecs queued in `inflight`.
     *
     * Params:
     *   inflight = queue of in-flight command codecs. Only its address is
     *              stored, so the queue must outlive this decoder.
     */
    this(ref DList!(CommandCodecBase) inflight) {
        this.inflight = &inflight;
        processor = new CommandCompleteProcessor();
        alloc = UnpooledByteBufAllocator.DEFAULT();
    }

    // override
    // void handlerAdded(ChannelHandlerContext ctx) {
    //     alloc = ctx.alloc();
    // }

    /**
     * Decodes every complete protocol message contained in `payload`.
     *
     * Any Exception raised while decoding is logged and the payload is
     * cleared; it is never propagated to the caller.
     *
     * Params:
     *   payload    = bytes just read from the connection
     *   connection = connection the bytes were read from
     *
     * Returns: the status reported by the decoding pass, or
     *          DataHandleStatus.Done when decoding failed.
     */
    DataHandleStatus decode(ByteBuffer payload, Connection connection) {
        DataHandleStatus resultStatus = DataHandleStatus.Done;
        try {
            resultStatus = doDecode(payload, connection);
        } catch(Exception ex) {
            BufferUtils.clear(payload);
            warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }
        return resultStatus;
    }

    /**
     * Appends `msg` to the pending bytes and dispatches each complete message.
     *
     * A message is framed as one type byte followed by a four-byte length that
     * includes itself but not the type byte. Incomplete trailing data is kept
     * in `inBuffer` until more bytes arrive.
     *
     * Throws: Exception when a frame announces a length below four bytes. The
     *         pending buffer is dropped first because the stream can no longer
     *         be re-synchronised.
     */
    private DataHandleStatus doDecode(ByteBuffer msg, Connection connection) {
        version(HUNT_DB_DEBUG_MORE) tracef("decoding buffer: %s", msg.toString());
        DataHandleStatus resultStatus = DataHandleStatus.Done;

        ByteBuf buff = Unpooled.wrappedBuffer(msg);
        if (inBuffer is null) {
            inBuffer = buff;
        } else {
            // Join the leftover bytes with the new chunk without copying them.
            CompositeByteBuf composite = cast(CompositeByteBuf) inBuffer;
            if (composite is null) {
                composite = alloc.compositeBuffer();
                composite.addComponent(true, inBuffer);
                inBuffer = composite;
            }
            composite.addComponent(true, buff);
        }

        while (true) {
            int available = inBuffer.readableBytes();
            if (available < HEADER_SIZE) {
                break;
            }
            int beginIdx = inBuffer.readerIndex();
            int length = inBuffer.getInt(beginIdx + 1);

            version(HUNT_DB_DEBUG_MORE) {
                tracef("beginIdx: %d, required: %d, available: %d", beginIdx, length, available);
            }

            if (length < MIN_MESSAGE_LENGTH) {
                // A shorter length would leave the reader index where it is (or
                // move it backwards), so this loop would never terminate.
                ByteBuf broken = inBuffer;
                inBuffer = null;
                broken.release();
                throw new Exception("Invalid PostgreSQL message length: " ~ length.to!string());
            }

            // Written as `length > available - 1` so a huge length cannot overflow.
            if (length > available - 1) {
                version(HUNT_DB_DEBUG) {
                    warningf("Waitting for more data: required: %d, available: %d", length, available);
                }
                break;
            }
            byte id = inBuffer.getByte(beginIdx);
            int endIdx = beginIdx + length + 1;
            int writerIndex = inBuffer.writerIndex();
            try {
                // Restrict the readable window to this message's body.
                inBuffer.setIndex(beginIdx + HEADER_SIZE, endIdx);
                // version(HUNT_DB_DEBUG_MORE) infof("Protocol(Message type) id=%c", cast(char)id);
                switch (id) {
                    case PgProtocolConstants.MESSAGE_TYPE_READY_FOR_QUERY: {
                        decodeReadyForQuery(inBuffer);
                        break;
                    }
                    case PgProtocolConstants.MESSAGE_TYPE_DATA_ROW: {
                        decodeDataRow(inBuffer);
                        break;
                    }
                    case PgProtocolConstants.MESSAGE_TYPE_COMMAND_COMPLETE: {
                        decodeCommandComplete(inBuffer);
                        break;
                    }
                    case PgProtocolConstants.MESSAGE_TYPE_BIND_COMPLETE: {
                        decodeBindComplete();
                        break;
                    }
                    default: {
                        decodeMessage(id, inBuffer);
                        break;
                    }
                }
            } catch(Throwable ex) {
                // A failing message is logged and skipped so later messages
                // in the same buffer are still decoded.
                version(HUNT_DEBUG) {
                    warning(ex);
                } else {
                    warning(ex.msg);
                }
            } finally {
                // Step past this message and restore the full readable window.
                inBuffer.setIndex(endIdx, writerIndex);
            }
        }

        if (inBuffer !is null) {
            if(inBuffer.isReadable()) {
                // An incomplete message remains; keep it for the next call.
                // NOTE(review): the pending bytes still wrap the caller's payload
                // rather than a copy - confirm the payload is not reused before
                // the rest of the message arrives.
                // inBuffer = inBuffer.copy();
            } else {
                // Everything was consumed: drop the buffer.
                inBuffer.release();
                inBuffer = null;
            }
        }

        return resultStatus;
    }

    /**
     * Dispatches the message types that are not handled directly in the
     * decoding loop.
     *
     * Throws: UnsupportedOperationException for an unknown message type.
     */
    private void decodeMessage(byte id, ByteBuf inBuffer) {
        switch (id) {
            case PgProtocolConstants.MESSAGE_TYPE_ROW_DESCRIPTION: {
                decodeRowDescription(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_ERROR_RESPONSE: {
                decodeError(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_NOTICE_RESPONSE: {
                decodeNotice(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_AUTHENTICATION: {
                decodeAuthentication(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_EMPTY_QUERY_RESPONSE: {
                decodeEmptyQueryResponse();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PARSE_COMPLETE: {
                decodeParseComplete();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_CLOSE_COMPLETE: {
                decodeCloseComplete();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_NO_DATA: {
                decodeNoData();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PORTAL_SUSPENDED: {
                decodePortalSuspended();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PARAMETER_DESCRIPTION: {
                decodeParameterDescription(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PARAMETER_STATUS: {
                decodeParameterStatus(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_BACKEND_KEY_DATA: {
                decodeBackendKeyData(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_NOTIFICATION_RESPONSE: {
                decodeNotificationResponse(inBuffer);
                break;
            }
            default: {
                // Carry the offending id so the logged message is actionable.
                throw new UnsupportedOperationException(
                    "Unsupported message type: " ~ (cast(int) id).to!string());
            }
        }
    }

    /**
     * Reports whether no command is in flight, logging the dropped message.
     *
     * Used by the handlers of messages the server may send without a pending
     * request, where calling front() on the empty queue would be invalid.
     */
    private bool nothingInflight(string messageName) {
        if (inflight.empty()) {
            warningf("No command in flight, dropping %s", messageName);
            return true;
        }
        return false;
    }

    /// Forwards PortalSuspended to the front codec.
    private void decodePortalSuspended() {
        inflight.front().handlePortalSuspended();
    }

    /// Parses the CommandComplete tag and forwards the row count to the front codec.
    private void decodeCommandComplete(ByteBuf inBuffer) {
        int updated = processor.parse(inBuffer);
        inflight.front().handleCommandComplete(updated);
    }

    /// Reads the column count and lets the front codec decode the row values.
    private void decodeDataRow(ByteBuf inBuffer) {
        CommandCodecBase codec = inflight.front();
        version(HUNT_DB_DEBUG_MORE) tracef("decoding data row: %s", typeid(codec));
        int len = inBuffer.readUnsignedShort();
        codec.decodeRow(len, inBuffer);
    }

    /// Builds a PgRowDesc from a RowDescription message and forwards it to the front codec.
    private void  decodeRowDescription(ByteBuf inBuffer) {
        PgColumnDesc[] columns = new PgColumnDesc[inBuffer.readUnsignedShort()];
        for (size_t c = 0; c < columns.length; ++c) {
            // Fields are read in the order they appear on the wire.
            string fieldName = Util.readCStringUTF8(inBuffer);
            int tableOID = inBuffer.readInt();
            short columnAttributeNumber = inBuffer.readShort();
            int typeOID = inBuffer.readInt();
            short typeSize = inBuffer.readShort();
            int typeModifier = inBuffer.readInt();
            int textOrBinary = inBuffer.readUnsignedShort(); // Useless for now
            PgColumnDesc column = new PgColumnDesc(
                fieldName,
                tableOID,
                columnAttributeNumber,
                DataTypes.valueOf(typeOID),
                typeSize,
                typeModifier,
                cast(DataFormat)(textOrBinary)
            );
            columns[c] = column;
        }
        PgRowDesc rowDesc = new PgRowDesc(columns);
        inflight.front().handleRowDescription(rowDesc);
    }

    private enum byte I = 'I';
    private enum byte T = 'T';

    /**
     * Maps the ReadyForQuery status byte to a TxStatus and forwards it:
     * 'I' is IDLE, 'T' is ACTIVE, any other value is FAILED.
     */
    private void decodeReadyForQuery(ByteBuf inBuffer) {
        byte id = inBuffer.readByte();
        TxStatus txStatus;
        if (id == I) {
            txStatus = TxStatus.IDLE;
        } else if (id == T) {
            txStatus = TxStatus.ACTIVE;
        } else {
            txStatus = TxStatus.FAILED;
        }
        inflight.front().handleReadyForQuery(txStatus);
    }

    /**
     * Decodes an ErrorResponse and forwards it to the front codec.
     *
     * The message body is read before the in-flight check. With no command in
     * flight the response is dropped with a warning.
     */
    private void decodeError(ByteBuf inBuffer) {
        ErrorResponse response = new ErrorResponse();
        decodeErrorOrNotice(response, inBuffer);
        if (nothingInflight("ErrorResponse")) {
            return;
        }
        inflight.front().handleErrorResponse(response);
    }

    /**
     * Decodes a NoticeResponse and forwards it to the front codec.
     *
     * The message body is read before the in-flight check. With no command in
     * flight the response is dropped with a warning.
     */
    private void decodeNotice(ByteBuf inBuffer) {
        NoticeResponse response = new NoticeResponse();
        decodeErrorOrNotice(response, inBuffer);
        if (nothingInflight("NoticeResponse")) {
            return;
        }
        inflight.front().handleNoticeResponse(response);
    }

    /**
     * Fills `response` from the field list shared by ErrorResponse and
     * NoticeResponse: repeated (type byte, C string) pairs ended by a zero
     * byte. Unknown field types are read and discarded.
     */
    private void decodeErrorOrNotice(Response response, ByteBuf inBuffer) {
        byte type;
        while ((type = inBuffer.readByte()) != 0) {
            switch (type) {

                case PgProtocolConstants.ERROR_OR_NOTICE_SEVERITY:
                    response.setSeverity(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_CODE:
                    response.setCode(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_MESSAGE:
                    response.setMessage(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_DETAIL:
                    response.setDetail(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_HINT:
                    response.setHint(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_INTERNAL_POSITION:
                    response.setInternalPosition(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_INTERNAL_QUERY:
                    response.setInternalQuery(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_POSITION:
                    response.setPosition(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_WHERE:
                    response.setWhere(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_FILE:
                    response.setFile(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_LINE:
                    response.setLine(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_ROUTINE:
                    response.setRoutine(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_SCHEMA:
                    response.setSchema(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_TABLE:
                    response.setTable(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_COLUMN:
                    response.setColumn(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_DATA_TYPE:
                    response.setDataType(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_CONSTRAINT:
                    response.setConstraint(Util.readCStringUTF8(inBuffer));
                    break;

                default:
                    // Skip the value so the next field starts at the right offset.
                    Util.readCStringUTF8(inBuffer);
                    break;
            }
        }
    }

    /**
     * Decodes an Authentication message and notifies the front codec.
     *
     * Only "ok", MD5 password and clear-text password requests are handled.
     *
     * Throws: UnsupportedOperationException for any other authentication type.
     */
    private void decodeAuthentication(ByteBuf inBuffer) {

        if(inflight.empty()) {
            warning("inflight is empty");
            return;
        }

        int type = inBuffer.readInt();
        version(HUNT_DB_DEBUG_MORE) tracef("type=%d", type);

        CommandCodecBase cmdCodec = inflight.front();

        switch (type) {
            case PgProtocolConstants.AUTHENTICATION_TYPE_OK: {
                cmdCodec.handleAuthenticationOk();
            }
            break;
            case PgProtocolConstants.AUTHENTICATION_TYPE_MD5_PASSWORD: {
                // The MD5 request carries a four-byte salt.
                byte[] salt = new byte[4];
                inBuffer.readBytes(salt);
                cmdCodec.handleAuthenticationMD5Password(salt);
            }
            break;
            case PgProtocolConstants.AUTHENTICATION_TYPE_CLEARTEXT_PASSWORD: {
                cmdCodec.handleAuthenticationClearTextPassword();
            }
            break;
            case PgProtocolConstants.AUTHENTICATION_TYPE_KERBEROS_V5:
            case PgProtocolConstants.AUTHENTICATION_TYPE_SCM_CREDENTIAL:
            case PgProtocolConstants.AUTHENTICATION_TYPE_GSS:
            case PgProtocolConstants.AUTHENTICATION_TYPE_GSS_CONTINUE:
            case PgProtocolConstants.AUTHENTICATION_TYPE_SSPI:
            default:
                throw new UnsupportedOperationException("Authentication type " ~ 
                    type.to!string() ~ " is not supported in the client");
        }
    }

    /// Forwards ParseComplete to the front codec.
    private void decodeParseComplete() {
        inflight.front().handleParseComplete();
    }

    /// Forwards BindComplete to the front codec.
    private void decodeBindComplete() {
        inflight.front().handleBindComplete();
    }

    /// Forwards CloseComplete to the front codec.
    private void decodeCloseComplete() {
        inflight.front().handleCloseComplete();
    }

    /// Forwards NoData to the front codec.
    private void decodeNoData() {
        inflight.front().handleNoData();
    }

    /// Reads the parameter type OIDs and forwards them as a PgParamDesc.
    private void decodeParameterDescription(ByteBuf inBuffer) {
        DataTypeDesc[] paramDataTypes = new DataTypeDesc[inBuffer.readUnsignedShort()];
        for (int c = 0; c < paramDataTypes.length; ++c) {
            paramDataTypes[c] = DataTypes.valueOf(inBuffer.readInt());
        }
        inflight.front().handleParameterDescription(new PgParamDesc(paramDataTypes));
    }

    /**
     * Reads a ParameterStatus key/value pair and forwards it to the front codec.
     *
     * The pair is read before the in-flight check. With no command in flight
     * the update is dropped with a warning.
     */
    private void decodeParameterStatus(ByteBuf inBuffer) {
        string key = Util.readCStringUTF8(inBuffer);
        string value = Util.readCStringUTF8(inBuffer);
        if (nothingInflight("ParameterStatus")) {
            return;
        }
        inflight.front().handleParameterStatus(key, value);
    }

    /// Forwards EmptyQueryResponse to the front codec.
    private void decodeEmptyQueryResponse() {
        inflight.front().handleEmptyQueryResponse();
    }

    /// Reads the backend process id and secret key and forwards them to the front codec.
    private void decodeBackendKeyData(ByteBuf inBuffer) {
        int processId = inBuffer.readInt();
        int secretKey = inBuffer.readInt();
        inflight.front().handleBackendKeyData(processId, secretKey);
    }

    /// Not implemented: the notification payload is left unread.
    private void decodeNotificationResponse(ByteBuf inBuffer) { // ChannelHandlerContext ctx, 
        implementationMissing(false);
        // ctx.fireChannelRead(new Notification(inBuffer.readInt(), Util.readCStringUTF8(inBuffer), Util.readCStringUTF8(inBuffer)));
    }
}
480 
481 
482 
/**
 * Extracts the affected-row count from a CommandComplete tag.
 *
 * The count is the last space-separated token of the tag when that token
 * consists only of decimal digits ("INSERT 0 5" gives 5, "SELECT 3" gives 3).
 * Tags without such a token ("BEGIN", "CREATE TABLE") give 0.
 *
 * The count is accumulated in an int, so a value beyond the int range wraps.
 */
static class CommandCompleteProcessor : ByteProcessor {
    private enum byte SPACE = 32;
    private int rows;
    bool afterSpace;
    /// True while the token being scanned has contained only digits.
    private bool numeric;

    /**
     * Scans the readable bytes of `inBuffer` without moving its reader index.
     *
     * Params:
     *   inBuffer = buffer positioned at the start of the tag; the final
     *              readable byte is treated as the terminator and skipped.
     *
     * Returns: the parsed row count, or 0 when the tag carries none.
     */
    int parse(ByteBuf inBuffer) {
        afterSpace = false;
        numeric = true;
        rows = 0;
        // Leave out the trailing terminator byte. An empty or terminator-only
        // body has nothing to scan and must not produce a negative length.
        int tagLength = inBuffer.readableBytes() - 1;
        if (tagLength > 0) {
            inBuffer.forEachByte(inBuffer.readerIndex(), tagLength, this);
        }
        return rows;
    }

    /**
     * Consumes one byte of the tag.
     *
     * Returns: always true, so the whole requested range is scanned.
     */
    override
    bool process(byte value) {
        if (value == SPACE) {
            // A new token starts: forget whatever the previous one produced.
            afterSpace = true;
            numeric = true;
            rows = 0;
        } else if (afterSpace) {
            if (numeric && value >= '0' && value <= '9') {
                rows = rows * 10 + (value - '0');
            } else {
                // A word such as "TABLE" is not a count; ignore the whole token.
                numeric = false;
                rows = 0;
            }
        }
        return true;
    }
}