/*
 * Copyright (C) 2019, HuntLabs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
module hunt.database.driver.mysql.impl.codec.QueryCommandBaseCodec;

import hunt.database.driver.mysql.impl.codec.ColumnDefinition;
import hunt.database.driver.mysql.impl.codec.CommandCodec;
import hunt.database.driver.mysql.impl.codec.DataFormat;
import hunt.database.driver.mysql.impl.codec.MySQLRowDesc;
import hunt.database.driver.mysql.impl.codec.Packets;
import hunt.database.driver.mysql.impl.codec.RowResultDecoder;

import hunt.database.driver.mysql.MySQLClient;
import hunt.database.driver.mysql.impl.util.BufferUtils;

import hunt.database.base.Row;
import hunt.database.base.impl.RowDesc;
import hunt.database.base.impl.RowSetImpl;
import hunt.database.base.impl.command.CommandResponse;
import hunt.database.base.impl.command.QueryCommandBase;

import hunt.Functions;
import hunt.logging;
import hunt.net.buffer.ByteBuf;
import hunt.text.Charset;

import std.variant;

/**
 * Base codec for query-style commands.
 *
 * Incoming payloads are dispatched through a small state machine
 * (see $(D CommandHandlerState)): the init packet is delegated to the
 * subclass, column definitions are collected one packet at a time, rows are
 * fed to a $(D RowResultDecoder), and every finished result set is reported
 * to the command's result handler.
 *
 * Params:
 *      T = result type produced by the row decoder
 *      C = command type; expected to extend QueryCommandBase!(T)
 *          (not enforced by a template constraint here)
 */
abstract class QueryCommandBaseCodec(T, C) : CommandCodec!(bool, C) {

    /// Data format handed to every MySQLRowDesc built by this codec.
    private DataFormat format;

    /// Current position in the decoding state machine.
    protected CommandHandlerState commandHandlerState = CommandHandlerState.INIT;

    /// Column definitions of the result set currently being decoded.
    protected ColumnDefinition[] columnDefinitions;

    /// Row decoder; created once all column definitions are known.
    protected RowResultDecoder!T decoder;

    /// Index of the next slot to fill in columnDefinitions.
    private int currentColumn;

    /**
     * Params:
     *      cmd    = command being encoded/decoded, forwarded to the base codec
     *      format = data format used when building row descriptors
     */
    this(C cmd, DataFormat format) {
        super(cmd);
        this.format = format;
    }

    /**
     * Routes one payload to the handler matching the current state.
     * States without a handler are only logged.
     */
    override void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
        version(HUNT_DB_DEBUG_MORE) infof("commandHandlerState: %s", commandHandlerState);

        const state = commandHandlerState;
        if (state == CommandHandlerState.INIT) {
            handleInitPacket(payload);
        } else if (state == CommandHandlerState.HANDLING_COLUMN_DEFINITION) {
            handleResultsetColumnDefinitions(payload);
        } else if (state == CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED) {
            skipEofPacketIfNeeded(payload);
            handleResultsetColumnDefinitionsDecodingCompleted();
        } else if (state == CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET) {
            handleRows(payload, payloadLength, &handleSingleRow);
        } else {
            warningf("Unhandled state: %d", commandHandlerState);
        }
    }

    /// Handles the first packet of a response; supplied by the concrete codec.
    protected abstract void handleInitPacket(ByteBuf payload);

    /**
     * Reads the column count from the payload, switches to the
     * column-definition state and allocates one slot per announced column.
     */
    protected void handleResultsetColumnCountPacketBody(ByteBuf payload) {
        int announcedColumns = decodeColumnCountPacketPayload(payload);
        commandHandlerState = CommandHandlerState.HANDLING_COLUMN_DEFINITION;
        columnDefinitions = new ColumnDefinition[announcedColumns];
    }

    /**
     * Decodes one column definition and stores it in the next free slot.
     * Once the last slot is filled, decoding either proceeds straight to the
     * row phase or waits for an EOF packet, depending on the DEPRECATE_EOF flag.
     */
    protected void handleResultsetColumnDefinitions(ByteBuf payload) {
        ColumnDefinition definition = decodeColumnDefinitionPacketPayload(payload);
        columnDefinitions[currentColumn++] = definition;

        if (currentColumn != columnDefinitions.length) {
            // more column definitions are still expected
            return;
        }

        if (isDeprecatingEofFlagEnabled()) {
            // DEPRECATE_EOF is enabled, so no EOF_Packet separates definitions from rows
            handleResultsetColumnDefinitionsDecodingCompleted();
        } else {
            // an EOF_Packet must be consumed before rows, for compatibility
            // with MySQL versions below 5.7.5
            commandHandlerState = CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED;
        }
    }

    /**
     * Enters the row phase and creates the row decoder for the collected
     * column definitions.
     */
    protected void handleResultsetColumnDefinitionsDecodingCompleted() {
        commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET;
        auto description = new MySQLRowDesc(columnDefinitions, format);
        decoder = new RowResultDecoder!(T)(false/*cmd.isSingleton()*/, description);
    }

    /**
     * Handles a payload received during the row phase: an error packet, the
     * packet terminating the result set, or a data row.
     *
     * A row may itself start with 0xFE (text protocol with a field longer
     * than 0xFFFFFF), so a 0xFE header only counts as the terminating packet
     * (EOF_Packet, or OK_Packet with a 0xFE header) when the payload is
     * shorter than 0xFFFFFF.
     *
     * Params:
     *      payload          = packet payload, positioned at its header byte
     *      payloadLength    = length of the payload
     *      singleRowHandler = invoked for data rows; may be null, in which
     *                         case rows are ignored
     */
    protected void handleRows(ByteBuf payload, int payloadLength, Consumer!(ByteBuf) singleRowHandler) {
        // peek at the header byte without consuming it
        int header = payload.getUnsignedByte(payload.readerIndex());

        if (header == Packets.ERROR_PACKET_HEADER) {
            handleErrorPacketPayload(payload);
            return;
        }

        const bool isEndPacket = header == Packets.EOF_PACKET_HEADER && payloadLength < 0xFFFFFF;
        if (!isEndPacket) {
            if (singleRowHandler !is null) {
                singleRowHandler(payload);
            }
            return;
        }

        int statusFlags;
        int affected = -1;   // -1: the end packet carried no affected-row count
        int insertId = -1;   // -1: the end packet carried no last-insert id
        if (!isDeprecatingEofFlagEnabled()) {
            statusFlags = decodeEofPacketPayload(payload).serverStatusFlags();
        } else {
            // with CLIENT_DEPRECATE_EOF the terminator is an OK_Packet with an EOF header
            OkPacket okPacket = decodeOkPacketPayload(payload, StandardCharsets.UTF_8);
            statusFlags = okPacket.serverStatusFlags();
            // NOTE(review): both values are narrowed to int by these casts;
            // confirm that larger values cannot occur.
            affected = cast(int) okPacket.affectedRows();
            insertId = cast(int) okPacket.lastInsertId();
        }
        handleSingleResultsetDecodingCompleted(statusFlags, affected, insertId);
    }

    /// Feeds one data row to the row decoder.
    protected void handleSingleRow(ByteBuf payload) {
        decoder.decodeRow(cast(int) columnDefinitions.length, payload);
    }

    /**
     * Finishes the current result set: reports it, resets the per-result-set
     * state, and completes the command when the server announces no further
     * results.
     */
    protected void handleSingleResultsetDecodingCompleted(int serverStatusFlags,
            int affectedRows, int lastInsertId) {
        handleSingleResultsetEndPacket(serverStatusFlags, affectedRows, lastInsertId);
        resetIntermediaryResult();

        if (!isDecodingCompleted(serverStatusFlags)) {
            // another result set follows
            return;
        }
        handleAllResultsetDecodingCompleted();
    }

    /// Returns: true when SERVER_MORE_RESULTS_EXISTS is not set in the flags.
    protected bool isDecodingCompleted(int serverStatusFlags) {
        const bool moreResultsFollow =
            (serverStatusFlags & ServerStatusFlags.SERVER_MORE_RESULTS_EXISTS) != 0;
        return !moreResultsFollow;
    }

    /**
     * Publishes the finished result set to the command's result handler and
     * attaches the last-insert id as a property.
     */
    private void handleSingleResultsetEndPacket(int serverStatusFlags, int affectedRows, int lastInsertId) {
        this.result = (serverStatusFlags & ServerStatusFlags.SERVER_STATUS_LAST_ROW_SENT) == 0;

        T resultSet;
        int rowCount = 0;
        RowDesc rowDesc = null;
        if (decoder is null) {
            // no rows were decoded for this result set
            resultSet = new RowSetImpl();
        } else {
            resultSet = decoder.complete();
            rowDesc = decoder.rowDesc;
            rowCount = decoder.size();
            decoder.reset();
        }
        version(HUNT_DB_DEBUG_MORE) infof("size=%d, affectedRows=%d", rowCount, affectedRows);

        // MySQL returns affected rows as 0 for SELECT query but Postgres returns
        // queried amount; when no count was supplied (-1), report the row count.
        const int reportedAffected = (affectedRows == -1) ? rowCount : affectedRows;

        cmd.resultHandler().handleResult(reportedAffected, rowCount, rowDesc, resultSet);
        Variant v = Variant(lastInsertId);
        cmd.resultHandler().addProperty(MySQLClient.LAST_INSERTED_ID, v);
    }

    /**
     * Builds the final response (failed when a failure was recorded,
     * succeeded otherwise) and passes it to the completion handler, if any.
     */
    private void handleAllResultsetDecodingCompleted() {
        CommandResponse!(bool) response;
        if (this.failure is null) {
            response = succeededResponse(this.result);
        } else {
            response = failedResponse!bool(this.failure);
        }

        if (completionHandler is null) {
            return;
        }
        completionHandler(response);
    }

    /// Reads the length-encoded column count and narrows it to int.
    private int decodeColumnCountPacketPayload(ByteBuf payload) {
        return cast(int) BufferUtils.readLengthEncodedInteger(payload);
    }

    /// Clears per-result-set state so the next result set starts from INIT.
    private void resetIntermediaryResult() {
        currentColumn = 0;
        columnDefinitions = null;
        commandHandlerState = CommandHandlerState.INIT;
    }

}

/// Decoding phases of QueryCommandBaseCodec.
enum CommandHandlerState {
    /// Waiting for the first packet of a response.
    INIT,
    /// Collecting column definition packets.
    HANDLING_COLUMN_DEFINITION,
    /// All definitions received; an EOF packet may still have to be skipped.
    COLUMN_DEFINITIONS_DECODING_COMPLETED,
    /// Receiving data rows or the packet that terminates the result set.
    HANDLING_ROW_DATA_OR_END_PACKET
}