From 524fd98111f68596700ee87264576733409e07e4 Mon Sep 17 00:00:00 2001 From: sureshdash2022-yb Date: Thu, 1 Dec 2022 10:49:39 +0530 Subject: [PATCH] [#15154] CDCSDK: Before image failure with packed row MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: With packed row enabled, if the current schema version is different from the before-image schema version, DocDB will fail to decode the packed row. For example: 1. Enable packed row. 2. Create a table with 2 columns. So the schema version is V1. 3. Now insert one record into the table. 4. Do ALTER of the table by adding a new column, so the total columns in the table are 3. Here the schema version is V2. 5. Now update the record that was inserted in step 3. Now call GetChanges with the before image enabled. Today the scenario fails because the before image is in schema version V1, while the schema version CDC provides to read the before image is V2. During analysis, we found that CDC uses //docdb::DocRowwiseIterator// with the corresponding schema to read rows from DocDB. Method //DocRowwiseIterator::HasNext// is used to get the next row from DocDB, which internally calls the Prepare() method of doc_reader to check the row type it reads from DocDB. If the type of row is ValueEntryType::kPackedRow, it checks the packed row’s schema version against the input schema version; if there is a mismatch, it returns an error. To handle this, we will use the current running schema version to fetch the before image row from DocDB. If it fails to get the row, CDC will skip adding the before image record for the corresponding change records. 
Test Plan: Running all the before image unit testcases with packed row enabled Reviewers: skumar, srangavajjula, abharadwaj Reviewed By: abharadwaj Subscribers: ycdcxcluster Differential Revision: https://phabricator.dev.yugabyte.com/D21434 --- ent/src/yb/cdc/cdcsdk_producer.cc | 27 +++++++++---------- .../yb/integration-tests/cdcsdk_ysql-test.cc | 16 ++++++++--- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/ent/src/yb/cdc/cdcsdk_producer.cc b/ent/src/yb/cdc/cdcsdk_producer.cc index b84b56e8a73e..cc18825a82bd 100644 --- a/ent/src/yb/cdc/cdcsdk_producer.cc +++ b/ent/src/yb/cdc/cdcsdk_producer.cc @@ -204,7 +204,7 @@ Status PopulateBeforeImage( const std::shared_ptr& tablet_peer, const ReadHybridTime& read_time, RowMessage* row_message, const EnumOidLabelMap& enum_oid_label_map, const CompositeAttsMap& composite_atts_map, const docdb::SubDocKey& decoded_primary_key, - const Schema& schema, const SchemaVersion schema_version) { + const Schema& schema, const SchemaVersion schema_version) { auto tablet = tablet_peer->shared_tablet(); auto docdb = tablet->doc_db(); @@ -220,7 +220,10 @@ Status PopulateBeforeImage( QLTableRow row; QLValue ql_value; - if (VERIFY_RESULT(iter.HasNext())) RETURN_NOT_OK(iter.NextRow(&row)); + // If CDC is failed to get the before image row, skip adding before image columns. 
+ if (iter.HasNext().ok()) { + RETURN_NOT_OK(iter.NextRow(&row)); + } std::vector columns(schema.columns()); @@ -388,7 +391,7 @@ Status PopulateCDCSDKIntentRecord( RETURN_NOT_OK(PopulateBeforeImage( tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message, enum_oid_label_map, composite_atts_map, prev_decoded_key, schema, - tablet_peer->tablet()->metadata()->schema_version())); + schema_version)); } else { for (size_t index = 0; index < schema.num_columns(); ++index) { row_message->add_old_tuple(); @@ -451,8 +454,7 @@ Status PopulateCDCSDKIntentRecord( auto hybrid_time = commit_time - 1; RETURN_NOT_OK(PopulateBeforeImage( tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message, enum_oid_label_map, - composite_atts_map, decoded_key, schema, - tablet_peer->tablet()->metadata()->schema_version())); + composite_atts_map, decoded_key, schema, schema_version)); } if (row_message->old_tuple_size() == 0) { @@ -530,8 +532,7 @@ Status PopulateCDCSDKIntentRecord( auto hybrid_time = commit_time - 1; RETURN_NOT_OK(PopulateBeforeImage( tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message, - enum_oid_label_map, composite_atts_map, decoded_key, schema, - tablet_peer->tablet()->metadata()->schema_version())); + enum_oid_label_map, composite_atts_map, decoded_key, schema, schema_version)); } else { for (size_t index = 0; index < schema.num_columns(); ++index) { row_message->add_old_tuple(); @@ -555,8 +556,7 @@ Status PopulateCDCSDKIntentRecord( auto hybrid_time = commit_time - 1; RETURN_NOT_OK(PopulateBeforeImage( tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message, enum_oid_label_map, - composite_atts_map, prev_decoded_key, schema, - tablet_peer->tablet()->metadata()->schema_version())); + composite_atts_map, prev_decoded_key, schema, schema_version)); } else { for (size_t index = 0; index < schema.num_columns(); ++index) { row_message->add_old_tuple(); @@ -637,8 +637,7 @@ Status PopulateCDCSDKWriteRecord( if (metadata.record_type == 
cdc::CDCRecordType::ALL) { RETURN_NOT_OK(PopulateBeforeImage( tablet_peer, ReadHybridTime::FromUint64(msg->hybrid_time() - 1), row_message, - enum_oid_label_map, composite_atts_map, prev_decoded_key, schema, - tablet_peer->tablet()->metadata()->schema_version())); + enum_oid_label_map, composite_atts_map, prev_decoded_key, schema, schema_version)); } else { for (int new_tuple_index = 0; new_tuple_index < row_message->new_tuple_size(); ++new_tuple_index) { @@ -677,8 +676,7 @@ Status PopulateCDCSDKWriteRecord( (row_message->op() == RowMessage_Op_DELETE)) { RETURN_NOT_OK(PopulateBeforeImage( tablet_peer, ReadHybridTime::FromUint64(msg->hybrid_time() - 1), row_message, - enum_oid_label_map, composite_atts_map, decoded_key, schema, - tablet_peer->tablet()->metadata()->schema_version())); + enum_oid_label_map, composite_atts_map, decoded_key, schema, schema_version)); if (row_message->old_tuple_size() == 0) { RETURN_NOT_OK(AddPrimaryKey( @@ -732,8 +730,7 @@ Status PopulateCDCSDKWriteRecord( if (metadata.record_type == cdc::CDCRecordType::ALL) { RETURN_NOT_OK(PopulateBeforeImage( tablet_peer, ReadHybridTime::FromUint64(msg->hybrid_time() - 1), row_message, - enum_oid_label_map, composite_atts_map, prev_decoded_key, schema, - tablet_peer->tablet()->metadata()->schema_version())); + enum_oid_label_map, composite_atts_map, prev_decoded_key, schema, schema_version)); } else { for (int index = 0; index < row_message->new_tuple_size(); ++index) { row_message->add_old_tuple(); diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc index 5459edd8b2d1..5745bbd0c501 100644 --- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -2847,6 +2847,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestSchemaChangeBeforeImage)) { // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order. 
const uint32_t expected_count[] = {2, 2, 5, 0, 0, 0}; + const uint32_t expected_count_packed_row[] = {2, 3, 4, 0, 0, 0}; uint32_t count[] = {0, 0, 0, 0, 0, 0}; ExpectedRecordWithThreeColumns expected_records[] = { @@ -2854,22 +2855,31 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestSchemaChangeBeforeImage)) { {4, 5, 6}, {1, 99, INT_MAX}, {4, 99, 6}, {4, 99, 66}}; ExpectedRecordWithThreeColumns expected_before_image_records[] = { {}, {}, {1, 2, INT_MAX}, {}, {1, 3, INT_MAX}, {}, {1, 4, INT_MAX}, {4, 5, 6}, {4, 99, 6}}; + ExpectedRecordWithThreeColumns expected_before_image_records_with_packed_row[] = { + {}, {}, {0, 0, 0}, {}, {0, 0, 0}, {}, {0, 0, 0}, {4, 5, 6}, {4, 99, 6}}; GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + // If the packed row is enabled and there are multiple tables altered, if CDC fail to get before + // image row with the current running schema version, then it will ignore the before image tuples. uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); for (uint32_t i = 0; i < record_size; ++i) { const CDCSDKProtoRecordPB record = change_resp.cdc_sdk_proto_records(i); if (i <= 6) { CheckRecordWithThreeColumns( - record, expected_records[i], count, true, expected_before_image_records[i]); + record, expected_records[i], count, true, + FLAGS_ysql_enable_packed_row ? expected_before_image_records_with_packed_row[i] + : expected_before_image_records[i]); } else { CheckRecordWithThreeColumns( - record, expected_records[i], count, true, expected_before_image_records[i], true); + record, expected_records[i], count, true, + FLAGS_ysql_enable_packed_row ? expected_before_image_records_with_packed_row[i] + : expected_before_image_records[i], + true); } } LOG(INFO) << "Got " << count[1] << " insert record and " << count[2] << " update record"; - CheckCount(expected_count, count); + CheckCount(FLAGS_ysql_enable_packed_row ? 
expected_count_packed_row : expected_count, count); } TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBeforeImageRetention)) {