Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Dec 6, 2024
1 parent e252f97 commit 7fdb5af
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 46 deletions.
128 changes: 85 additions & 43 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ Block DMFileReader::read(const IColumn::Filter * filter)

ColumnsWithTypeAndName columns;
columns.reserve(read_columns.size());

size_t passed_count = filter ? countBytesInFilter(filter->data(), 0, read_rows) : read_rows;
for (auto & cd : read_columns)
{
try
Expand All @@ -311,16 +313,37 @@ Block DMFileReader::read(const IColumn::Filter * filter)
switch (cd.id)
{
case EXTRA_HANDLE_COLUMN_ID:
col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, handle_column_clean_read_packs);
col = readExtraColumn(
cd,
start_pack_id,
pack_count,
read_rows,
handle_column_clean_read_packs,
filter,
passed_count);
break;
case TAG_COLUMN_ID:
col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, del_column_clean_read_packs);
col = readExtraColumn(
cd,
start_pack_id,
pack_count,
read_rows,
del_column_clean_read_packs,
filter,
passed_count);
break;
case VERSION_COLUMN_ID:
col = readExtraColumn(cd, start_pack_id, pack_count, read_rows, version_column_clean_read_packs);
col = readExtraColumn(
cd,
start_pack_id,
pack_count,
read_rows,
version_column_clean_read_packs,
filter,
passed_count);
break;
default:
col = readColumn(cd, start_pack_id, pack_count, read_rows, filter);
col = readColumn(cd, start_pack_id, pack_count, read_rows, filter, passed_count);
break;
}
columns.emplace_back(std::move(col), cd.type, cd.name, cd.id);
Expand Down Expand Up @@ -386,7 +409,9 @@ ColumnPtr DMFileReader::readExtraColumn(
size_t start_pack_id,
size_t pack_count,
size_t read_rows,
const std::vector<size_t> & clean_read_packs)
const std::vector<size_t> & clean_read_packs,
const IColumn::Filter * filter,
size_t passed_count)
{
assert(cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == TAG_COLUMN_ID || cd.id == VERSION_COLUMN_ID);

Expand Down Expand Up @@ -421,7 +446,7 @@ ColumnPtr DMFileReader::readExtraColumn(
}
case ColumnCache::Strategy::Disk:
{
src_col = readColumn(cd, range.first, range.second - range.first, rows_count, nullptr);
src_col = readColumn(cd, range.first, range.second - range.first, rows_count, nullptr, rows_count);
break;
}
default:
Expand All @@ -441,6 +466,9 @@ ColumnPtr DMFileReader::readExtraColumn(
column->insertRangeFrom(*src_col, 0, src_col->size());
}
}

if (filter)
return column->filter(*filter, passed_count);
return column;
}

Expand All @@ -449,58 +477,69 @@ ColumnPtr DMFileReader::readColumn(
size_t start_pack_id,
size_t pack_count,
size_t read_rows,
const IColumn::Filter * filter)
const IColumn::Filter * filter,
size_t passed_count)
{
// A column added by DDL after this DMFile was written does not exist in the file; fill it with the default value
if (!column_streams.contains(DMFile::getFileNameBase(cd.id)))
return createColumnWithDefaultValue(cd, read_rows);

auto type_on_disk = dmfile->getColumnStat(cd.id).type;
ColumnPtr column;
// Try to read the PK column from ColumnCacheLongTerm
if (column_cache_long_term && cd.id == pk_col_id && ColumnCacheLongTerm::isCacheableColumn(cd))
{
// ColumnCacheLongTerm only caches user assigned PrimaryKey column.
auto column_all_data = column_cache_long_term->get(
dmfile->parentPath(),
dmfile->fileId(),
cd.id,
[&]() -> IColumn::Ptr {
// Always read all packs when filling cache, and always set filter to nullptr
return readFromDiskOrSharingCache(cd, type_on_disk, 0, dmfile->getPacks(), dmfile->getRows(), nullptr);
});

auto column = type_on_disk->createColumn();
column->insertRangeFrom(*column_all_data, pack_id_to_offset[start_pack_id], read_rows);
return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd);
auto column_all_data
= column_cache_long_term->get(dmfile->parentPath(), dmfile->fileId(), cd.id, [&]() -> IColumn::Ptr {
// Always read all packs when filling cache, and always set filter to nullptr
auto rows = dmfile->getRows();
return readFromDiskOrSharingCache(cd, type_on_disk, 0, dmfile->getPacks(), rows, nullptr, rows);
});

auto mut_column = type_on_disk->createColumn();
mut_column->insertRangeFrom(*column_all_data, pack_id_to_offset[start_pack_id], read_rows);
column = std::move(mut_column);
}

// Not cached
if (!enable_column_cache || !isCacheableColumn(cd))
else if (!enable_column_cache || !isCacheableColumn(cd))
{
auto column = readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, filter);
column
= readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, filter, passed_count);
return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd);
}

// enable_column_cache && isCacheableColumn(cd)

// try to get column from cache
auto column = getColumnFromCache(
column_cache,
cd,
type_on_disk,
start_pack_id,
pack_count,
read_rows,
[&](const ColumnDefine & cd,
const DataTypePtr & type_on_disk,
size_t start_pack_id,
size_t pack_count,
size_t read_rows) {
// read from cache, always set filter to nullptr
return readFromDiskOrSharingCache(cd, type_on_disk, start_pack_id, pack_count, read_rows, nullptr);
});
// add column to cache
addColumnToCache(column_cache, cd.id, start_pack_id, pack_count, column);
else
{
// try to get column from cache
column = getColumnFromCache(
column_cache,
cd,
type_on_disk,
start_pack_id,
pack_count,
read_rows,
[&](const ColumnDefine & cd,
const DataTypePtr & type_on_disk,
size_t start_pack_id,
size_t pack_count,
size_t read_rows) {
// read from cache, always set filter to nullptr
return readFromDiskOrSharingCache(
cd,
type_on_disk,
start_pack_id,
pack_count,
read_rows,
nullptr,
read_rows);
});
// add column to cache
addColumnToCache(column_cache, cd.id, start_pack_id, pack_count, column);
}
// filter column
if (filter)
column = column->filter(*filter, passed_count);
// Cast column's data from DataType in disk to what we need now
return convertColumnByColumnDefineIfNeed(type_on_disk, std::move(column), cd);
}
Expand Down Expand Up @@ -548,7 +587,8 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache(
size_t start_pack_id,
size_t pack_count,
size_t read_rows,
const IColumn::Filter * filter)
const IColumn::Filter * filter,
size_t passed_count)
{
bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this);
bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr
Expand Down Expand Up @@ -584,6 +624,8 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache(
DMFileReaderPool::instance().set(*this, cd.id, start_pack_id, pack_count, column);
// Delete column from local cache since it is not used anymore.
data_sharing_col_data_cache->delColumn(cd.id, std::get<0>(read_block_infos.front()));
if (filter)
column = column->filter(*filter, passed_count);
return column;
}

Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ class DMFileReader
size_t start_pack_id,
size_t pack_count,
size_t read_rows,
const std::vector<size_t> & clean_read_packs);
const std::vector<size_t> & clean_read_packs,
const IColumn::Filter * filter,
size_t passed_count);
ColumnPtr readFromDisk(
const ColumnDefine & cd,
const DataTypePtr & type_on_disk,
Expand All @@ -118,13 +120,15 @@ class DMFileReader
size_t start_pack_id,
size_t pack_count,
size_t read_rows,
const IColumn::Filter * filter);
const IColumn::Filter * filter,
size_t passed_count);
ColumnPtr readColumn(
const ColumnDefine & cd,
size_t start_pack_id,
size_t pack_count,
size_t read_rows,
const IColumn::Filter * filter);
const IColumn::Filter * filter,
size_t passed_count);
ColumnPtr cleanRead(
const ColumnDefine & cd,
size_t rows_count,
Expand Down

0 comments on commit 7fdb5af

Please sign in to comment.