module hunt.database.driver.mysql.impl.codec.MySQLEncoder;

import hunt.database.driver.mysql.impl.codec.CapabilitiesFlag;
import hunt.database.driver.mysql.impl.codec.CloseConnectionCommandCodec;
import hunt.database.driver.mysql.impl.codec.CloseStatementCommandCodec;
import hunt.database.driver.mysql.impl.codec.CommandCodec;
import hunt.database.driver.mysql.impl.codec.ExtendedBatchQueryCommandCodec;
import hunt.database.driver.mysql.impl.codec.ExtendedQueryCommandCodec;
import hunt.database.driver.mysql.impl.codec.InitCommandCodec;
import hunt.database.driver.mysql.impl.codec.PrepareStatementCodec;
import hunt.database.driver.mysql.impl.codec.ResetStatementCommandCodec;
import hunt.database.driver.mysql.impl.codec.SimpleQueryCommandCodec;

import hunt.database.driver.mysql.impl.command.ChangeUserCommand;
import hunt.database.driver.mysql.impl.command.DebugCommand;
import hunt.database.driver.mysql.impl.command.InitDbCommand;
import hunt.database.driver.mysql.impl.command.PingCommand;
import hunt.database.driver.mysql.impl.command.ResetConnectionCommand;
import hunt.database.driver.mysql.impl.command.SetOptionCommand;
import hunt.database.driver.mysql.impl.command.StatisticsCommand;

import hunt.database.base.AsyncResult;
import hunt.database.base.Exceptions;
import hunt.database.base.impl.Connection;
import hunt.database.base.impl.ParamDesc;
import hunt.database.base.impl.RowDesc;
import hunt.database.base.impl.TxStatus;
import hunt.database.base.impl.command;
import hunt.database.base.RowSet;

import hunt.collection.ArrayDeque;
import hunt.io.ByteBuffer;
import hunt.collection.List;
import hunt.collection.Map;
import hunt.Exceptions;
import hunt.logging;
import hunt.net.buffer;
import hunt.net.codec.Encoder;
import hunt.net.Connection;
import hunt.text.Charset;

import std.container.dlist;
import std.range;
import std.variant;

/**
 * Outgoing-side encoder of the MySQL driver.
 *
 * For every command handed to `encode` it selects the matching codec,
 * appends that codec to the shared in-flight list and asks the codec to
 * encode itself through this encoder. When a codec reports completion, the
 * front entry of the in-flight list is removed and the response is forwarded
 * to the connection's handler.
 */
class MySQLEncoder : EncoderChain {

    /// Pointer to the in-flight codec list supplied to the constructor.
    /// NOTE(review): the address of a `ref` argument is stored, so the list
    /// must outlive this encoder - confirm at the construction site.
    private DList!(CommandCodecBase) *inflight;

    /// Connection passed to the most recent `encode` call; `writeAndFlush`
    /// and the completion handlers write/dispatch through it.
    Connection ctx;

    /// Bit set of client capability flags, filled in by the constructor.
    int clientCapabilitiesFlag = 0x00000000;

    /// Character set; initialised to UTF-8 by the constructor.
    Charset charset;

    /**
     * Creates an encoder bound to the given in-flight list.
     *
     * Params:
     *   inflight = list that receives every codec created by `encode`; only
     *              its address is kept, the list itself is not copied.
     */
    this(ref DList!(CommandCodecBase) inflight) {
        this.inflight = &inflight;
        this.charset = StandardCharsets.UTF_8;
        initSupportedCapabilitiesFlags();
    }

    /**
     * Encodes one outgoing message.
     *
     * The message must be an `ICommand`. Anything else (including `null`) is
     * reported with a warning and ignored; previously such a message fell
     * through to `wrap` and dereferenced a null command.
     *
     * Params:
     *   message    = the command to encode.
     *   connection = connection the encoded bytes are written to; it is
     *                remembered in `ctx`.
     *
     * Throws: UnsupportedOperationException (from `wrap`) when no codec
     *         exists for the command type.
     */
    override void encode(Object message, Connection connection) {
        ctx = connection;

        ICommand cmd = cast(ICommand)message;
        if(cmd is null) {
            // typeid() on a null class reference would itself dereference
            // null, so the type name is resolved defensively.
            string typeName = message is null ? "null" : typeid(message).toString();
            warningf("The message is not a ICommand: %s", typeName);
            return;
        }

        version(HUNT_DB_DEBUG_MORE)
            tracef("encoding a message: %s", typeid(message));

        CommandCodecBase cmdCodec = wrap(cmd);

        cmdCodec.completionHandler = (ICommandResponse resp) {
            version(HUNT_DB_DEBUG_MORE) {
                infof("message encoding completed");
                if(resp.failed()) {
                    Throwable th = resp.cause();
                    warningf("Response error: %s", th.msg);
                }
                tracef("%s", typeid(cast(Object)resp));
            }

            // The completed codec is expected to be the oldest in-flight
            // entry (the original carried a disabled assertion for this).
            inflight.removeFront();

            // Make sure the response carries the command it belongs to.
            if(!resp.isCommandAttatched()) {
                resp.attachCommand(cmdCodec.getCommand());
            }

            NetConnectionHandler handler = ctx.getHandler();
            handler.messageReceived(ctx, cast(Object)resp);
        };

        // Register the codec before encoding it.
        inflight.insertBack(cmdCodec);
        cmdCodec.encode(this);
    }

    /**
     * Selects the codec that handles the given command.
     *
     * The checks run in a fixed order; the first matching command type wins.
     *
     * TODO: the commented-out Java reference that used to live here also
     * mapped PingCommand, InitDbCommand, StatisticsCommand, SetOptionCommand,
     * ResetConnectionCommand, DebugCommand and ChangeUserCommand to codecs;
     * those are not handled here yet.
     *
     * Params:
     *   cmd = command to wrap.
     *
     * Returns: a new codec wrapping `cmd`.
     *
     * Throws: UnsupportedOperationException when the command type has no
     *         codec.
     */
    private CommandCodecBase wrap(ICommand cmd) {
        if (auto initCommand = cast(InitCommand) cmd) {
            return new InitCommandCodec(initCommand);
        }

        if (auto simpleCommand = cast(SimpleQueryCommand!(RowSet)) cmd) {
            return new SimpleQueryCommandCodec!RowSet(simpleCommand);
        }

        if (auto prepareCommand = cast(PrepareStatementCommand) cmd) {
            return new PrepareStatementCodec(prepareCommand);
        }

        if (auto extendedCommand = cast(ExtendedQueryCommand!RowSet) cmd) {
            return new ExtendedQueryCommandCodec!RowSet(extendedCommand);
        }

        if (auto batchQueryCommand = cast(ExtendedBatchQueryCommand!RowSet) cmd) {
            return new ExtendedBatchQueryCommandCodec!RowSet(batchQueryCommand);
        }

        if (auto connCommand = cast(CloseConnectionCommand) cmd) {
            return new CloseConnectionCommandCodec(connCommand);
        }

        // Closing a cursor is served by the reset-statement codec.
        if (auto cursorCommand = cast(CloseCursorCommand) cmd) {
            return new ResetStatementCommandCodec(cursorCommand);
        }

        if (auto statementCommand = cast(CloseStatementCommand) cmd) {
            return new CloseStatementCommandCodec(statementCommand);
        }

        string description = "Unsupported command " ~ (cast(Object)cmd).toString();
        warning(description);
        // Name the offending command in the exception instead of a bare "Todo".
        throw new UnsupportedOperationException(description);
    }

    /**
     * Writes the buffer to the current connection; simply forwards to
     * `writeAndFlush`.
     *
     * Params:
     *   outBuffer = buffer whose readable bytes are written.
     */
    void write(ByteBuf outBuffer) {
        // FIXME: Needing refactor or cleanup -@zxp at 9/3/2019, 6:19:07 PM
        //
        writeAndFlush(outBuffer);
    }

    /**
     * Writes the readable bytes of the buffer to the current connection.
     *
     * Nothing is written when no connection has been set yet (a warning is
     * logged) or when `outBuffer` is null.
     * NOTE(review): despite the name, no explicit flush call is made here;
     * whether `ctx.write` flushes is not visible from this file.
     *
     * Params:
     *   outBuffer = buffer whose readable bytes are written; may be null.
     */
    void writeAndFlush(ByteBuf outBuffer) {
        version(HUNT_DB_DEBUG_MORE) trace("writting ...");

        if(ctx is null) {
            warning("ctx is null");
            return;
        }

        if(outBuffer is null) {
            return;
        }

        byte[] readableData = outBuffer.getReadableBytes();
        ctx.write(cast(const(ubyte)[])readableData);
    }

    /**
     * Switches on every capability flag this client supports.
     */
    private void initSupportedCapabilitiesFlags() {
        clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_PLUGIN_AUTH
            | CapabilitiesFlag.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
            | CapabilitiesFlag.CLIENT_SECURE_CONNECTION
            | CapabilitiesFlag.CLIENT_PROTOCOL_41
            | CapabilitiesFlag.CLIENT_TRANSACTIONS
            | CapabilitiesFlag.CLIENT_MULTI_STATEMENTS
            | CapabilitiesFlag.CLIENT_MULTI_RESULTS
            | CapabilitiesFlag.CLIENT_SESSION_TRACK;
    }
}