Skip to content

Commit

Permalink
[yugabyte#15154] CDCSDK: Before image failure with packed row
Browse files Browse the repository at this point in the history
Summary:
With packed row enabled, if the current schema version is different from the schema version of the before image, DocDB will fail to decode the packed row.

For example:
1. Enable packed row.
2. Create a table with 2 columns. So the schema version is V1.
3. Now insert one record into the table.
4. Do ALTER of the table by adding a new column, so the total columns in the table are 3. Here the schema version is V2.
5. Now update the record that was inserted in step 3.

Now call GetChanges with the before image enabled. Currently this scenario fails because the before image is stored with schema version V1, while the schema version CDC provides to read the before image is V2.

During analysis, we found that CDC uses `docdb::DocRowwiseIterator` with the corresponding schema to read rows from DocDB. The method `DocRowwiseIterator::HasNext` is used to get the next row from DocDB; it internally calls the Prepare() method of doc_reader to check the type of the row it reads from DocDB. If the row type is ValueEntryType::kPackedRow, it compares the packed row's schema version with the input schema version, and if there is a mismatch it reports an error.

To handle this, we will use the current running schema version to fetch the before image row from DocDB. If it fails to get the row, CDC will skip adding the before image record for the corresponding change records.

Test Plan: Running all the before image unit testcases with packed row enabled

Reviewers: skumar, srangavajjula, abharadwaj

Reviewed By: abharadwaj

Subscribers: ycdcxcluster

Differential Revision: https://phabricator.dev.yugabyte.com/D21434
  • Loading branch information
sureshdash2022-yb committed Dec 1, 2022
1 parent 644709b commit 524fd98
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
27 changes: 12 additions & 15 deletions ent/src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ Status PopulateBeforeImage(
const std::shared_ptr<tablet::TabletPeer>& tablet_peer, const ReadHybridTime& read_time,
RowMessage* row_message, const EnumOidLabelMap& enum_oid_label_map,
const CompositeAttsMap& composite_atts_map, const docdb::SubDocKey& decoded_primary_key,
const Schema& schema, const SchemaVersion schema_version) {
const Schema& schema, const SchemaVersion schema_version) {
auto tablet = tablet_peer->shared_tablet();
auto docdb = tablet->doc_db();

Expand All @@ -220,7 +220,10 @@ Status PopulateBeforeImage(

QLTableRow row;
QLValue ql_value;
if (VERIFY_RESULT(iter.HasNext())) RETURN_NOT_OK(iter.NextRow(&row));
// If CDC fails to get the before image row, skip adding before image columns.
if (iter.HasNext().ok()) {
RETURN_NOT_OK(iter.NextRow(&row));
}

std::vector<ColumnSchema> columns(schema.columns());

Expand Down Expand Up @@ -388,7 +391,7 @@ Status PopulateCDCSDKIntentRecord(
RETURN_NOT_OK(PopulateBeforeImage(
tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message,
enum_oid_label_map, composite_atts_map, prev_decoded_key, schema,
tablet_peer->tablet()->metadata()->schema_version()));
schema_version));
} else {
for (size_t index = 0; index < schema.num_columns(); ++index) {
row_message->add_old_tuple();
Expand Down Expand Up @@ -451,8 +454,7 @@ Status PopulateCDCSDKIntentRecord(
auto hybrid_time = commit_time - 1;
RETURN_NOT_OK(PopulateBeforeImage(
tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message, enum_oid_label_map,
composite_atts_map, decoded_key, schema,
tablet_peer->tablet()->metadata()->schema_version()));
composite_atts_map, decoded_key, schema, schema_version));
}

if (row_message->old_tuple_size() == 0) {
Expand Down Expand Up @@ -530,8 +532,7 @@ Status PopulateCDCSDKIntentRecord(
auto hybrid_time = commit_time - 1;
RETURN_NOT_OK(PopulateBeforeImage(
tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message,
enum_oid_label_map, composite_atts_map, decoded_key, schema,
tablet_peer->tablet()->metadata()->schema_version()));
enum_oid_label_map, composite_atts_map, decoded_key, schema, schema_version));
} else {
for (size_t index = 0; index < schema.num_columns(); ++index) {
row_message->add_old_tuple();
Expand All @@ -555,8 +556,7 @@ Status PopulateCDCSDKIntentRecord(
auto hybrid_time = commit_time - 1;
RETURN_NOT_OK(PopulateBeforeImage(
tablet_peer, ReadHybridTime::FromUint64(hybrid_time), row_message, enum_oid_label_map,
composite_atts_map, prev_decoded_key, schema,
tablet_peer->tablet()->metadata()->schema_version()));
composite_atts_map, prev_decoded_key, schema, schema_version));
} else {
for (size_t index = 0; index < schema.num_columns(); ++index) {
row_message->add_old_tuple();
Expand Down Expand Up @@ -637,8 +637,7 @@ Status PopulateCDCSDKWriteRecord(
if (metadata.record_type == cdc::CDCRecordType::ALL) {
RETURN_NOT_OK(PopulateBeforeImage(
tablet_peer, ReadHybridTime::FromUint64(msg->hybrid_time() - 1), row_message,
enum_oid_label_map, composite_atts_map, prev_decoded_key, schema,
tablet_peer->tablet()->metadata()->schema_version()));
enum_oid_label_map, composite_atts_map, prev_decoded_key, schema, schema_version));
} else {
for (int new_tuple_index = 0; new_tuple_index < row_message->new_tuple_size();
++new_tuple_index) {
Expand Down Expand Up @@ -677,8 +676,7 @@ Status PopulateCDCSDKWriteRecord(
(row_message->op() == RowMessage_Op_DELETE)) {
RETURN_NOT_OK(PopulateBeforeImage(
tablet_peer, ReadHybridTime::FromUint64(msg->hybrid_time() - 1), row_message,
enum_oid_label_map, composite_atts_map, decoded_key, schema,
tablet_peer->tablet()->metadata()->schema_version()));
enum_oid_label_map, composite_atts_map, decoded_key, schema, schema_version));

if (row_message->old_tuple_size() == 0) {
RETURN_NOT_OK(AddPrimaryKey(
Expand Down Expand Up @@ -732,8 +730,7 @@ Status PopulateCDCSDKWriteRecord(
if (metadata.record_type == cdc::CDCRecordType::ALL) {
RETURN_NOT_OK(PopulateBeforeImage(
tablet_peer, ReadHybridTime::FromUint64(msg->hybrid_time() - 1), row_message,
enum_oid_label_map, composite_atts_map, prev_decoded_key, schema,
tablet_peer->tablet()->metadata()->schema_version()));
enum_oid_label_map, composite_atts_map, prev_decoded_key, schema, schema_version));
} else {
for (int index = 0; index < row_message->new_tuple_size(); ++index) {
row_message->add_old_tuple();
Expand Down
16 changes: 13 additions & 3 deletions ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2847,29 +2847,39 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestSchemaChangeBeforeImage)) {

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {2, 2, 5, 0, 0, 0};
const uint32_t expected_count_packed_row[] = {2, 3, 4, 0, 0, 0};
uint32_t count[] = {0, 0, 0, 0, 0, 0};

ExpectedRecordWithThreeColumns expected_records[] = {
{0, 0, 0}, {1, 2, INT_MAX}, {1, 3, INT_MAX}, {0, 0, INT_MAX}, {1, 4, INT_MAX},
{4, 5, 6}, {1, 99, INT_MAX}, {4, 99, 6}, {4, 99, 66}};
ExpectedRecordWithThreeColumns expected_before_image_records[] = {
{}, {}, {1, 2, INT_MAX}, {}, {1, 3, INT_MAX}, {}, {1, 4, INT_MAX}, {4, 5, 6}, {4, 99, 6}};
ExpectedRecordWithThreeColumns expected_before_image_records_with_packed_row[] = {
{}, {}, {0, 0, 0}, {}, {0, 0, 0}, {}, {0, 0, 0}, {4, 5, 6}, {4, 99, 6}};

GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets));

// If the packed row is enabled and there are multiple tables altered, and CDC fails to get the
// before image row with the current running schema version, it will ignore the before image tuples.
uint32_t record_size = change_resp.cdc_sdk_proto_records_size();
for (uint32_t i = 0; i < record_size; ++i) {
const CDCSDKProtoRecordPB record = change_resp.cdc_sdk_proto_records(i);
if (i <= 6) {
CheckRecordWithThreeColumns(
record, expected_records[i], count, true, expected_before_image_records[i]);
record, expected_records[i], count, true,
FLAGS_ysql_enable_packed_row ? expected_before_image_records_with_packed_row[i]
: expected_before_image_records[i]);
} else {
CheckRecordWithThreeColumns(
record, expected_records[i], count, true, expected_before_image_records[i], true);
record, expected_records[i], count, true,
FLAGS_ysql_enable_packed_row ? expected_before_image_records_with_packed_row[i]
: expected_before_image_records[i],
true);
}
}
LOG(INFO) << "Got " << count[1] << " insert record and " << count[2] << " update record";
CheckCount(expected_count, count);
CheckCount(FLAGS_ysql_enable_packed_row ? expected_count_packed_row : expected_count, count);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBeforeImageRetention)) {
Expand Down

0 comments on commit 524fd98

Please sign in to comment.