/*
 * Copyright (C) 2019, HuntLabs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

module hunt.database.driver.postgresql.impl.codec.PgDecoder;

import hunt.database.driver.postgresql.impl.codec.Bind;
import hunt.database.driver.postgresql.impl.codec.DataFormat;
import hunt.database.driver.postgresql.impl.codec.DataType;
import hunt.database.driver.postgresql.impl.codec.DataTypeDesc;
import hunt.database.driver.postgresql.impl.codec.Describe;
import hunt.database.driver.postgresql.impl.codec.ErrorResponse;
import hunt.database.driver.postgresql.impl.codec.InitCommandCodec;
import hunt.database.driver.postgresql.impl.codec.NoticeResponse;
import hunt.database.driver.postgresql.impl.codec.PgColumnDesc;
import hunt.database.driver.postgresql.impl.codec.CommandCodec;
import hunt.database.driver.postgresql.impl.codec.PgParamDesc;
import hunt.database.driver.postgresql.impl.codec.PgProtocolConstants;
import hunt.database.driver.postgresql.impl.codec.PgRowDesc;
import hunt.database.driver.postgresql.impl.codec.Parse;
import hunt.database.driver.postgresql.impl.codec.PasswordMessage;
import hunt.database.driver.postgresql.impl.codec.QueryCommandBaseCodec;
import hunt.database.driver.postgresql.impl.codec.Response;
import hunt.database.driver.postgresql.impl.codec.StartupMessage;
import hunt.database.base.Util;

import hunt.database.base.impl.Notification;
import hunt.database.base.impl.RowDecoder;
import hunt.database.base.impl.TxStatus;

import hunt.collection.List;
import hunt.collection.Map;
import hunt.Exceptions;
import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.io.channel;
import hunt.logging;
import hunt.net.codec.Decoder;
import hunt.net.Connection;
import hunt.net.buffer;

import std.container.dlist;
import std.conv;


/**
 *
 * Decoder for <a href="https://www.postgresql.org/docs/9.5/static/protocol.html">PostgreSQL protocol</a>
 *
 * Incoming bytes are accumulated in an internal buffer, split into
 * backend messages (1 byte type id + 4 byte length + payload) and each
 * message is dispatched to the codec at the front of the in-flight queue.
 *
 * @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
 */
class PgDecoder : Decoder {

    /// Size of the fixed message header: 1 byte type id + 4 byte length.
    private enum int HEADER_SIZE = 5;

    /// Smallest legal value of the length field (it counts its own 4 bytes).
    private enum int MIN_MESSAGE_LENGTH = 4;

    private ByteBufAllocator alloc;
    private DList!(CommandCodecBase)* inflight;
    private ByteBuf inBuffer;
    private CommandCompleteProcessor processor;

    /**
     * Params:
     *   inflight = queue of pending command codecs; only its address is
     *              stored, so the caller's list is observed directly.
     */
    this(ref DList!(CommandCodecBase) inflight) {
        this.inflight = &inflight;
        processor = new CommandCompleteProcessor();
        alloc = UnpooledByteBufAllocator.DEFAULT();
    }

    /**
     * Decodes every complete backend message contained in `payload`.
     *
     * Any `Exception` raised while decoding is logged and swallowed after
     * the payload has been cleared.
     *
     * Params:
     *   payload    = bytes just received from the server
     *   connection = connection the bytes were read from
     *
     * Returns: always `DataHandleStatus.Done`.
     */
    DataHandleStatus decode(ByteBuffer payload, Connection connection) {
        DataHandleStatus resultStatus = DataHandleStatus.Done;
        try {
            resultStatus = doDecode(payload, connection);
        } catch (Exception ex) {
            BufferUtils.clear(payload);
            warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }
        return resultStatus;
    }

    /**
     * Appends `msg` to the pending input and dispatches each complete message.
     *
     * Throws: Exception if a message announces a length smaller than 4;
     *         the buffered data is dropped first because the stream can no
     *         longer be framed.
     */
    private DataHandleStatus doDecode(ByteBuffer msg, Connection connection) {
        version(HUNT_DB_DEBUG_MORE) tracef("decoding buffer: %s", msg.toString());
        DataHandleStatus resultStatus = DataHandleStatus.Done;

        // NOTE(review): wrappedBuffer presumably shares memory with `msg`
        // instead of copying it; confirm the caller does not reuse `msg`
        // while a partial message is still pending here.
        ByteBuf buff = Unpooled.wrappedBuffer(msg);
        if (inBuffer is null) {
            inBuffer = buff;
        } else {
            // Left-over bytes from a previous call: chain the new chunk after them.
            CompositeByteBuf composite = cast(CompositeByteBuf) inBuffer;
            if (composite is null) {
                composite = alloc.compositeBuffer();
                composite.addComponent(true, inBuffer);
                inBuffer = composite;
            }
            composite.addComponent(true, buff);
        }

        while (true) {
            int available = inBuffer.readableBytes();
            if (available < HEADER_SIZE) {
                break;
            }
            int beginIdx = inBuffer.readerIndex();
            int length = inBuffer.getInt(beginIdx + 1);

            version(HUNT_DB_DEBUG_MORE) {
                tracef("beginIdx: %d, required: %d, available: %d", beginIdx, length, available);
            }

            if (length < MIN_MESSAGE_LENGTH) {
                // A corrupt length would stall the reader index (or move it
                // backwards) and spin this loop forever. Drop the stream.
                inBuffer.release();
                inBuffer = null;
                throw new Exception("Invalid PostgreSQL message length: " ~ length.to!string());
            }

            // Written as `available - 1` so that a huge length cannot overflow.
            if (length > available - 1) {
                version(HUNT_DB_DEBUG) {
                    warningf("Waitting for more data: required: %d, available: %d", length, available);
                }
                break;
            }

            byte id = inBuffer.getByte(beginIdx);
            int endIdx = beginIdx + length + 1;
            int writerIndex = inBuffer.writerIndex();
            try {
                // Restrict the readable window to this message's payload.
                inBuffer.setIndex(beginIdx + HEADER_SIZE, endIdx);
                switch (id) {
                    case PgProtocolConstants.MESSAGE_TYPE_READY_FOR_QUERY: {
                        decodeReadyForQuery(inBuffer);
                        break;
                    }
                    case PgProtocolConstants.MESSAGE_TYPE_DATA_ROW: {
                        decodeDataRow(inBuffer);
                        break;
                    }
                    case PgProtocolConstants.MESSAGE_TYPE_COMMAND_COMPLETE: {
                        decodeCommandComplete(inBuffer);
                        break;
                    }
                    case PgProtocolConstants.MESSAGE_TYPE_BIND_COMPLETE: {
                        decodeBindComplete();
                        break;
                    }
                    default: {
                        decodeMessage(id, inBuffer);
                        break;
                    }
                }
            } catch (Throwable ex) {
                // A failing handler must not stop the remaining messages.
                version(HUNT_DEBUG) {
                    warning(ex);
                } else {
                    warning(ex.msg);
                }
            } finally {
                // Skip to the next message regardless of how much was consumed.
                inBuffer.setIndex(endIdx, writerIndex);
            }
        }

        // Keep a trailing partial message for the next call; otherwise free the buffer.
        if (inBuffer !is null && !inBuffer.isReadable()) {
            inBuffer.release();
            inBuffer = null;
        }

        return resultStatus;
    }

    /**
     * Returns the codec at the front of the in-flight queue.
     *
     * Throws: Exception when no command is in flight, instead of calling
     *         `front()` on an empty list.
     */
    private CommandCodecBase currentCodec() {
        if (inflight.empty()) {
            throw new Exception("No in-flight command to handle the backend message");
        }
        return inflight.front();
    }

    /**
     * Dispatches the message types not handled directly in the decode loop.
     *
     * Throws: UnsupportedOperationException for an unknown message type.
     */
    private void decodeMessage(byte id, ByteBuf inBuffer) {
        switch (id) {
            case PgProtocolConstants.MESSAGE_TYPE_ROW_DESCRIPTION: {
                decodeRowDescription(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_ERROR_RESPONSE: {
                decodeError(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_NOTICE_RESPONSE: {
                decodeNotice(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_AUTHENTICATION: {
                decodeAuthentication(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_EMPTY_QUERY_RESPONSE: {
                decodeEmptyQueryResponse();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PARSE_COMPLETE: {
                decodeParseComplete();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_CLOSE_COMPLETE: {
                decodeCloseComplete();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_NO_DATA: {
                decodeNoData();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PORTAL_SUSPENDED: {
                decodePortalSuspended();
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PARAMETER_DESCRIPTION: {
                decodeParameterDescription(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_PARAMETER_STATUS: {
                decodeParameterStatus(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_BACKEND_KEY_DATA: {
                decodeBackendKeyData(inBuffer);
                break;
            }
            case PgProtocolConstants.MESSAGE_TYPE_NOTIFICATION_RESPONSE: {
                decodeNotificationResponse(inBuffer);
                break;
            }
            default: {
                throw new UnsupportedOperationException();
            }
        }
    }

    private void decodePortalSuspended() {
        currentCodec().handlePortalSuspended();
    }

    /// Extracts the row count from the command tag and reports it.
    private void decodeCommandComplete(ByteBuf inBuffer) {
        int updated = processor.parse(inBuffer);
        currentCodec().handleCommandComplete(updated);
    }

    /// Reads the 16-bit column count and lets the codec decode the row.
    private void decodeDataRow(ByteBuf inBuffer) {
        CommandCodecBase codec = currentCodec();
        version(HUNT_DB_DEBUG_MORE) tracef("decoding data row: %s", typeid(codec));
        int len = inBuffer.readUnsignedShort();
        codec.decodeRow(len, inBuffer);
    }

    /// Reads the per-column metadata and hands the row description to the codec.
    private void decodeRowDescription(ByteBuf inBuffer) {
        PgColumnDesc[] columns = new PgColumnDesc[inBuffer.readUnsignedShort()];
        for (size_t c = 0; c < columns.length; ++c) {
            // Fields must be read in exactly this wire order.
            string fieldName = Util.readCStringUTF8(inBuffer);
            int tableOID = inBuffer.readInt();
            short columnAttributeNumber = inBuffer.readShort();
            int typeOID = inBuffer.readInt();
            short typeSize = inBuffer.readShort();
            int typeModifier = inBuffer.readInt();
            int textOrBinary = inBuffer.readUnsignedShort();
            columns[c] = new PgColumnDesc(
                fieldName,
                tableOID,
                columnAttributeNumber,
                DataTypes.valueOf(typeOID),
                typeSize,
                typeModifier,
                cast(DataFormat)(textOrBinary)
            );
        }
        PgRowDesc rowDesc = new PgRowDesc(columns);
        currentCodec().handleRowDescription(rowDesc);
    }

    private enum byte I = 'I';
    private enum byte T = 'T';

    /// Maps the status byte to a TxStatus: 'I' idle, 'T' active, anything else failed.
    private void decodeReadyForQuery(ByteBuf inBuffer) {
        byte id = inBuffer.readByte();
        TxStatus txStatus;
        if (id == I) {
            txStatus = TxStatus.IDLE;
        } else if (id == T) {
            txStatus = TxStatus.ACTIVE;
        } else {
            txStatus = TxStatus.FAILED;
        }
        currentCodec().handleReadyForQuery(txStatus);
    }

    private void decodeError(ByteBuf inBuffer) {
        ErrorResponse response = new ErrorResponse();
        decodeErrorOrNotice(response, inBuffer);
        currentCodec().handleErrorResponse(response);
    }

    private void decodeNotice(ByteBuf inBuffer) {
        NoticeResponse response = new NoticeResponse();
        decodeErrorOrNotice(response, inBuffer);
        currentCodec().handleNoticeResponse(response);
    }

    /**
     * Fills `response` from a list of (field type byte, C string) pairs
     * terminated by a zero byte. Unknown field types are read and ignored.
     */
    private void decodeErrorOrNotice(Response response, ByteBuf inBuffer) {
        byte type;
        while ((type = inBuffer.readByte()) != 0) {
            switch (type) {

                case PgProtocolConstants.ERROR_OR_NOTICE_SEVERITY:
                    response.setSeverity(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_CODE:
                    response.setCode(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_MESSAGE:
                    response.setMessage(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_DETAIL:
                    response.setDetail(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_HINT:
                    response.setHint(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_INTERNAL_POSITION:
                    response.setInternalPosition(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_INTERNAL_QUERY:
                    response.setInternalQuery(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_POSITION:
                    response.setPosition(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_WHERE:
                    response.setWhere(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_FILE:
                    response.setFile(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_LINE:
                    response.setLine(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_ROUTINE:
                    response.setRoutine(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_SCHEMA:
                    response.setSchema(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_TABLE:
                    response.setTable(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_COLUMN:
                    response.setColumn(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_DATA_TYPE:
                    response.setDataType(Util.readCStringUTF8(inBuffer));
                    break;

                case PgProtocolConstants.ERROR_OR_NOTICE_CONSTRAINT:
                    response.setConstraint(Util.readCStringUTF8(inBuffer));
                    break;

                default:
                    // Still consume the value so the next field starts at the right offset.
                    Util.readCStringUTF8(inBuffer);
                    break;
            }
        }
    }

    /**
     * Handles an authentication request: OK, MD5 (with its 4-byte salt) and
     * clear-text are forwarded to the codec.
     *
     * Throws: UnsupportedOperationException for every other authentication type.
     */
    private void decodeAuthentication(ByteBuf inBuffer) {

        if (inflight.empty()) {
            warning("inflight is empty");
            return;
        }

        int type = inBuffer.readInt();
        version(HUNT_DB_DEBUG_MORE) tracef("type=%d", type);

        CommandCodecBase cmdCodec = inflight.front();

        switch (type) {
            case PgProtocolConstants.AUTHENTICATION_TYPE_OK: {
                cmdCodec.handleAuthenticationOk();
            }
            break;
            case PgProtocolConstants.AUTHENTICATION_TYPE_MD5_PASSWORD: {
                byte[] salt = new byte[4];
                inBuffer.readBytes(salt);
                cmdCodec.handleAuthenticationMD5Password(salt);
            }
            break;
            case PgProtocolConstants.AUTHENTICATION_TYPE_CLEARTEXT_PASSWORD: {
                cmdCodec.handleAuthenticationClearTextPassword();
            }
            break;
            case PgProtocolConstants.AUTHENTICATION_TYPE_KERBEROS_V5:
            case PgProtocolConstants.AUTHENTICATION_TYPE_SCM_CREDENTIAL:
            case PgProtocolConstants.AUTHENTICATION_TYPE_GSS:
            case PgProtocolConstants.AUTHENTICATION_TYPE_GSS_CONTINUE:
            case PgProtocolConstants.AUTHENTICATION_TYPE_SSPI:
            default:
                throw new UnsupportedOperationException("Authentication type " ~
                    type.to!string() ~ " is not supported in the client");
        }
    }

    private void decodeParseComplete() {
        currentCodec().handleParseComplete();
    }

    private void decodeBindComplete() {
        currentCodec().handleBindComplete();
    }

    private void decodeCloseComplete() {
        currentCodec().handleCloseComplete();
    }

    private void decodeNoData() {
        currentCodec().handleNoData();
    }

    /// Reads the 16-bit parameter count followed by one type OID per parameter.
    private void decodeParameterDescription(ByteBuf inBuffer) {
        DataTypeDesc[] paramDataTypes = new DataTypeDesc[inBuffer.readUnsignedShort()];
        for (size_t c = 0; c < paramDataTypes.length; ++c) {
            paramDataTypes[c] = DataTypes.valueOf(inBuffer.readInt());
        }
        currentCodec().handleParameterDescription(new PgParamDesc(paramDataTypes));
    }

    private void decodeParameterStatus(ByteBuf inBuffer) {
        string key = Util.readCStringUTF8(inBuffer);
        string value = Util.readCStringUTF8(inBuffer);
        currentCodec().handleParameterStatus(key, value);
    }

    private void decodeEmptyQueryResponse() {
        currentCodec().handleEmptyQueryResponse();
    }

    private void decodeBackendKeyData(ByteBuf inBuffer) {
        int processId = inBuffer.readInt();
        int secretKey = inBuffer.readInt();
        currentCodec().handleBackendKeyData(processId, secretKey);
    }

    /// Not implemented yet: the payload is left unread and skipped by the decode loop.
    private void decodeNotificationResponse(ByteBuf inBuffer) {
        implementationMissing(false);
        // TODO: deliver Notification(inBuffer.readInt(), Util.readCStringUTF8(inBuffer),
        //       Util.readCStringUTF8(inBuffer)) to the connection once supported.
    }
}



/**
 * Extracts the trailing row count from a CommandComplete tag such as
 * "INSERT 0 1" or "SELECT 5" by visiting the tag one byte at a time.
 */
static class CommandCompleteProcessor : ByteProcessor {
    private enum byte SPACE = 32;
    private int rows;
    bool afterSpace;

    /**
     * Scans the readable bytes of `inBuffer` except the last one.
     *
     * Returns: the number formed by the last space-separated token, or 0
     *          when that token is not purely numeric (e.g. "CREATE TABLE").
     */
    int parse(ByteBuf inBuffer) {
        afterSpace = false;
        rows = 0;
        // The final byte is excluded from the scan (presumably the C-string terminator).
        inBuffer.forEachByte(inBuffer.readerIndex(), inBuffer.readableBytes() - 1, this);
        return rows;
    }

    override
    bool process(byte value) {
        bool space = value == SPACE;
        if (afterSpace) {
            if (space) {
                // A new token starts: forget the previous number.
                rows = 0;
            } else if (value >= '0' && value <= '9') {
                rows = rows * 10 + (value - '0');
            } else {
                // Non-numeric token: it carries no row count.
                rows = 0;
            }
        } else {
            afterSpace = space;
        }
        return true;
    }
}