Skip to content

Commit

Permalink
Make fast scan code mode clean (#6058)
Browse files Browse the repository at this point in the history
ref #5252
  • Loading branch information
hongyunyan authored Oct 13, 2022
1 parent 8614c70 commit 6a9e9ac
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 187 deletions.
34 changes: 6 additions & 28 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const RSOperatorPtr & filter_,
UInt64 max_version_,
size_t expected_block_size_,
bool is_raw_,
bool do_delete_mark_filter_for_raw_,
ReadMode read_mode_,
const int extra_table_id_index,
const TableID physical_table_id,
const String & req_id)
Expand All @@ -60,8 +59,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
, header(toEmptyBlock(columns_to_read))
, max_version(max_version_)
, expected_block_size(expected_block_size_)
, is_raw(is_raw_)
, do_delete_mark_filter_for_raw(do_delete_mark_filter_for_raw_)
, read_mode(read_mode_)
, extra_table_id_index(extra_table_id_index)
, physical_table_id(physical_table_id)
, log(Logger::get(NAME, req_id))
Expand Down Expand Up @@ -100,29 +98,10 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
LOG_DEBUG(log, "Read done");
return {};
}

cur_segment = task->segment;
if (is_raw)
{
cur_stream = cur_segment->getInputStreamRaw(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
do_delete_mark_filter_for_raw);
}
else
{
cur_stream = cur_segment->getInputStream(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
max_version,
std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)));
}

auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
cur_stream = task->segment->getInputStream(read_mode, *dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, max_version, block_size);
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
}
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);
Expand Down Expand Up @@ -172,8 +151,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
Block header;
const UInt64 max_version;
const size_t expected_block_size;
const bool is_raw;
const bool do_delete_mark_filter_for_raw;
const ReadMode read_mode;
// position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function.
const int extra_table_id_index;

Expand Down
14 changes: 5 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra
}
}

// Read data without mvcc filtering && delete mark != 0 filtering.
// Read data without mvcc filtering.
// just for debug
// readRaw is called under 'selraw xxxx'
BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
Expand Down Expand Up @@ -908,8 +908,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw = */ true,
/* do_delete_mark_filter_for_raw = */ false,
/* read_mode */ ReadMode::Raw,
std::move(tasks),
after_segment_read,
req_info);
Expand Down Expand Up @@ -937,8 +936,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw_ */ true,
/* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1
/* read_mode */ ReadMode::Raw,
extra_table_id_index,
physical_table_id,
req_info);
Expand Down Expand Up @@ -992,8 +990,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw = */ is_fast_scan,
/* do_delete_mark_filter_for_raw = */ is_fast_scan,
/* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal,
std::move(tasks),
after_segment_read,
log_tracing_id);
Expand Down Expand Up @@ -1021,8 +1018,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw_= */ is_fast_scan,
/* do_delete_mark_filter_for_raw_= */ is_fast_scan,
/* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal,
extra_table_id_index,
physical_table_id,
log_tracing_id);
Expand Down
180 changes: 123 additions & 57 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,37 @@ SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool fo
return std::make_shared<SegmentSnapshot>(std::move(delta_snap), std::move(stable_snap));
}

BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,

/// Entry point for reading a segment: dispatches to the implementation that
/// matches `read_mode`.
///  - Normal: the regular query path (per surrounding code, sorted output with
///            MVCC filtering, honoring `max_version`).
///  - Fast:   fast-scan path; `max_version` is not forwarded to it.
///  - Raw:    `selraw` debug path; neither `filter` nor `max_version` is forwarded.
/// Returns the stream produced by the selected implementation.
BlockInputStreamPtr Segment::getInputStream(const ReadMode & read_mode,
                                            const DMContext & dm_context,
                                            const ColumnDefines & columns_to_read,
                                            const SegmentSnapshotPtr & segment_snap,
                                            const RowKeyRanges & read_ranges,
                                            const RSOperatorPtr & filter,
                                            UInt64 max_version,
                                            size_t expected_block_size)
{
    // Each case returns directly, so no `break` is needed (a `break` after
    // `return` is unreachable dead code).
    switch (read_mode)
    {
    case ReadMode::Normal:
        return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size);
    case ReadMode::Fast:
        return getInputStreamModeFast(dm_context, columns_to_read, segment_snap, read_ranges, filter, expected_block_size);
    case ReadMode::Raw:
        return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, read_ranges, expected_block_size);
    }
    // All enumerators are handled above; without this, an out-of-range enum
    // value would fall off the end of a non-void function, which is undefined
    // behavior. Return a null stream so the failure is at least well-defined.
    return {};
}

BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
LOG_TRACE(log, "Begin segment create input stream");

Expand Down Expand Up @@ -479,17 +503,17 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,
return stream;
}

BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
/// Convenience overload of getInputStreamModeNormal that creates its own
/// segment snapshot instead of taking one from the caller.
/// Returns an empty (null) BlockInputStreamPtr when the snapshot cannot be
/// created; callers must check before dereferencing.
BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
// Snapshot acquisition is accounted under the DT_SnapshotOfRead metric.
auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead);
if (!segment_snap)
return {};
return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size);
}
}

BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_context,
Expand Down Expand Up @@ -527,64 +551,58 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
return data_stream;
}

/// we will call getInputStreamRaw in two condition:
/// 1. Using 'selraw xxxx' statement, which is always in test for debug. (when filter_delete_mark = false)
/// In this case, we will read all the data without mvcc filtering,
/// del_mark != 0 filtering and sorted merge.
/// We will just read all the data and return.
/// 2. We read in fast mode. (when filter_delete_mark = true)
/// In this case, we will read all the data without mvcc filtering and sorted merge,
/// but we will do del_mark != 0 filtering.
BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
bool filter_delete_mark,
size_t expected_block_size)
/// We call getInputStreamModeFast when we read in fast mode.
/// In this case, we will read all the data in delta and stable, and then merge them without sorting.
/// Besides, we will do del_mark != 0 filtering to drop the deleted rows.
/// In conclusion, the output is unsorted, and does not do mvcc filtering.
BlockInputStreamPtr Segment::getInputStreamModeFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size)
{
/// Now, we use filter_delete_mark to determine whether it is in fast mode or just from `selraw * xxxx`
/// But this approach does not seem robust enough; maybe we need another flag?
auto new_columns_to_read = std::make_shared<ColumnDefines>();

// new_columns_to_read needs at most columns_to_read.size() + 2 columns, since the handle column and the del_mark column may be inserted additionally.
new_columns_to_read->reserve(columns_to_read.size() + 2);

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));
if (filter_delete_mark)
{
new_columns_to_read->push_back(getTagColumnDefine());
}

bool enable_handle_clean_read = filter_delete_mark;
bool enable_del_clean_read = filter_delete_mark;
new_columns_to_read->push_back(getTagColumnDefine());

/// When we read in fast mode, we can try to do the following optimization:
/// 1. Handle Column Optimization:
/// when the columns_to_read does not include HANDLE_COLUMN,
/// we can try to skip reading the handle column if the pack's handle range is fully within read range.
/// Thus, in this case, we set enable_handle_clean_read = true.
/// 2. Del Column Optimization:
/// when the columns_to_read does not include TAG_COLUMN,
/// we can try to skip reading the del column if the pack has no deleted rows.
/// Thus, in this case, we set enable_del_clean_read = true.
/// 3. Version Column Optimization:
/// if the columns_to_read does not include VERSION_COLUMN,
/// we don't need to read version column, thus we don't force push version column into new_columns_to_read.

bool enable_handle_clean_read = true;
bool enable_del_clean_read = true;

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID)
if (c.id == EXTRA_HANDLE_COLUMN_ID)
{
if (filter_delete_mark && c.id == TAG_COLUMN_ID)
{
enable_del_clean_read = false;
}
else
{
new_columns_to_read->push_back(c);
}
enable_handle_clean_read = false;
}
else if (c.id == TAG_COLUMN_ID)
{
enable_del_clean_read = false;
}
else
{
enable_handle_clean_read = false;
new_columns_to_read->push_back(c);
}
}

/// when we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID,
/// we can try to use clean read to make optimization in stable part.
/// when the pack is under totally data_ranges and has no rows whose del_mark = 1 --> we don't need read handle_column/tag_column/version_column
/// when the pack is under totally data_ranges and has rows whose del_mark = 1 --> we don't need read handle_column/version_column
/// others --> we don't need read version_column
/// Thus, in fast mode, if we don't need to read the handle_column, we set enable_handle_clean_read to true.
/// If we don't need to read the del_column, we set enable_del_clean_read to true.
BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
Expand All @@ -593,25 +611,73 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ enable_handle_clean_read,
/* is_fast_scan */ filter_delete_mark,
/* is_fast_scan */ true,
/* enable_del_clean_read */ enable_del_clean_read);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

if (filter_delete_mark)
// Filter the unneeded column and filter out the rows whose del_mark is true.
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
stable_stream = std::make_shared<DMDeleteFilterBlockInputStream>(stable_stream, columns_to_read, dm_context.tracing_id);

BlockInputStreams streams;

if (dm_context.read_delta_only)
{
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
stable_stream = std::make_shared<DMDeleteFilterBlockInputStream>(stable_stream, columns_to_read, dm_context.tracing_id);
streams.push_back(delta_stream);
}
else if (dm_context.read_stable_only)
{
streams.push_back(stable_stream);
}
else
{
delta_stream = std::make_shared<DMColumnProjectionBlockInputStream>(delta_stream, columns_to_read);
stable_stream = std::make_shared<DMColumnProjectionBlockInputStream>(stable_stream, columns_to_read);
streams.push_back(delta_stream);
streams.push_back(stable_stream);
}
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

/// We call getInputStreamModeRaw in 'selraw xxxx' statement, which is always in test for debug.
/// In this case, we will read all the data without mvcc filtering and sorted merge.
BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
size_t expected_block_size)
{
auto new_columns_to_read = std::make_shared<ColumnDefines>();

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID)
new_columns_to_read->push_back(c);
}

BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
data_ranges,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ false);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);

// Filter the unneeded columns.
delta_stream = std::make_shared<DMColumnProjectionBlockInputStream>(delta_stream, columns_to_read);
stable_stream = std::make_shared<DMColumnProjectionBlockInputStream>(stable_stream, columns_to_read);

BlockInputStreams streams;

Expand All @@ -631,12 +697,12 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, bool filter_delete_mark)
/// Convenience overload of getInputStreamModeRaw ('selraw' debug read) that
/// creates its own segment snapshot and reads the segment's whole rowkey range.
/// Returns an empty (null) BlockInputStreamPtr when the snapshot cannot be
/// created; callers must check before dereferencing.
BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read)
{
// Snapshot acquisition is accounted under the DT_SnapshotOfReadRaw metric.
auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw);
if (!segment_snap)
return {};
return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, {rowkey_range});
}
}

SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const
Expand Down
Loading

0 comments on commit 6a9e9ac

Please sign in to comment.