/*
 * Copyright (C) 2019, HuntLabs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
module hunt.database.driver.mysql.impl.codec.ExtendedQueryCommandCodec;

import hunt.database.driver.mysql.impl.codec.CommandType;
import hunt.database.driver.mysql.impl.codec.DataFormat;
import hunt.database.driver.mysql.impl.codec.ExtendedQueryCommandBaseCodec;
import hunt.database.driver.mysql.impl.codec.MySQLEncoder;
import hunt.database.driver.mysql.impl.codec.MySQLRowDesc;
import hunt.database.driver.mysql.impl.codec.Packets;
import hunt.database.driver.mysql.impl.codec.QueryCommandBaseCodec;
import hunt.database.driver.mysql.impl.codec.RowResultDecoder;

import hunt.Exceptions;
import hunt.logging;
import hunt.net.buffer.ByteBuf;
import hunt.database.base.impl.command.ExtendedQueryCommand;

import hunt.database.driver.mysql.impl.codec.Packets;

/**
 * Codec for an ExtendedQueryCommand.
 *
 * Depending on whether the statement's cursor is already open and on
 * cmd.fetch(), it issues either a statement-execute command or a
 * COM_STMT_FETCH command, and decodes the matching response.
 */
class ExtendedQueryCommandCodec(R) : ExtendedQueryCommandBaseCodec!(R, ExtendedQueryCommand!(R)) {

    /**
     * Creates the codec for the given command.
     *
     * Params:
     *   cmd = the extended query command handled by this codec
     */
    this(ExtendedQueryCommand!(R) cmd) {
        super(cmd);

        const bool resumingFetch = cmd.fetch() > 0 && statement.isCursorOpen;
        if (resumingFetch) {
            // restore the state we need for decoding fetch response
            columnDefinitions = statement.rowDesc.columnDefinitions();
        }
    }

    /**
     * Sends the request for this command.
     *
     * With an open cursor a fetch command is sent; otherwise the statement is
     * executed, with the cursor flag chosen from cmd.fetch().
     *
     * Params:
     *   encoder = the encoder passed on to the base implementation
     */
    override
    void encode(MySQLEncoder encoder) {
        super.encode(encoder);

        if (statement.isCursorOpen) {
            // cursor already open: only the next batch of rows is requested
            decoder = new RowResultDecoder!(R)(false, statement.rowDesc);
            sendStatementFetchCommand(statement.statementId, cmd.fetch());
            return;
        }

        // 0x01 when a fetch size is requested (TODO Cursor_type is READ_ONLY?),
        // 0x00 otherwise (CURSOR_TYPE_NO_CURSOR)
        byte cursorType = (cmd.fetch() > 0) ? cast(byte) 0x01 : cast(byte) 0x00;
        sendStatementExecuteCommand(statement.statementId, statement.paramDesc.paramDefinitions(),
                sendType, cmd.params(), cursorType);
    }

    /**
     * Decodes one packet payload of the response.
     *
     * Params:
     *   payload       = buffer holding the packet payload
     *   payloadLength = length of the payload
     *   sequenceId    = sequence id of the packet; only forwarded to the base
     *                   implementation here
     *
     * Throws: IllegalStateException when the handler state is not one expected
     *         while an execute response with cursor opening is being decoded.
     */
    override
    void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
        if (statement.isCursorOpen) {
            int leadingByte = payload.getUnsignedByte(payload.readerIndex());
            if (leadingByte == Packets.ERROR_PACKET_HEADER) {
                handleErrorPacketPayload(payload);
            } else {
                // decoding COM_STMT_FETCH response
                handleRows(payload, payloadLength, &handleSingleRow);
            }
            return;
        }

        // decoding COM_STMT_EXECUTE response
        if (cmd.fetch() <= 0) {
            // no cursor requested: the base class handles the whole response
            super.decodePayload(payload, payloadLength, sequenceId);
            return;
        }

        switch (commandHandlerState) {
            case CommandHandlerState.INIT:
                int leadingByte = payload.getUnsignedByte(payload.readerIndex());
                if (leadingByte == Packets.ERROR_PACKET_HEADER) {
                    handleErrorPacketPayload(payload);
                } else {
                    handleResultsetColumnCountPacketBody(payload);
                }
                break;

            case CommandHandlerState.HANDLING_COLUMN_DEFINITION:
                handleResultsetColumnDefinitions(payload);
                break;

            case CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED:
                // accept an EOF_Packet when DEPRECATE_EOF is not enabled
                skipEofPacketIfNeeded(payload);
                // deliberately continues with the next case
                goto case;

            case CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET:
                handleResultsetColumnDefinitionsDecodingCompleted();
                // need to reset packet number so that we can send a fetch request
                this.sequenceId = 0;
                // send fetch after cursor opened
                decoder = new RowResultDecoder!(R)(false, statement.rowDesc);

                statement.isCursorOpen = true;

                sendStatementFetchCommand(statement.statementId, cmd.fetch());
                break;

            default:
                throw new IllegalStateException("Unexpected state for decoding COM_STMT_EXECUTE response with cursor opening");
        }
    }

    /**
     * Builds a COM_STMT_FETCH packet and writes it out through the encoder.
     *
     * Params:
     *   statementId = id of the prepared statement; written as a 32-bit value
     *   count       = value written after the statement id (callers pass
     *                 cmd.fetch())
     */
    private void sendStatementFetchCommand(long statementId, int count) {
        ByteBuf buffer = allocateBuffer();

        // packet header: 3-byte length placeholder followed by the sequence id
        int headerStart = buffer.writerIndex();
        buffer.writeMediumLE(0); // patched below once the payload size is known
        buffer.writeByte(sequenceId);

        // packet payload
        buffer.writeByte(CommandType.COM_STMT_FETCH);
        buffer.writeIntLE(cast(int) statementId);
        buffer.writeIntLE(count);

        // payload size = everything written minus the 4 header bytes
        int payloadSize = buffer.writerIndex() - headerStart - 4;
        buffer.setMediumLE(headerStart, payloadSize);

        encoder.writeAndFlush(buffer);
    }
}