1 module hunt.database.driver.mysql.impl.codec.MySQLEncoder;
2 
3 import hunt.database.driver.mysql.impl.codec.CapabilitiesFlag;
4 import hunt.database.driver.mysql.impl.codec.CloseConnectionCommandCodec;
5 import hunt.database.driver.mysql.impl.codec.CloseStatementCommandCodec;
6 import hunt.database.driver.mysql.impl.codec.CommandCodec;
7 import hunt.database.driver.mysql.impl.codec.ExtendedBatchQueryCommandCodec;
8 import hunt.database.driver.mysql.impl.codec.ExtendedQueryCommandCodec;
9 import hunt.database.driver.mysql.impl.codec.InitCommandCodec;
10 import hunt.database.driver.mysql.impl.codec.PrepareStatementCodec;
11 import hunt.database.driver.mysql.impl.codec.ResetStatementCommandCodec;
12 import hunt.database.driver.mysql.impl.codec.SimpleQueryCommandCodec;
13 
14 import hunt.database.driver.mysql.impl.command.ChangeUserCommand;
15 import hunt.database.driver.mysql.impl.command.DebugCommand;
16 import hunt.database.driver.mysql.impl.command.InitDbCommand;
17 import hunt.database.driver.mysql.impl.command.PingCommand;
18 import hunt.database.driver.mysql.impl.command.ResetConnectionCommand;
19 import hunt.database.driver.mysql.impl.command.SetOptionCommand;
20 import hunt.database.driver.mysql.impl.command.StatisticsCommand;
21 
22 import hunt.database.base.AsyncResult;
23 import hunt.database.base.Exceptions;
24 import hunt.database.base.impl.Connection;
25 import hunt.database.base.impl.ParamDesc;
26 import hunt.database.base.impl.RowDesc;
27 import hunt.database.base.impl.TxStatus;
28 import hunt.database.base.impl.command;
29 import hunt.database.base.RowSet;
30 
31 import hunt.collection.ArrayDeque;
32 import hunt.io.ByteBuffer;
33 import hunt.collection.List;
34 import hunt.collection.Map;
35 import hunt.Exceptions;
36 import hunt.logging;
37 import hunt.net.buffer;
38 import hunt.net.codec.Encoder;
39 import hunt.net.Connection;
40 import hunt.text.Charset;
41 
42 import std.container.dlist;
43 import std.range;
44 import std.variant;
45 
46 /**
47  * 
48  */
49 class MySQLEncoder : EncoderChain {
50 
51     // private final ArrayDeque<CommandCodec<?, ?>> inflight;
52     private DList!(CommandCodecBase) *inflight;
53     Connection ctx;
54 
55     int clientCapabilitiesFlag = 0x00000000;
56     Charset charset;
57 
58     this(ref DList!(CommandCodecBase) inflight) {
59         this.inflight = &inflight;
60         this.charset = StandardCharsets.UTF_8;
61         initSupportedCapabilitiesFlags();
62     }
63 
64     // override
65     // void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
66     //     if (msg instanceof CommandBase<?>) {
67     //         CommandBase<?> cmd = (CommandBase<?>) msg;
68     //         write(cmd);
69     //     } else {
70     //         super.write(ctx, msg, promise);
71     //     }
72     // }
73 
74     override void encode(Object message, Connection connection) {
75         ctx = connection;
76 
77         ICommand cmd = cast(ICommand)message;
78         if(cmd is null) {
79             warningf("The message is not a ICommand: %s", typeid(message));
80         }
81 
82         version(HUNT_DB_DEBUG_MORE) 
83         tracef("encoding a message: %s", typeid(message));
84 
85         CommandCodecBase cmdCodec = wrap(cmd);
86 
87         cmdCodec.completionHandler = (ICommandResponse resp) {
88             version(HUNT_DB_DEBUG_MORE) {
89                 infof("message encoding completed");
90                 // CommandCodecBase c = inflight.front();
91                 // assert(cmdCodec is c);
92                 if(resp.failed()) {
93                     Throwable th = resp.cause();
94                     warningf("Response error: %s", th.msg);
95                 }
96             }
97             version(HUNT_DB_DEBUG_MORE) tracef("%s", typeid(cast(Object)resp));
98             inflight.removeFront();
99 
100             if(!resp.isCommandAttatched()) {
101                 // infof("No command attatched for %s", typeid(cast(Object)cmdCodec));
102                 resp.attachCommand(cmdCodec.getCommand());
103             }
104 
105             NetConnectionHandler handler = ctx.getHandler();
106             handler.messageReceived(ctx, cast(Object)resp);
107         };
108 
109         inflight.insertBack(cmdCodec);
110         cmdCodec.encode(this);
111         // flush();
112 	}
113 
114     private CommandCodecBase wrap(ICommand cmd) {
115         InitCommand initCommand = cast(InitCommand) cmd;
116         if (initCommand !is null) {
117             return new InitCommandCodec(initCommand);
118         }
119 
120         SimpleQueryCommand!(RowSet) simpleCommand = cast(SimpleQueryCommand!(RowSet))cmd;
121         if(simpleCommand !is null) {
122             return new SimpleQueryCommandCodec!RowSet(simpleCommand);
123         }
124 
125         PrepareStatementCommand prepareCommand = cast(PrepareStatementCommand)cmd;
126         if(prepareCommand !is null) {
127             return new PrepareStatementCodec(prepareCommand);
128         }
129 
130         ExtendedQueryCommand!RowSet extendedCommand = cast(ExtendedQueryCommand!RowSet)cmd;
131         if(extendedCommand !is null) {
132             return new ExtendedQueryCommandCodec!RowSet(extendedCommand);
133         }
134 
135         ExtendedBatchQueryCommand!RowSet batchQueryCommand = cast(ExtendedBatchQueryCommand!RowSet)cmd;
136         if(batchQueryCommand !is null) {
137             return new ExtendedBatchQueryCommandCodec!RowSet(batchQueryCommand);
138         }
139 
140         CloseConnectionCommand connCommand = cast(CloseConnectionCommand)cmd;
141         if(connCommand !is null) {
142             return new CloseConnectionCommandCodec(connCommand);
143         }
144 
145         CloseCursorCommand cursorCommand = cast(CloseCursorCommand)cmd;
146         if(cursorCommand !is null) {
147             return new ResetStatementCommandCodec(cursorCommand);
148         }
149 
150         CloseStatementCommand statementCommand = cast(CloseStatementCommand)cmd;
151         if(statementCommand !is null) {
152             return new CloseStatementCommandCodec(statementCommand);
153         }
154         warning("Unsupported command " ~ (cast(Object)cmd).toString());
155         throw new UnsupportedOperationException("Todo");
156     }
157 
158     // private CommandCodec<?, ?> wrap(CommandBase<?> cmd) {
159     //     if (cmd instanceof InitCommand) {
160     //         return new InitCommandCodec((InitCommand) cmd);
161     //     } else if (cmd instanceof SimpleQueryCommand) {
162     //         return new SimpleQueryCommandCodec((SimpleQueryCommand) cmd);
163     //     } else if (cmd instanceof ExtendedQueryCommand) {
164     //         return new ExtendedQueryCommandCodec((ExtendedQueryCommand) cmd);
165     //     } else if (cmd instanceof ExtendedBatchQueryCommand<?>) {
166     //         return new ExtendedBatchQueryCommandCodec<>((ExtendedBatchQueryCommand<?>) cmd);
167     //     } else if (cmd instanceof CloseConnectionCommand) {
168     //         return new CloseConnectionCommandCodec((CloseConnectionCommand) cmd);
169     //     } else if (cmd instanceof PrepareStatementCommand) {
170     //         return new PrepareStatementCodec((PrepareStatementCommand) cmd);
171     //     } else if (cmd instanceof CloseStatementCommand) {
172     //         return new CloseStatementCommandCodec((CloseStatementCommand) cmd);
173     //     } else if (cmd instanceof CloseCursorCommand) {
174     //         return new ResetStatementCommandCodec((CloseCursorCommand) cmd);
175     //     } else if (cmd instanceof PingCommand) {
176     //         return new PingCommandCodec((PingCommand) cmd);
177     //     } else if (cmd instanceof InitDbCommand) {
178     //         return new InitDbCommandCodec((InitDbCommand) cmd);
179     //     } else if (cmd instanceof StatisticsCommand) {
180     //         return new StatisticsCommandCodec((StatisticsCommand) cmd);
181     //     } else if (cmd instanceof SetOptionCommand) {
182     //         return new SetOptionCommandCodec((SetOptionCommand) cmd);
183     //     } else if (cmd instanceof ResetConnectionCommand) {
184     //         return new ResetConnectionCommandCodec((ResetConnectionCommand) cmd);
185     //     } else if (cmd instanceof DebugCommand) {
186     //         return new DebugCommandCodec((DebugCommand) cmd);
187     //     } else if (cmd instanceof ChangeUserCommand) {
188     //         return new ChangeUserCommandCodec((ChangeUserCommand) cmd);
189     //     } else {
190     //         System.out.println("Unsupported command " ~ cmd);
191     //         throw new UnsupportedOperationException("Todo");
192     //     }
193     // }
194 
195 
196     void write(ByteBuf outBuffer) {
197         // FIXME: Needing refactor or cleanup -@zxp at 9/3/2019, 6:19:07 PM
198         // 
199         writeAndFlush(outBuffer);
200     }
201 
202     void writeAndFlush(ByteBuf outBuffer) {
203         version(HUNT_DB_DEBUG_MORE) trace("writting ...");
204 
205         if(ctx is null) {
206             warning("ctx is null");
207             return ;
208         }
209 
210         if (outBuffer !is null) {
211             ByteBuf buff = outBuffer;
212             byte[] avaliableData = buff.getReadableBytes();
213             // version(HUNT_DB_DEBUG_MORE) {
214             //     tracef("buffer: %s", buff.toString());
215             //     // tracef("buffer data: %s", cast(string)avaliableData);
216             // }
217             ctx.write(cast(const(ubyte)[])avaliableData);
218         }        
219     }
220     
221 
222     private void initSupportedCapabilitiesFlags() {
223         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_PLUGIN_AUTH;
224         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA;
225         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_SECURE_CONNECTION;
226         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_PROTOCOL_41;
227         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_TRANSACTIONS;
228         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_MULTI_STATEMENTS;
229         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_MULTI_RESULTS;
230         clientCapabilitiesFlag |= CapabilitiesFlag.CLIENT_SESSION_TRACK;
231     }
232 }