1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 module hunt.database.driver.mysql.impl.codec.QueryCommandBaseCodec;
18 
19 import hunt.database.driver.mysql.impl.codec.ColumnDefinition;
20 import hunt.database.driver.mysql.impl.codec.CommandCodec;
21 import hunt.database.driver.mysql.impl.codec.DataFormat;
22 import hunt.database.driver.mysql.impl.codec.MySQLRowDesc;
23 import hunt.database.driver.mysql.impl.codec.Packets;
24 import hunt.database.driver.mysql.impl.codec.RowResultDecoder;
25 
26 import hunt.database.driver.mysql.MySQLClient;
27 import hunt.database.driver.mysql.impl.util.BufferUtils;
28 
29 import hunt.database.base.Row;
30 import hunt.database.base.impl.RowDesc;
31 import hunt.database.base.impl.RowSetImpl;
32 import hunt.database.base.impl.command.CommandResponse;
33 import hunt.database.base.impl.command.QueryCommandBase;
34 
35 import hunt.Functions;
36 import hunt.logging;
37 import hunt.net.buffer.ByteBuf;
38 import hunt.text.Charset;
39 
40 import std.variant;
41 
/**
 * Shared base for codecs that decode the response of a query command.
 *
 * Every incoming payload is routed through a small state machine
 * (see `commandHandlerState`): an initial packet handled by the subclass,
 * then the column definitions, an optional EOF packet, and finally the row
 * data terminated by an end packet. Decoded rows are accumulated by a
 * `RowResultDecoder!T` and pushed to the command's result handler whenever
 * a result set ends.
 *
 * Params:
 *  T = type of the decoded result produced by the row decoder
 *  C = command type; expected to extend QueryCommandBase!(T)
 */
abstract class QueryCommandBaseCodec(T, C) : CommandCodec!(bool, C) {

    /// Data format handed to the row description when the decoder is created.
    private DataFormat format;

    /// Current position in the response-decoding state machine.
    protected CommandHandlerState commandHandlerState = CommandHandlerState.INIT;
    /// Column definitions of the result set currently being decoded.
    protected ColumnDefinition[] columnDefinitions;
    /// Row decoder; created once all column definitions have been read.
    protected RowResultDecoder!T decoder;
    /// Index of the next slot of `columnDefinitions` to fill.
    private int currentColumn;

    /**
     * Params:
     *  cmd    = the command being executed; forwarded to the base codec
     *  format = data format used when building the row description
     */
    this(C cmd, DataFormat format) {
        super(cmd);
        this.format = format;
    }

    /**
     * Dispatches one payload to the handler matching the current state.
     *
     * Params:
     *  payload       = buffer holding the packet payload
     *  payloadLength = length of the payload; only consulted while handling rows
     *  sequenceId    = packet sequence id (not used by this method)
     */
    override
    void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
        version(HUNT_DB_DEBUG_MORE) infof("commandHandlerState: %s", commandHandlerState);

        // The handlers change commandHandlerState, so dispatch on a snapshot.
        const state = commandHandlerState;

        if (state == CommandHandlerState.INIT) {
            handleInitPacket(payload);
        } else if (state == CommandHandlerState.HANDLING_COLUMN_DEFINITION) {
            handleResultsetColumnDefinitions(payload);
        } else if (state == CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED) {
            skipEofPacketIfNeeded(payload);
            handleResultsetColumnDefinitionsDecodingCompleted();
        } else if (state == CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET) {
            handleRows(payload, payloadLength, &handleSingleRow);
        } else {
            warningf("Unhandled state: %d", commandHandlerState);
        }
    }

    /// Handles the payload received while in the INIT state; supplied by subclasses.
    protected abstract void handleInitPacket(ByteBuf payload);

    /**
     * Reads the column count from the payload, switches to the
     * column-definition state and allocates one slot per announced column.
     */
    protected void handleResultsetColumnCountPacketBody(ByteBuf payload) {
        int announcedColumns = decodeColumnCountPacketPayload(payload);
        commandHandlerState = CommandHandlerState.HANDLING_COLUMN_DEFINITION;
        columnDefinitions = new ColumnDefinition[announcedColumns];
    }

    /**
     * Decodes a single column definition and stores it in the next free slot.
     * When the last slot has been filled, the state machine moves on.
     */
    protected void handleResultsetColumnDefinitions(ByteBuf payload) {
        ColumnDefinition decoded = decodeColumnDefinitionPacketPayload(payload);
        columnDefinitions[currentColumn++] = decoded;

        if (currentColumn != columnDefinitions.length) {
            // more column definitions are still expected
            return;
        }

        if (!isDeprecatingEofFlagEnabled()) {
            // an EOF_Packet has to be consumed before the rows, to stay
            // compatible with MySQL versions below 5.7.5
            commandHandlerState = CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED;
        } else {
            // the DEPRECATED_EOF flag is enabled, so no EOF_Packet will follow
            handleResultsetColumnDefinitionsDecodingCompleted();
        }
    }

    /**
     * Switches to row handling and creates the row decoder for the column
     * definitions collected so far.
     */
    protected void handleResultsetColumnDefinitionsDecodingCompleted() {
        commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET;
        auto rowDescription = new MySQLRowDesc(columnDefinitions, format);
        // the singleton flag is fixed to false (cmd.isSingleton() is not consulted)
        decoder = new RowResultDecoder!(T)(false, rowDescription);
    }

    /**
     * Handles a payload received while rows are expected: an error packet,
     * the packet ending the result set, or one row of data.
     *
     * A result set row can itself begin with a 0xfe byte (text protocol with
     * a field length > 0xffffff). A packet starting with the EOF header is
     * therefore only treated as the ending packet (EOF_Packet, or OK_Packet
     * with a 0xFE header) when its length is below 0xffffff.
     *
     * Params:
     *  payload          = buffer holding the packet payload
     *  payloadLength    = length of the payload
     *  singleRowHandler = invoked with the payload for row data; may be null
     */
    protected void handleRows(ByteBuf payload, int payloadLength, Consumer!(ByteBuf) singleRowHandler) {
        int header = payload.getUnsignedByte(payload.readerIndex());

        if (header == Packets.ERROR_PACKET_HEADER) {
            handleErrorPacketPayload(payload);
            return;
        }

        // With CLIENT_DEPRECATE_EOF enabled an OK_Packet carrying the
        // EOF_Packet header arrives here; the length check rules out row data.
        const bool isEndPacket = header == Packets.EOF_PACKET_HEADER && payloadLength < 0xFFFFFF;
        if (!isEndPacket) {
            if (singleRowHandler !is null) {
                singleRowHandler(payload);
            }
            return;
        }

        int statusFlags;
        int affected = -1;
        int insertId = -1;
        if (!isDeprecatingEofFlagEnabled()) {
            statusFlags = decodeEofPacketPayload(payload).serverStatusFlags();
        } else {
            OkPacket ok = decodeOkPacketPayload(payload, StandardCharsets.UTF_8);
            statusFlags = ok.serverStatusFlags();
            affected = cast(int) ok.affectedRows();
            insertId = cast(int) ok.lastInsertId();
        }
        handleSingleResultsetDecodingCompleted(statusFlags, affected, insertId);
    }

    /// Feeds one row payload to the row decoder.
    protected void handleSingleRow(ByteBuf payload) {
        const int columnCount = cast(int) columnDefinitions.length;
        decoder.decodeRow(columnCount, payload);
    }

    /**
     * Finishes the current result set: publishes it, resets the per-result-set
     * state and, if no further result set is announced, completes the command.
     */
    protected void handleSingleResultsetDecodingCompleted(int serverStatusFlags, 
            int affectedRows, int lastInsertId) {
        handleSingleResultsetEndPacket(serverStatusFlags, affectedRows, lastInsertId);
        resetIntermediaryResult();

        const bool finished = isDecodingCompleted(serverStatusFlags);
        if (finished) {
            // no more sql result
            handleAllResultsetDecodingCompleted();
        }
    }

    /// Returns: true when the SERVER_MORE_RESULTS_EXISTS bit is not set in the flags.
    protected bool isDecodingCompleted(int serverStatusFlags) {
        const moreResultsBits = serverStatusFlags & ServerStatusFlags.SERVER_MORE_RESULTS_EXISTS;
        return moreResultsBits == 0;
    }

    /**
     * Collects the decoded rows (or an empty row set when no decoder exists)
     * and hands them, together with the last insert id, to the command's
     * result handler.
     */
    private void handleSingleResultsetEndPacket(int serverStatusFlags, int affectedRows, int lastInsertId) {
        const lastRowSentBits = serverStatusFlags & ServerStatusFlags.SERVER_STATUS_LAST_ROW_SENT;
        this.result = lastRowSentBits == 0;

        T rows;
        int rowCount = 0;
        RowDesc description = null;
        if (decoder is null) {
            rows = new RowSetImpl();
        } else {
            rows = decoder.complete();
            description = decoder.rowDesc;
            rowCount = decoder.size();
            decoder.reset();
        }
        version(HUNT_DB_DEBUG_MORE) infof("size=%d, affectedRows=%d", rowCount, affectedRows);

        // MySQL returns affected rows as 0 for SELECT query but Postgres returns queried amount;
        // -1 marks "not provided by the end packet", in which case the row count is reported.
        const int reportedAffected = (affectedRows == -1) ? rowCount : affectedRows;

        cmd.resultHandler().handleResult(reportedAffected, rowCount, description, rows);
        Variant insertIdProperty = Variant(lastInsertId);
        cmd.resultHandler().addProperty(MySQLClient.LAST_INSERTED_ID, insertIdProperty);
    }

    /// Builds the final response (failed or succeeded) and passes it to the completion handler, if any.
    private void handleAllResultsetDecodingCompleted() {
        CommandResponse!(bool) response;
        if (this.failure is null) {
            response = succeededResponse(this.result);
        } else {
            response = failedResponse!bool(this.failure);
        }

        if (completionHandler is null) {
            return;
        }
        completionHandler(response);
    }

    /// Reads the length-encoded column count from the payload and narrows it to int.
    private int decodeColumnCountPacketPayload(ByteBuf payload) {
        return cast(int) BufferUtils.readLengthEncodedInteger(payload);
    }

    /// Puts the state machine back to INIT and drops the column bookkeeping of the finished result set.
    private void resetIntermediaryResult() {
        currentColumn = 0;
        columnDefinitions = null;
        commandHandlerState = CommandHandlerState.INIT;
    }

}
218 
/**
 * States of the response-decoding state machine used by QueryCommandBaseCodec.
 */
enum CommandHandlerState {
    /// Start state (and the state restored after each result set); payloads go to handleInitPacket.
    INIT,
    /// Column count has been read; each payload is decoded as one column definition.
    HANDLING_COLUMN_DEFINITION,
    /// All column definitions were read without the deprecate-EOF flag; the next payload is passed to skipEofPacketIfNeeded.
    COLUMN_DEFINITIONS_DECODING_COMPLETED,
    /// Payloads are handled as row data, an error packet, or the packet ending the result set.
    HANDLING_ROW_DATA_OR_END_PACKET,
}