From 7fdb5afa71005c19f206dccdb057a7a68ae4216d Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 6 Dec 2024 10:50:40 +0800 Subject: [PATCH] address comments Signed-off-by: Lloyd-Pottiger --- .../Storages/DeltaMerge/File/DMFileReader.cpp | 128 ++++++++++++------ .../Storages/DeltaMerge/File/DMFileReader.h | 10 +- 2 files changed, 92 insertions(+), 46 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 05db41446b3..4e6abd3c41e 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -302,6 +302,8 @@ Block DMFileReader::read(const IColumn::Filter * filter) ColumnsWithTypeAndName columns; columns.reserve(read_columns.size()); + + size_t passed_count = filter ? countBytesInFilter(filter->data(), 0, read_rows) : read_rows; for (auto & cd : read_columns) { try @@ -311,16 +313,37 @@ Block DMFileReader::read(const IColumn::Filter * filter) switch (cd.id) { case EXTRA_HANDLE_COLUMN_ID: - col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, handle_column_clean_read_packs); + col = readExtraColumn( + cd, + start_pack_id, + pack_count, + read_rows, + handle_column_clean_read_packs, + filter, + passed_count); break; case TAG_COLUMN_ID: - col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, del_column_clean_read_packs); + col = readExtraColumn( + cd, + start_pack_id, + pack_count, + read_rows, + del_column_clean_read_packs, + filter, + passed_count); break; case VERSION_COLUMN_ID: - col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, version_column_clean_read_packs); + col = readExtraColumn( + cd, + start_pack_id, + pack_count, + read_rows, + version_column_clean_read_packs, + filter, + passed_count); break; default: - col = readColumn(cd, start_pack_id, pack_count, read_rows, filter); + col = readColumn(cd, start_pack_id, pack_count, read_rows, filter, passed_count); break; } columns.emplace_back(std::move(col), cd.type, cd.name, cd.id); @@ -386,7 +409,9 @@ ColumnPtr DMFileReader::readExtraColumn( size_t start_pack_id, size_t pack_count, size_t read_rows, - const std::vector & clean_read_packs) + const std::vector & clean_read_packs, + const IColumn::Filter * filter, + size_t passed_count) { assert(cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == TAG_COLUMN_ID || cd.id == VERSION_COLUMN_ID); @@ -421,7 +446,7 @@ ColumnPtr DMFileReader::readExtraColumn( } case ColumnCache::Strategy::Disk: { - src_col = readColumn(cd, range.first, range.second - range.first, rows_count, nullptr); + src_col = readColumn(cd, range.first, range.second - range.first, rows_count, nullptr, rows_count); break; } default: @@ -441,6 +466,9 @@ ColumnPtr DMFileReader::readExtraColumn( column->insertRangeFrom(*src_col, 0, src_col->size()); } } + + if (filter) + return column->filter(*filter, passed_count); return column; } @@ -449,58 +477,69 @@ ColumnPtr DMFileReader::readColumn( size_t start_pack_id, size_t pack_count, size_t read_rows, - const IColumn::Filter * filter) + const IColumn::Filter * filter, + size_t passed_count) { // New column after ddl is not exist in this DMFile, fill with default value if (!column_streams.contains(DMFile::getFileNameBase(cd.id))) return createColumnWithDefaultValue(cd, read_rows); auto type_on_disk = dmfile->getColumnStat(cd.id).type; + ColumnPtr column; // try read PK column from ColumnCacheLongTerm if (column_cache_long_term && cd.id == pk_col_id && ColumnCacheLongTerm::isCacheableColumn(cd)) { // ColumnCacheLongTerm only caches user assigned PrimaryKey column. - auto column_all_data = column_cache_long_term->get( - dmfile->parentPath(), - dmfile->fileId(), - cd.id, - [&]() -> IColumn::Ptr { - // Always read all packs when filling cache, and always set filter to nullptr - return readFromDiskOrSharingCache(cd, type_on_disk, 0, dmfile->getPacks(), dmfile->getRows(), nullptr); - }); - - auto column = type_on_disk->createColumn(); - column->insertRangeFrom(*column_all_data, pack_id_to_offset[start_pack_id], read_rows); - return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd); + auto column_all_data + = column_cache_long_term->get(dmfile->parentPath(), dmfile->fileId(), cd.id, [&]() -> IColumn::Ptr { + // Always read all packs when filling cache, and always set filter to nullptr + auto rows = dmfile->getRows(); + return readFromDiskOrSharingCache(cd, type_on_disk, 0, dmfile->getPacks(), rows, nullptr, rows); + }); + + auto mut_column = type_on_disk->createColumn(); + mut_column->insertRangeFrom(*column_all_data, pack_id_to_offset[start_pack_id], read_rows); + column = std::move(mut_column); } - // Not cached - if (!enable_column_cache || !isCacheableColumn(cd)) + else if (!enable_column_cache || !isCacheableColumn(cd)) { - auto column = readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, filter); + column + = readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, filter, passed_count); return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd); } - // enable_column_cache && isCacheableColumn(cd) - - // try to get column from cache - auto column = getColumnFromCache( - column_cache, - cd, - type_on_disk, - start_pack_id, - pack_count, - read_rows, - [&](const ColumnDefine & cd, - const DataTypePtr & type_on_disk, - size_t start_pack_id, - size_t pack_count, - size_t read_rows) { - // read from cache, always set filter to nullptr - return readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, nullptr); - }); - // add column to cache - addColumnToCache(column_cache, cd.id, start_pack_id, pack_count, column); + else + { + // try to get column from cache + column = getColumnFromCache( + column_cache, + cd, + type_on_disk, + start_pack_id, + pack_count, + read_rows, + [&](const ColumnDefine & cd, + const DataTypePtr & type_on_disk, + size_t start_pack_id, + size_t pack_count, + size_t read_rows) { + // read from cache, always set filter to nullptr + return readFromDiskOrSharingCache( + cd, + type_on_disk, + start_pack_id, + pack_count, + read_rows, + nullptr, + read_rows); + }); + // add column to cache + addColumnToCache(column_cache, cd.id, start_pack_id, pack_count, column); + } + // filter column + if (filter) + column = column->filter(*filter, passed_count); // Cast column's data from DataType in disk to what we need now return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd); } @@ -548,7 +587,8 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( size_t start_pack_id, size_t pack_count, size_t read_rows, - const IColumn::Filter * filter) + const IColumn::Filter * filter, + size_t passed_count) { bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr @@ -584,6 +624,8 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( DMFileReaderPool::instance().set(*this, cd.id, start_pack_id, pack_count, column); // Delete column from local cache since it is not used anymore. data_sharing_col_data_cache->delColumn(cd.id, std::get<0>(read_block_infos.front())); + if (filter) + column = column->filter(*filter, passed_count); return column; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 9b32e320d19..4a3e4d8952d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -105,7 +105,9 @@ class DMFileReader size_t start_pack_id, size_t pack_count, size_t read_rows, - const std::vector & clean_read_packs); + const std::vector & clean_read_packs, + const IColumn::Filter * filter, + size_t passed_count); ColumnPtr readFromDisk( const ColumnDefine & cd, const DataTypePtr & type_on_disk, @@ -118,13 +120,15 @@ class DMFileReader size_t start_pack_id, size_t pack_count, size_t read_rows, - const IColumn::Filter * filter); + const IColumn::Filter * filter, + size_t passed_count); ColumnPtr readColumn( const ColumnDefine & cd, size_t start_pack_id, size_t pack_count, size_t read_rows, - const IColumn::Filter * filter); + const IColumn::Filter * filter, + size_t passed_count); ColumnPtr cleanRead( const ColumnDefine & cd, size_t rows_count,