/*
 * Copyright (C) 2018 Julien Viet
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
module hunt.database.driver.postgresql.impl.codec.PgEncoder;

import hunt.database.driver.postgresql.impl.codec.Bind;
import hunt.database.driver.postgresql.impl.codec.CloseConnectionCommandCodec;
import hunt.database.driver.postgresql.impl.codec.ClosePortalCommandCodec;
import hunt.database.driver.postgresql.impl.codec.CloseStatementCommandCodec;
import hunt.database.driver.postgresql.impl.codec.DataTypeCodec;
import hunt.database.driver.postgresql.impl.codec.DataFormat;
import hunt.database.driver.postgresql.impl.codec.DataType;
import hunt.database.driver.postgresql.impl.codec.DataTypeDesc;
import hunt.database.driver.postgresql.impl.codec.Describe;
import hunt.database.driver.postgresql.impl.codec.ExtendedBatchQueryCommandCodec;
import hunt.database.driver.postgresql.impl.codec.ExtendedQueryCommandCodec;
import hunt.database.driver.postgresql.impl.codec.InitCommandCodec;
import hunt.database.driver.postgresql.impl.codec.PgColumnDesc;
import hunt.database.driver.postgresql.impl.codec.CommandCodec;
import hunt.database.driver.postgresql.impl.codec.PgDecoder;
import hunt.database.driver.postgresql.impl.codec.PrepareStatementCommandCodec;
import hunt.database.driver.postgresql.impl.codec.Query;
import hunt.database.driver.postgresql.impl.codec.QueryCommandBaseCodec;
import hunt.database.driver.postgresql.impl.codec.Parse;
import hunt.database.driver.postgresql.impl.codec.PasswordMessage;
import hunt.database.driver.postgresql.impl.codec.Response;
import hunt.database.driver.postgresql.impl.codec.SimpleQueryCommandCodec;
import hunt.database.driver.postgresql.impl.codec.StartupMessage;
import hunt.database.base.Util;

import hunt.database.base.AsyncResult;
import hunt.database.base.Exceptions;
import hunt.database.base.impl.Connection;
import hunt.database.base.impl.ParamDesc;
import hunt.database.base.impl.RowDesc;
import hunt.database.base.impl.TxStatus;
import hunt.database.base.impl.command;
import hunt.database.base.RowSet;


import hunt.collection.ArrayDeque;
import hunt.io.ByteBuffer;
import hunt.collection.List;
import hunt.collection.Map;
import hunt.Exceptions;
import hunt.logging;
import hunt.net.buffer;
import hunt.net.codec.Encoder;
import hunt.net.Connection;
import hunt.text.Charset;

alias writeCString = Util.writeCString;

import std.container.dlist;
import std.range;
import std.variant;

/**
 * Encodes driver commands into PostgreSQL frontend messages.
 * <p>
 * Each command handed to {@link #encode} is wrapped in its codec, appended to the
 * shared in-flight queue, serialized into {@code outBuffer} through the
 * {@code writeXxx} methods below, and finally written to the connection by
 * {@link #flush}.
 * <p>
 * Every {@code writeXxx} method that emits a typed message follows the same framing:
 * one type byte, a 4-byte length placeholder, the payload, and then the placeholder is
 * patched with the number of bytes written after the type byte (the length field
 * counts itself but not the type byte).
 *
 * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
final class PgEncoder : EncoderChain {

    // Frontend message type bytes written as the first byte of each typed message.

    private enum byte PASSWORD_MESSAGE = 'p';
    private enum byte QUERY = 'Q';
    private enum byte TERMINATE = 'X';
    private enum byte PARSE = 'P';
    private enum byte BIND = 'B';
    private enum byte DESCRIBE = 'D';
    private enum byte EXECUTE = 'E';
    private enum byte CLOSE = 'C';
    private enum byte SYNC = 'S';

    // Pointer to the caller-owned queue of codecs whose responses are still pending
    // (Java original: ArrayDeque<CommandCodec<?, ?>>).
    private DList!(CommandCodecBase) *inflight;
    // Connection used by flush() and by completion handlers; set on every encode() call.
    private Connection ctx;
    // Lazily allocated by ensureBuffer(); reset to null once flushed.
    private ByteBuf outBuffer;
    private PgDecoder dec;

    /**
     * @param dec      the decoder paired with this encoder (only stored here)
     * @param inflight the in-flight codec queue; taken by reference and kept as a
     *                 pointer, so it must outlive this encoder
     */
    this(PgDecoder dec, ref DList!(CommandCodecBase) inflight) {
        this.inflight = &inflight;
        this.dec = dec;
    }

    /**
     * Encodes one command and writes the resulting bytes to {@code connection}.
     * <p>
     * The command is wrapped in its codec, a completion handler is installed that
     * pops the in-flight queue and forwards the response to the connection's handler,
     * and the codec is queued before it serializes itself.
     * <p>
     * If {@code message} is not an {@code ICommand} a warning is logged and
     * {@link #wrap} then throws {@code AssertionError}.
     * <p>
     * If the codec throws while encoding, the unsent buffer is discarded and the codec
     * is removed from the in-flight queue again before the exception propagates, so a
     * half-written message is never flushed together with a later command.
     *
     * @param message    the command to encode; expected to implement {@code ICommand}
     * @param connection the connection the encoded bytes are written to
     */
    override void encode(Object message, Connection connection) {
        ctx = connection;

        ICommand cmd = cast(ICommand)message;
        if(cmd is null) {
            warningf("The message is not a ICommand: %s", typeid(message));
        }

        version(HUNT_DB_DEBUG_MORE)
        tracef("encoding a message: %s", typeid(message));

        CommandCodecBase cmdCodec = wrap(cmd);

        cmdCodec.completionHandler = (ICommandResponse resp) {
            version(HUNT_DB_DEBUG) {
                tracef("message encoding completed");
                // CommandCodecBase c = inflight.front();
                // assert(cmdCodec is c);
                if(resp.failed()) {
                    Throwable th = resp.cause();
                    warningf("Response error: %s", th.msg);
                }
            }
            version(HUNT_DB_DEBUG_MORE) tracef("%s", typeid(cast(Object)resp));
            // The completed codec is taken to be the oldest in-flight one.
            inflight.removeFront();

            if(!resp.isCommandAttatched()) {
                // infof("No command attatched for %s", typeid(cast(Object)cmdCodec));
                resp.attachCommand(cmdCodec.getCommand());
            }

            NetConnectionHandler handler = ctx.getHandler();
            if(handler !is null) handler.messageReceived(ctx, cast(Object)resp);
        };

        inflight.insertBack(cmdCodec);
        {
            // Nothing has been flushed for this command yet. On failure, drop the
            // partially written bytes (their length placeholder is still zero) and
            // un-queue the codec so later responses are not matched against it.
            scope(failure) {
                outBuffer = null;
                if(!inflight.empty && inflight.back is cmdCodec) {
                    inflight.removeBack();
                }
            }
            cmdCodec.encode(this);
        }
        flush();
    }

    /**
     * Selects the codec matching the concrete command type.
     *
     * @param cmd the command to wrap
     * @return a codec for {@code cmd}
     * @throws AssertionError if {@code cmd} is null or of an unsupported type
     */
    private CommandCodecBase wrap(ICommand cmd) {
        InitCommand initCommand = cast(InitCommand) cmd;
        if (initCommand !is null) {
            return new InitCommandCodec(initCommand);
        }

        SimpleQueryCommand!(RowSet) simpleCommand = cast(SimpleQueryCommand!(RowSet))cmd;
        if(simpleCommand !is null) {
            return new SimpleQueryCommandCodec!RowSet(simpleCommand);
        }

        PrepareStatementCommand prepareCommand = cast(PrepareStatementCommand)cmd;
        if(prepareCommand !is null) {
            return new PrepareStatementCommandCodec(prepareCommand);
        }

        ExtendedQueryCommand!RowSet extendedCommand = cast(ExtendedQueryCommand!RowSet)cmd;
        if(extendedCommand !is null) {
            return new ExtendedQueryCommandCodec!RowSet(extendedCommand);
        }

        ExtendedBatchQueryCommand!RowSet batchQueryCommand = cast(ExtendedBatchQueryCommand!RowSet)cmd;
        if(batchQueryCommand !is null) {
            return new ExtendedBatchQueryCommandCodec!RowSet(batchQueryCommand);
        }

        CloseConnectionCommand connCommand = cast(CloseConnectionCommand)cmd;
        if(connCommand !is null) {
            return CloseConnectionCommandCodec.INSTANCE();
        }

        CloseCursorCommand cursorCommand = cast(CloseCursorCommand)cmd;
        if(cursorCommand !is null) {
            return new ClosePortalCommandCodec(cursorCommand);
        }

        CloseStatementCommand statementCommand = cast(CloseStatementCommand)cmd;
        if(statementCommand !is null) {
            return new CloseStatementCommandCodec(statementCommand);
        }

        throw new AssertionError();
    }

    // override
    // void handlerAdded(Connection ctx) {
    //     // TODO: Tasks pending completion -@zxp at 8/22/2019, 5:50:54 PM
    //     //
    //     this.ctx = ctx;
    // }

    /**
     * Writes the readable bytes of the pending buffer to the connection and releases
     * the buffer. Does nothing when no buffer is pending; logs a warning and keeps the
     * buffer when no connection has been set yet.
     */
    void flush() {
        version(HUNT_DB_DEBUG_MORE) trace("flushing ...");

        if(ctx is null) {
            warning("ctx is null");
            return ;
        }

        if (outBuffer !is null) {
            ByteBuf buff = outBuffer;
            // Detach first so anything written afterwards starts a fresh buffer.
            outBuffer = null;
            byte[] availableData = buff.getReadableBytes();
            // version(HUNT_DB_DEBUG_MORE) {
            //     tracef("buffer: %s", buff.toString());
            //     // tracef("buffer data: %s", cast(string)availableData);
            // }
            ctx.write(cast(const(ubyte)[])availableData);
        }
    }

    /**
     * This message immediately closes the connection. On receipt of this message,
     * the backend closes the connection and terminates.
     */
    void writeTerminate() {
        ensureBuffer();
        outBuffer.writeByte(TERMINATE);
        // No payload: the length field covers only itself.
        outBuffer.writeInt(4);
    }

    /**
     * <p>
     * The purpose of this message is to provide a resynchronization point for error recovery.
     * When an error is detected while processing any extended-query message, the backend issues {@link ErrorResponse},
     * then reads and discards messages until this message is reached, then issues {@link ReadyForQuery} and returns to normal
     * message processing.
     * <p>
     * Note that no skipping occurs if an error is detected while processing this message which ensures that there is one
     * and only one {@link ReadyForQuery} sent for each of this message.
     * <p>
     * Note this message does not cause a transaction block opened with BEGIN to be closed. It is possible to detect this
     * situation in {@link ReadyForQuery#txStatus()} that includes {@link TxStatus} information.
     */
    void writeSync() {
        ensureBuffer();
        outBuffer.writeByte(SYNC);
        // No payload: the length field covers only itself.
        outBuffer.writeInt(4);
    }

    /**
     * <p>
     * The message closes an existing prepared statement or portal and releases resources.
     * Note that closing a prepared statement implicitly closes any open portals that were constructed from that statement.
     * <p>
     * The response is either {@link CloseComplete} or {@link ErrorResponse}
     *
     * @param portal the name of the portal to close
     */
    void writeClosePortal(string portal) {
        ensureBuffer();
        int pos = outBuffer.writerIndex();
        outBuffer.writeByte(CLOSE);
        outBuffer.writeInt(0); // length placeholder, patched below
        outBuffer.writeByte('P'); // 'S' to close a prepared statement or 'P' to close a portal
        Util.writeCStringUTF8(outBuffer, portal);
        outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
    }

    /**
     * Writes the startup message: a length-prefixed packet without a type byte that
     * carries the protocol version (3.0), the user and database entries, every extra
     * property of {@code msg} as key/value C strings, and a terminating zero byte.
     *
     * @param msg the startup parameters (username, database, properties)
     */
    void writeStartupMessage(StartupMessage msg) {
        ensureBuffer();

        int pos = outBuffer.writerIndex();

        outBuffer.writeInt(0); // length placeholder, patched below
        // protocol version
        outBuffer.writeShort(3);
        outBuffer.writeShort(0);

        writeCString(outBuffer, StartupMessage.BUFF_USER);
        Util.writeCStringUTF8(outBuffer, msg.username);
        writeCString(outBuffer, StartupMessage.BUFF_DATABASE);
        Util.writeCStringUTF8(outBuffer, msg.database);
        foreach (MapEntry!(string, string) property ; msg.properties) {
            writeCString(outBuffer, property.getKey(), StandardCharsets.UTF_8);
            writeCString(outBuffer, property.getValue(), StandardCharsets.UTF_8);
        }

        outBuffer.writeByte(0);
        // There is no type byte here, so the length spans the whole packet.
        outBuffer.setInt(pos, outBuffer.writerIndex() - pos);
    }

    /**
     * Writes a password message whose payload is {@code msg.hash} as a C string.
     *
     * @param msg the password message to send
     */
    void writePasswordMessage(PasswordMessage msg) {
        ensureBuffer();
        int pos = outBuffer.writerIndex();
        outBuffer.writeByte(PASSWORD_MESSAGE);
        outBuffer.writeInt(0); // length placeholder, patched below
        Util.writeCStringUTF8(outBuffer, msg.hash);
        outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos- 1);
    }

    /**
     * <p>
     * This message includes an SQL command (or commands) expressed as a text string.
     * <p>
     * The possible response messages from the backend are
     * {@link CommandComplete}, {@link RowDesc}, {@link DataRow}, {@link EmptyQueryResponse}, {@link ErrorResponse},
     * {@link ReadyForQuery} and {@link NoticeResponse}
     */
    void writeQuery(Query query) {
        ensureBuffer();
        int pos = outBuffer.writerIndex();
        outBuffer.writeByte(QUERY);
        outBuffer.writeInt(0); // length placeholder, patched below
        Util.writeCStringUTF8(outBuffer, query.sql);
        outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
    }

    /**
     * <p>
     * The message that using "statement" variant specifies the name of an existing prepared statement.
     * <p>
     * The response is a {@link ParamDesc} message describing the parameters needed by the statement,
     * followed by a {@link RowDesc} message describing the rows that will be returned when the statement is eventually
     * executed or a {@link NoData} message if the statement will not return rows.
     * {@link ErrorResponse} is issued if there is no such prepared statement.
     * <p>
     * Note that since {@link Bind} has not yet been issued, the formats to be used for returned columns are not yet known to
     * the backend; the format code fields in the {@link RowDesc} message will be zeroes in this case.
     * <p>
     * The message that using "portal" variant specifies the name of an existing portal.
     * <p>
     * The response is a {@link RowDesc} message describing the rows that will be returned by executing the portal;
     * or a {@link NoData} message if the portal does not contain a query that will return rows; or {@link ErrorResponse}
     * if there is no such portal.
     */
    void writeDescribe(Describe describe) {
        ensureBuffer();
        int pos = outBuffer.writerIndex();
        outBuffer.writeByte(DESCRIBE);
        outBuffer.writeInt(0); // length placeholder, patched below
        if (describe.statement != 0) {
            // Named statement: the identifier is written as a raw 8-byte value.
            // NOTE(review): presumably those bytes already contain the NUL terminator
            // of the name - confirm where the statement id is generated.
            outBuffer.writeByte('S');
            outBuffer.writeLong(describe.statement);
        } else if (describe.portal !is null) {
            outBuffer.writeByte('P');
            Util.writeCStringUTF8(outBuffer, describe.portal);
        } else {
            // Neither given: describe the unnamed statement.
            outBuffer.writeByte('S');
            Util.writeCStringUTF8(outBuffer, "");
        }
        outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos- 1);
    }

    /**
     * <p>
     * The message contains a textual SQL query string.
     * <p>
     * The response is either {@link ParseComplete} or {@link ErrorResponse}
     */
    void writeParse(Parse parse) {
        ensureBuffer();
        int pos = outBuffer.writerIndex();
        outBuffer.writeByte(PARSE);
        outBuffer.writeInt(0); // length placeholder, patched below
        if (parse.statement == 0) {
            // Unnamed statement: empty C string.
            outBuffer.writeByte(0);
        } else {
            outBuffer.writeLong(parse.statement);
        }
        Util.writeCStringUTF8(outBuffer, parse.query);
        // no parameter data types (OIDs)
        // if(paramDataTypes is null) {
        outBuffer.writeShort(0);
        // } else {
        //   // Parameter data types (OIDs)
        //   outBuffer.writeShort(paramDataTypes.length);
        //   for (int paramDataType : paramDataTypes) {
        //     outBuffer.writeInt(paramDataType);
        //   }
        // }
        outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
    }

    /**
     * The message specifies the portal and a maximum row count (zero meaning "fetch all rows") of the result.
     * <p>
     * The row count of the result is only meaningful for portals containing commands that return row sets;
     * in other cases the command is always executed to completion, and the row count of the result is ignored.
     * <p>
     * The possible responses to this message are the same as {@link Query} message, except that
     * it doesn't cause {@link ReadyForQuery} or {@link RowDesc} to be issued.
     * <p>
     * If Execute terminates before completing the execution of a portal, it will send a {@link PortalSuspended} message;
     * the appearance of this message tells the frontend that another Execute should be issued against the same portal to
     * complete the operation. The {@link CommandComplete} message indicating completion of the source SQL command
     * is not sent until the portal's execution is completed. Therefore, This message is always terminated by
     * the appearance of exactly one of these messages: {@link CommandComplete},
     * {@link EmptyQueryResponse}, {@link ErrorResponse} or {@link PortalSuspended}.
     *
     * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
     */
    void writeExecute(string portal, int rowCount) {
        ensureBuffer();
        int pos = outBuffer.writerIndex();
        outBuffer.writeByte(EXECUTE);
        outBuffer.writeInt(0); // length placeholder, patched below
        if (portal !is null) {
            outBuffer.writeCharSequence(portal, StandardCharsets.UTF_8);
        }
        outBuffer.writeByte(0); // terminates the (possibly empty) portal name
        outBuffer.writeInt(rowCount); // Zero denotes "no limit" maybe for ReadStream!(Row)
        outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
    }

    /**
     * <p>
     * The message gives the name of the prepared statement, the name of portal,
     * and the values to use for any parameter values present in the prepared statement.
     * The supplied parameter set must match those needed by the prepared statement.
     * <p>
     * The response is either {@link BindComplete} or {@link ErrorResponse}.
     *
     * @param bind        the statement id, parameter types and result columns to bind
     * @param portal      the destination portal name; null selects the unnamed portal
     * @param paramValues one value per parameter; {@code bind.paramTypes} is indexed
     *                    with the same positions
     * @throws DatabaseException if binary encoding of a parameter fails
     */
    void writeBind(Bind bind, string portal, List!(Variant) paramValues) {
        ensureBuffer();
        int pos = outBuffer.writerIndex();
        outBuffer.writeByte(BIND);
        outBuffer.writeInt(0); // length placeholder, patched below
        if (portal !is null) {
            outBuffer.writeCharSequence(portal, StandardCharsets.UTF_8);
        }
        outBuffer.writeByte(0); // terminates the (possibly empty) portal name
        if (bind.statement == 0) {
            // Unnamed statement: empty C string.
            outBuffer.writeByte(0);
        } else {
            outBuffer.writeLong(bind.statement);
        }
        int paramLen = paramValues.size();
        outBuffer.writeShort(paramLen);
        // Parameter formats
        for (int c = 0;c < paramLen;c++) {
            // 1 = binary when the parameter type supports it, otherwise 0 = text
            outBuffer.writeShort(bind.paramTypes[c].supportsBinary ? 1 : 0);
        }
        // Parameter values
        outBuffer.writeShort(paramLen);
        for (int c = 0;c < paramLen;c++) {
            Variant param = paramValues.get(c);
            if (!param.hasValue() || param == null) {
                // NULL value
                outBuffer.writeInt(-1);
            } else {
                DataTypeDesc dataType = bind.paramTypes[c];
                version(HUNT_DB_DEBUG_MORE) {
                    tracef("dataType: %s, param: type=%s, value=%s",
                        dataType, param.type, param.toString());
                }

                if (dataType.supportsBinary) {
                    // Reserve the per-value length, encode, then patch it with the
                    // number of bytes the codec produced.
                    int idx = outBuffer.writerIndex();
                    outBuffer.writeInt(0);
                    try {
                        DataTypeCodec.encodeBinary(cast(DataType)dataType.id, param, outBuffer);
                    } catch(Throwable ex) {
                        version(HUNT_DB_DEBUG_MORE) warning(ex);
                        else warning(ex.msg);
                        throw new DatabaseException(ex.msg);
                    }
                    outBuffer.setInt(idx, outBuffer.writerIndex() - idx - 4);
                } else {
                    // NOTE(review): no length prefix is written here, so encodeText is
                    // presumably responsible for it - confirm in DataTypeCodec.
                    DataTypeCodec.encodeText(cast(DataType)dataType.id, param, outBuffer);
                }
            }
        }

        // TODO: make resultColumns non-null to avoid the null check

        // Result column formats: binary where the column type supports it
        if (bind.resultColumns.length > 0) {
            outBuffer.writeShort(cast(int)bind.resultColumns.length);
            foreach (PgColumnDesc resultColumn; bind.resultColumns) {
                outBuffer.writeShort(resultColumn.dataType.supportsBinary ? 1 : 0);
            }
        } else {
            // No column descriptions: a single format code (1 = binary) for all columns.
            outBuffer.writeShort(1);
            outBuffer.writeShort(1);
        }
        outBuffer.setInt(pos + 1, outBuffer.writerIndex() - pos - 1);
    }

    /**
     * Allocates the output buffer on first use after a flush.
     * NOTE(review): _bufferSize is not declared in this file; presumably it is
     * inherited from EncoderChain - confirm.
     */
    private void ensureBuffer() {
        if (outBuffer is null) {
            outBuffer = Unpooled.buffer(_bufferSize);
        }
    }
}