1 module hunt.database.driver.mysql.impl.codec.MySQLDecoder; 2 3 import hunt.database.driver.mysql.impl.codec.CommandCodec; 4 import hunt.database.driver.mysql.impl.codec.MySQLEncoder; 5 import hunt.database.driver.mysql.impl.codec.Packets; 6 7 import hunt.database.base.impl.Notification; 8 import hunt.database.base.impl.RowDecoder; 9 import hunt.database.base.impl.TxStatus; 10 11 12 import hunt.collection.List; 13 import hunt.collection.Map; 14 import hunt.Exceptions; 15 import hunt.io.ByteBuffer; 16 import hunt.io.BufferUtils; 17 import hunt.io.channel; 18 import hunt.logging; 19 import hunt.net.codec.Decoder; 20 import hunt.net.Connection; 21 import hunt.net.buffer; 22 23 import std.container.dlist; 24 import std.conv; 25 26 class MySQLDecoder : Decoder { 27 28 private ByteBufAllocator alloc; 29 private DList!(CommandCodecBase) *inflight; 30 private ByteBuf inBuffer; 31 private MySQLEncoder encoder; 32 33 private CompositeByteBuf aggregatedPacketPayload = null; 34 35 this(ref DList!(CommandCodecBase) inflight, MySQLEncoder encoder) { 36 this.inflight = &inflight; 37 this.encoder = encoder; 38 alloc = UnpooledByteBufAllocator.DEFAULT(); 39 } 40 41 DataHandleStatus decode(ByteBuffer payload, Connection connection) { 42 DataHandleStatus resultStatus = DataHandleStatus.Done; 43 try { 44 resultStatus = doEecode(payload, connection); 45 } catch(Exception ex) { 46 BufferUtils.clear(payload); 47 version(HUNT_DEBUG) warning(ex); 48 else warning(ex.msg); 49 } 50 51 return resultStatus; 52 } 53 54 private DataHandleStatus doEecode(ByteBuffer payload, Connection connection) { 55 version(HUNT_DB_DEBUG_MORE) tracef("decoding buffer: %s", payload.toString()); 56 DataHandleStatus resultStatus = DataHandleStatus.Done; 57 58 ByteBuf buff = Unpooled.wrappedBuffer(payload); 59 if (inBuffer is null) { 60 inBuffer = buff; 61 } else { 62 CompositeByteBuf composite = cast(CompositeByteBuf) inBuffer; 63 if (composite is null) { 64 composite = alloc.compositeBuffer(); 65 composite.addComponent(true, inBuffer); 66 inBuffer = composite; 67 } 68 composite.addComponent(true, buff); 69 } 70 71 while (true) { 72 int available = inBuffer.readableBytes(); 73 if (available < 4) { // no enough bytes available in buffer 74 break; 75 } 76 77 int packetStartIdx = inBuffer.readerIndex(); 78 int payloadLength = inBuffer.readUnsignedMediumLE(); 79 int sequenceId = inBuffer.readUnsignedByte(); 80 81 if (payloadLength >= Packets.PACKET_PAYLOAD_LENGTH_LIMIT && aggregatedPacketPayload is null) { 82 aggregatedPacketPayload = Unpooled.compositeBuffer(); 83 } 84 85 // payload 86 if (inBuffer.readableBytes() >= payloadLength) { 87 if (aggregatedPacketPayload !is null) { 88 // read a split packet 89 aggregatedPacketPayload.addComponent(true, inBuffer.readRetainedSlice(payloadLength)); 90 sequenceId++; 91 92 if (payloadLength < Packets.PACKET_PAYLOAD_LENGTH_LIMIT) { 93 // we have just read the last split packet and there will be no more split packet 94 decodePayload(aggregatedPacketPayload, aggregatedPacketPayload.readableBytes(), sequenceId); 95 aggregatedPacketPayload.release(); 96 aggregatedPacketPayload = null; 97 } 98 } else { 99 // read a non-split packet 100 decodePayload(inBuffer.readSlice(payloadLength), payloadLength, sequenceId); 101 } 102 } else { 103 inBuffer.readerIndex(packetStartIdx); 104 // Need more bytes, so buffer them first. 105 version(HUNT_DB_DEBUG_MORE) warning("Remaining: ", inBuffer.toString()); 106 break; 107 } 108 109 } 110 111 if (inBuffer !is null) { 112 if(inBuffer.isReadable()) { 113 // copy the remainings in current buffer 114 version(HUNT_DB_DEBUG_MORE) warningf("copying the remaings: %s", inBuffer.toString()); 115 inBuffer = inBuffer.copy(); 116 } else { 117 // clear up the buffer 118 inBuffer.release(); 119 inBuffer = null; 120 } 121 } 122 123 return resultStatus; 124 } 125 126 private void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) { 127 // if(inflight.empty()) { 128 // warning("inflight is empty."); 129 // return; 130 // } 131 while(inflight.empty()) { 132 version(HUNT_DB_DEBUG_MORE) warning("inflight is empty."); 133 } 134 CommandCodecBase ctx = inflight.front(); 135 version(HUNT_DB_DEBUG_MORE) { 136 tracef("Command codec: %s", typeid(ctx)); 137 // tracef("length: %d, payload: %s", payloadLength, payload.toString()); 138 } 139 ctx.sequenceId = sequenceId + 1; 140 ctx.decodePayload(payload, payloadLength, sequenceId); 141 payload.clear(); 142 } 143 }