diff --git a/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp index c3be870c7b9..e3ece5006fd 100644 --- a/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp @@ -71,6 +71,8 @@ Block LateMaterializationBlockInputStream::read() } for (auto & col : filter_column_block) { + if (col.name == filter_column_name) + continue; col.column = col.column->filter(col_filter, passed_count); } } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index e55a904df69..39db4bf7417 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -4092,6 +4092,131 @@ try check(filter_none, RSResult::Some, filter_none_data); } CATCH + +TEST_F(DeltaMergeStoreTest, LMAllWithMultiVersionRecords) +try +{ + auto log = Logger::get(GET_GTEST_FULL_NAME); + auto table_column_defines = DMTestEnv::getDefaultColumns(); + ColumnDefine cd_time(1, "col_time", std::make_shared()); + ColumnDefine cd_int(2, "col_int", std::make_shared()); + table_column_defines->push_back(cd_time); + table_column_defines->push_back(cd_int); + + store = reload(table_column_defines); + + auto create_data = [&](Int64 start, Int64 limit) { + std::vector v(limit, 0); + std::iota(v.begin(), v.end(), start); // start ... start + limit - 1 + return v; + }; + + auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) { + auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts); + auto data = create_data(0, end - beg); + block.insert(createColumn(data, cd_time.name, cd_time.id)); + block.insert(createColumn(data, cd_int.name, cd_int.id)); + block.checkNumberOfRows(); + return block; + }; + + auto check = [&](PushDownFilterPtr filter, RSResult expected_res, const std::vector & expected_data) { + auto in = store->read( + *db_context, + db_context->getSettingsRef(), + store->getTableColumns(), + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* start_ts= */ std::numeric_limits::max(), + filter, + std::vector{}, + 0, + "", + /* keep_order= */ false, + /* is_fast_scan= */ false, + /* expected_block_size= */ 1024)[0]; + + Int64 rows = 0; + in->readPrefix(); + while (true) + { + auto b = in->read(); + if (!b) + break; + rows += b.rows(); + ASSERT_EQ(b.getRSResult(), expected_res) << fmt::format("{} vs {}", b.getRSResult(), expected_res); + const auto * v = toColumnVectorDataPtr(b.getByName("col_time").column); + ASSERT_NE(v, nullptr); + ASSERT_EQ(v->size(), expected_data.size()); + ASSERT_TRUE(std::equal(v->begin(), v->end(), expected_data.begin())) + << fmt::format("{} vs {}", *v, expected_data); + } + in->readSuffix(); + ASSERT_EQ(rows, expected_data.size()); + }; + + const String table_info_json = R"json({ + "cols":[ + {"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"col_time","O":"col_time"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":11}}, + {"comment":"","default":null,"default_bit":null,"id":2,"name":{"L":"col_int","O":"col_int"},"offset":-1,"origin_default":null,"state":0,"type":{"Charset":null,"Collate":null,"Decimal":5,"Elems":null,"Flag":1,"Flen":0,"Tp":8}} + ], + "pk_is_handle":false,"index_info":[],"is_common_handle":false, + "name":{"L":"t_111","O":"t_111"},"partition":null, + "comment":"Mocked.","id":30,"schema_version":-1,"state":0,"tiflash_replica":{"Count":0},"update_timestamp":1636471547239654 +})json"; + + auto create_filter = [&](Int64 value) { + auto filter = generatePushDownFilter( + *db_context, + table_info_json, + fmt::format("select * from default.t_111 where col_time >= {}", value)); + RUNTIME_CHECK(filter->extra_cast != nullptr); + RUNTIME_CHECK(filter->rs_operator != nullptr); + auto rs_unsupported = typeid_cast(filter->rs_operator.get()); + RUNTIME_CHECK(rs_unsupported == nullptr, filter->rs_operator->toDebugString()); + RUNTIME_CHECK(filter->before_where != nullptr); + LOG_DEBUG( + log, + "value={} extra_cast={} rs_operator={} before_where={}", + value, + filter->extra_cast->dumpActions(), + filter->rs_operator->toDebugString(), + filter->before_where->dumpActions()); + return filter; + }; + + DB::registerFunctions(); + + constexpr Int64 num_rows = 128; + auto filter_all = create_filter(0); + auto filter_all_data = create_data(0, num_rows); + + // Write multi-version records. + { + auto block = create_block(0, num_rows, 1); + store->write(*db_context, db_context->getSettingsRef(), block); + } + { + auto block = create_block(0, num_rows, 2); + store->write(*db_context, db_context->getSettingsRef(), block); + } + store->mergeDeltaAll(*db_context); + + // Ensure multi-version records. + ASSERT_EQ(store->id_to_segment.size(), 1); + auto seg = store->id_to_segment.begin()->second; + seg->stable->calculateStableProperty( + *store->newDMContext(*db_context, db_context->getSettingsRef()), + RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), + store->isCommonHandle()); + const auto & property = seg->stable->getStableProperty(); + ASSERT_EQ(property.num_versions, num_rows * 2); + ASSERT_EQ(property.num_puts, num_rows); + + check(filter_all, RSResult::All, filter_all_data); +} +CATCH + } // namespace tests } // namespace DM } // namespace DB