1 module hunt.database.driver.mysql.impl.codec.MySQLDecoder;
2 
3 import hunt.database.driver.mysql.impl.codec.CommandCodec;
4 import hunt.database.driver.mysql.impl.codec.MySQLEncoder;
5 import hunt.database.driver.mysql.impl.codec.Packets;
6 
7 import hunt.database.base.impl.Notification;
8 import hunt.database.base.impl.RowDecoder;
9 import hunt.database.base.impl.TxStatus;
10 
11 
12 import hunt.collection.List;
13 import hunt.collection.Map;
14 import hunt.Exceptions;
15 import hunt.io.ByteBuffer;
16 import hunt.io.BufferUtils;
17 import hunt.io.channel;
18 import hunt.logging;
19 import hunt.net.codec.Decoder;
20 import hunt.net.Connection;
21 import hunt.net.buffer;
22 
23 import std.container.dlist;
24 import std.conv;
25 
/**
 * Splits the inbound byte stream into MySQL packets and hands each packet
 * payload to the command codec at the front of the in-flight queue.
 *
 * Every packet starts with a 4-byte header: a 3-byte little-endian payload
 * length followed by a 1-byte sequence id. A payload whose length reaches
 * `Packets.PACKET_PAYLOAD_LENGTH_LIMIT` is treated as one fragment of a split
 * packet; fragments are accumulated until a shorter one arrives.
 */
class MySQLDecoder : Decoder {

    /// Size in bytes of the packet header (3-byte length + 1-byte sequence id).
    private enum int PACKET_HEADER_LENGTH = 4;

    private ByteBufAllocator alloc;

    /// Queue of pending command codecs, owned by the caller; the front one receives payloads.
    private DList!(CommandCodecBase) *inflight;

    /// Bytes received so far that have not been consumed as complete packets yet.
    private ByteBuf inBuffer;

    /// Stored at construction time; not used by the code in this class.
    private MySQLEncoder encoder;

    /// Fragments of a split packet collected so far, or null when no split packet is in progress.
    private CompositeByteBuf aggregatedPacketPayload = null;

    /**
     * Params:
     *   inflight = queue of pending command codecs; only its address is kept,
     *              so it must outlive this decoder
     *   encoder  = the encoder paired with this decoder
     */
    this(ref DList!(CommandCodecBase) inflight, MySQLEncoder encoder) {
        this.inflight = &inflight;
        this.encoder = encoder;
        alloc = UnpooledByteBufAllocator.DEFAULT();
    }

    /**
     * Decodes all complete packets contained in `payload`.
     *
     * Any exception raised while decoding is logged and swallowed, and
     * `payload` is cleared, so the caller never sees a decoding failure.
     *
     * Params:
     *   payload    = the bytes just read from the connection
     *   connection = the connection the bytes came from
     *
     * Returns: the status reported by the decoding step, or
     *          `DataHandleStatus.Done` when decoding failed.
     */
    DataHandleStatus decode(ByteBuffer payload, Connection connection) {
        DataHandleStatus resultStatus = DataHandleStatus.Done;
        try {
            resultStatus = doEecode(payload, connection);
        } catch(Exception ex) {
            BufferUtils.clear(payload);
            version(HUNT_DEBUG) warning(ex);
            else warning(ex.msg);
        }

        return resultStatus;
    }

    /**
     * Appends `payload` to the pending bytes and dispatches every complete
     * packet found in them. An incomplete trailing packet is kept for the
     * next call.
     *
     * Returns: always `DataHandleStatus.Done`.
     */
    private DataHandleStatus doEecode(ByteBuffer payload, Connection connection) {
        version(HUNT_DB_DEBUG_MORE) tracef("decoding buffer: %s", payload.toString());

        appendToInBuffer(Unpooled.wrappedBuffer(payload));

        // Must also run when a codec throws: otherwise inBuffer would keep
        // wrapping `payload`, which decode() clears right after the failure.
        scope(exit) retainUnconsumedBytes();

        while (inBuffer.readableBytes() >= PACKET_HEADER_LENGTH) {
            int packetStartIdx = inBuffer.readerIndex();
            int payloadLength = inBuffer.readUnsignedMediumLE();
            int sequenceId = inBuffer.readUnsignedByte();

            if (payloadLength >= Packets.PACKET_PAYLOAD_LENGTH_LIMIT && aggregatedPacketPayload is null) {
                aggregatedPacketPayload = Unpooled.compositeBuffer();
            }

            if (inBuffer.readableBytes() < payloadLength) {
                // Need more bytes: rewind to the header so the whole packet
                // is parsed again once the rest has arrived.
                inBuffer.readerIndex(packetStartIdx);
                version(HUNT_DB_DEBUG_MORE) warning("Remaining: ", inBuffer.toString());
                break;
            }

            if (aggregatedPacketPayload is null) {
                // read a non-split packet
                decodePayload(inBuffer.readSlice(payloadLength), payloadLength, sequenceId);
                continue;
            }

            // read a split packet
            aggregatedPacketPayload.addComponent(true, inBuffer.readRetainedSlice(payloadLength));
            sequenceId++;

            if (payloadLength < Packets.PACKET_PAYLOAD_LENGTH_LIMIT) {
                // We have just read the last fragment. Detach the aggregate
                // before dispatching so that a throwing codec cannot leave a
                // stale aggregate behind for the following packets.
                CompositeByteBuf completed = aggregatedPacketPayload;
                aggregatedPacketPayload = null;
                scope(exit) completed.release();
                decodePayload(completed, completed.readableBytes(), sequenceId);
            }
        }

        return DataHandleStatus.Done;
    }

    /// Makes `incoming` the pending buffer, or appends it to the bytes already pending.
    private void appendToInBuffer(ByteBuf incoming) {
        if (inBuffer is null) {
            inBuffer = incoming;
            return;
        }

        CompositeByteBuf composite = cast(CompositeByteBuf) inBuffer;
        if (composite is null) {
            composite = alloc.compositeBuffer();
            composite.addComponent(true, inBuffer);
            inBuffer = composite;
        }
        composite.addComponent(true, incoming);
    }

    /// Replaces the pending buffer with a copy of its unread bytes (null when none) and releases the old one.
    private void retainUnconsumedBytes() {
        if (inBuffer is null) {
            return;
        }

        ByteBuf consumed = inBuffer;
        if (consumed.isReadable()) {
            // copy the remainings in current buffer
            version(HUNT_DB_DEBUG_MORE) warningf("copying the remaings: %s", consumed.toString());
            inBuffer = consumed.copy();
        } else {
            inBuffer = null;
        }
        consumed.release();
    }

    /**
     * Passes one packet payload to the codec at the front of the in-flight
     * queue, after storing `sequenceId + 1` as that codec's sequence id.
     *
     * Params:
     *   payload       = the packet payload
     *   payloadLength = number of payload bytes
     *   sequenceId    = sequence id associated with the packet
     */
    private void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
        // NOTE(review): this spins until a codec shows up in the queue, which
        // can only end if another thread enqueues one. The alternative the
        // original author left disabled was to log a warning and drop the
        // payload. Confirm the threading model before changing either way.
        while(inflight.empty()) {
            version(HUNT_DB_DEBUG_MORE) warning("inflight is empty.");
        }
        CommandCodecBase ctx = inflight.front();
        version(HUNT_DB_DEBUG_MORE) {
            tracef("Command codec: %s", typeid(ctx));
        }
        ctx.sequenceId = sequenceId + 1;
        ctx.decodePayload(payload, payloadLength, sequenceId);
        payload.clear();
    }
}