1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 module hunt.database.driver.mysql.impl.codec.ExtendedQueryCommandCodec;
18 
19 import hunt.database.driver.mysql.impl.codec.CommandType;
20 import hunt.database.driver.mysql.impl.codec.DataFormat;
21 import hunt.database.driver.mysql.impl.codec.ExtendedQueryCommandBaseCodec;
22 import hunt.database.driver.mysql.impl.codec.MySQLEncoder;
23 import hunt.database.driver.mysql.impl.codec.MySQLRowDesc;
24 import hunt.database.driver.mysql.impl.codec.Packets;
25 import hunt.database.driver.mysql.impl.codec.QueryCommandBaseCodec;
26 import hunt.database.driver.mysql.impl.codec.RowResultDecoder;
27 
28 import hunt.Exceptions;
29 import hunt.logging;
30 import hunt.net.buffer.ByteBuf;
31 import hunt.database.base.impl.command.ExtendedQueryCommand;
32 
33 import hunt.database.driver.mysql.impl.codec.Packets;
34 
/**
 * Codec for an ExtendedQueryCommand.
 *
 * Encoding sends either a statement-execute request or, when the statement's
 * cursor is already open, a COM_STMT_FETCH request. Decoding handles the
 * matching response; when the command requests a positive fetch size and no
 * cursor is open yet, the execute response is consumed and a fetch request is
 * issued as soon as the column definitions have been decoded.
 */
class ExtendedQueryCommandCodec(R) : ExtendedQueryCommandBaseCodec!(R, ExtendedQueryCommand!(R)) {

	/**
	 * Creates the codec for the given command.
	 *
	 * If the command requests a fetch and the statement's cursor is already
	 * open, the column definitions are restored from the statement so that a
	 * fetch response can be decoded.
	 */
	this(ExtendedQueryCommand!(R) cmd) {
		super(cmd);
		immutable bool fetchRequested = cmd.fetch() > 0;
		if (fetchRequested && statement.isCursorOpen) {
			// bring back the column metadata needed to decode a fetch response
			columnDefinitions = statement.rowDesc.columnDefinitions();
		}
	}

	/**
	 * Writes the request for this command.
	 *
	 * With an open cursor a fetch request is sent; otherwise the statement is
	 * executed, with the cursor-type byte set to 0x01 when a fetch size is
	 * requested and 0x00 when it is not.
	 */
	override void encode(MySQLEncoder encoder) {
		super.encode(encoder);

		if (statement.isCursorOpen) {
			decoder = new RowResultDecoder!(R)(false, statement.rowDesc);
			sendStatementFetchCommand(statement.statementId, cmd.fetch());
			return;
		}

		// 0x01 opens a cursor (TODO Cursor_type is READ_ONLY?), 0x00 is CURSOR_TYPE_NO_CURSOR
		byte cursorType = cmd.fetch() > 0 ? cast(byte) 0x01 : cast(byte) 0x00;
		sendStatementExecuteCommand(statement.statementId, statement.paramDesc.paramDefinitions(),
			sendType, cmd.params(), cursorType);
	}

	/**
	 * Decodes one response packet payload.
	 *
	 * Three situations are distinguished: a COM_STMT_FETCH response (cursor
	 * already open), a COM_STMT_EXECUTE response that opens a cursor (fetch
	 * size requested), and a plain execute response which is delegated to the
	 * base class.
	 *
	 * Throws: IllegalStateException if the handler state is not one of the
	 *         states expected while opening a cursor.
	 */
	override void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
		if (statement.isCursorOpen) {
			if (payloadStartsWithErrorHeader(payload)) {
				handleErrorPacketPayload(payload);
			} else {
				// decoding COM_STMT_FETCH response
				handleRows(payload, payloadLength, &handleSingleRow);
			}
			return;
		}

		// decoding COM_STMT_EXECUTE response
		if (cmd.fetch() <= 0) {
			super.decodePayload(payload, payloadLength, sequenceId);
			return;
		}

		switch (commandHandlerState) {
			case CommandHandlerState.INIT:
				if (payloadStartsWithErrorHeader(payload)) {
					handleErrorPacketPayload(payload);
				} else {
					handleResultsetColumnCountPacketBody(payload);
				}
				break;

			case CommandHandlerState.HANDLING_COLUMN_DEFINITION:
				handleResultsetColumnDefinitions(payload);
				break;

			case CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED:
				// accept an EOF_Packet when DEPRECATE_EOF is not enabled
				skipEofPacketIfNeeded(payload);
				openCursorAndRequestRows();
				break;

			case CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET:
				openCursorAndRequestRows();
				break;

			default:
				throw new IllegalStateException("Unexpected state for decoding COM_STMT_EXECUTE response with cursor opening");
		}
	}

	/// Peeks (without consuming) the first payload byte and tells whether it is the error packet header.
	private bool payloadStartsWithErrorHeader(ByteBuf payload) {
		int leadingByte = payload.getUnsignedByte(payload.readerIndex());
		return leadingByte == Packets.ERROR_PACKET_HEADER;
	}

	/// Finishes column-definition decoding, marks the cursor as open and sends the first fetch request.
	private void openCursorAndRequestRows() {
		handleResultsetColumnDefinitionsDecodingCompleted();
		// the packet number has to start over before a fetch request can be sent
		this.sequenceId = 0;
		// rows will arrive in response to the fetch issued below
		decoder = new RowResultDecoder!(R)(false, statement.rowDesc);

		statement.isCursorOpen = true;

		sendStatementFetchCommand(statement.statementId, cmd.fetch());
	}

	/**
	 * Builds a COM_STMT_FETCH packet and writes it through the encoder.
	 *
	 * Params:
	 *   statementId = id of the statement; written as a 32-bit little-endian value
	 *   count       = value written after the statement id as a 32-bit little-endian integer
	 */
	private void sendStatementFetchCommand(long statementId, int count) {
		// 3-byte length placeholder plus 1 sequence byte, as written below
		enum int headerLength = 4;

		ByteBuf buffer = allocateBuffer();

		// packet header
		int headerStart = buffer.writerIndex();
		buffer.writeMediumLE(0); // placeholder, patched once the payload size is known
		buffer.writeByte(sequenceId);

		// packet payload
		buffer.writeByte(CommandType.COM_STMT_FETCH);
		buffer.writeIntLE(cast(int) statementId);
		buffer.writeIntLE(count);

		// patch the real payload length into the header
		int payloadSize = buffer.writerIndex() - headerStart - headerLength;
		buffer.setMediumLE(headerStart, payloadSize);

		encoder.writeAndFlush(buffer);
	}
}