Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make fast scan code mode clean #6058

Merged
merged 15 commits into from
Oct 13, 2022
7 changes: 5 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
LOG_DEBUG(log, "Read done");
return {};
}
buildStreamBasedOnReadMode(cur_stream, read_mode, task, dm_context, columns_to_read, filter, max_version, expected_block_size);
LOG_TRACE(log, "Start to read segment, segment={}", task->segment->simpleInfo());
cur_segment = task->segment;

auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
cur_stream = task->segment->getInputStream(read_mode, *dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, max_version, block_size);
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
}
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);

Expand Down
62 changes: 44 additions & 18 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#include <memory>
#include <numeric>

#include "Storages/DeltaMerge/SegmentReadTaskPool.h"
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved

namespace ProfileEvents
{
extern const Event DMWriteBlock;
Expand Down Expand Up @@ -395,13 +397,37 @@ SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool fo
return std::make_shared<SegmentSnapshot>(std::move(delta_snap), std::move(stable_snap));
}

BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,

BlockInputStreamPtr Segment::getInputStream(const ReadMode & read_mode,
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
switch (read_mode)
{
case ReadMode::Normal:
return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size);
break;
case ReadMode::Fast:
return getInputStreamModeFast(dm_context, columns_to_read, segment_snap, read_ranges, filter, expected_block_size);
break;
case ReadMode::Raw:
return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, read_ranges, expected_block_size);
break;
}
}

BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
LOG_TRACE(log, "Begin segment create input stream");

Expand Down Expand Up @@ -479,17 +505,17 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,
return stream;
}

BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead);
if (!segment_snap)
return {};
return getInputStream(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size);
return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size);
}

BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_context,
Expand Down Expand Up @@ -527,11 +553,11 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
return data_stream;
}

/// We call getInputStreamFast when we read in fast mode.
/// We call getInputStreamModeFast when we read in fast mode.
/// In this case, we will read all the data in delta and stable, and then merge them without sorting.
/// Besides, we will do del_mark != 0 filtering to drop the deleted rows.
/// In conclusion, the output is unsorted, and does not do mvcc filtering.
BlockInputStreamPtr Segment::getInputStreamFast(
BlockInputStreamPtr Segment::getInputStreamModeFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
Expand Down Expand Up @@ -618,13 +644,13 @@ BlockInputStreamPtr Segment::getInputStreamFast(
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

/// We call getInputStreamRaw in 'selraw xxxx' statement, which is always in test for debug.
/// We call getInputStreamModeRaw in 'selraw xxxx' statement, which is always in test for debug.
/// In this case, we will read all the data without mvcc filtering and sorted merge.
BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
size_t expected_block_size)
BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
size_t expected_block_size)
{
auto new_columns_to_read = std::make_shared<ColumnDefines>();

Expand Down Expand Up @@ -673,12 +699,12 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
return std::make_shared<ConcatBlockInputStream>(streams, dm_context.tracing_id);
}

BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read)
BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read)
{
auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw);
if (!segment_snap)
return {};
return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range});
return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, {rowkey_range});
}

SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const
Expand Down
22 changes: 17 additions & 5 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/WriteBatch.h>

#include "Storages/DeltaMerge/SegmentReadTaskPool.h"
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved

namespace DB::DM
{
class Segment;
Expand Down Expand Up @@ -155,7 +157,7 @@ class Segment : private boost::noncopyable

SegmentSnapshotPtr createSnapshot(const DMContext & dm_context, bool for_update, CurrentMetrics::Metric metric) const;

BlockInputStreamPtr getInputStream(
BlockInputStreamPtr getInputStreamModeNormal(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
Expand All @@ -164,7 +166,7 @@ class Segment : private boost::noncopyable
UInt64 max_version,
size_t expected_block_size);

BlockInputStreamPtr getInputStream(
BlockInputStreamPtr getInputStreamModeNormal(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const RowKeyRanges & read_ranges,
Expand All @@ -182,22 +184,32 @@ class Segment : private boost::noncopyable
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
bool reorganize_block = true) const;

BlockInputStreamPtr getInputStreamFast(
BlockInputStreamPtr getInputStream(
const ReadMode & read_mode,
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size);

BlockInputStreamPtr getInputStreamModeFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

BlockInputStreamPtr getInputStreamRaw(
BlockInputStreamPtr getInputStreamModeRaw(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

BlockInputStreamPtr getInputStreamRaw(
BlockInputStreamPtr getInputStreamModeRaw(
const DMContext & dm_context,
const ColumnDefines & columns_to_read);

Expand Down
37 changes: 2 additions & 35 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,6 @@ extern const Metric DT_SegmentReadTasks;

namespace DB::DM
{
void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size)
{
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
switch (read_mode)
{
case ReadMode::Normal:
stream = task->segment->getInputStream(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
max_version,
block_size);
break;
case ReadMode::Fast:
stream = task->segment->getInputStreamFast(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
block_size);
break;
case ReadMode::Raw:
stream = task->segment->getInputStreamRaw(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges);
break;
}
}

SegmentReadTask::SegmentReadTask(const SegmentPtr & segment_, //
const SegmentSnapshotPtr & read_snapshot_,
const RowKeyRanges & ranges_)
Expand Down Expand Up @@ -133,7 +99,8 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
{
MemoryTrackerSetter setter(true, mem_tracker.get());
BlockInputStreamPtr stream;
buildStreamBasedOnReadMode(stream, read_mode, t, dm_context, columns_to_read, filter, max_version, expected_block_size);
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code in dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp L103-L131 is similar to dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h L103-L131.

Maybe we can do some refactoring and make code reusable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I try to make a new function for L103-L131 to make code reusable, please take a look.

stream = t->segment->getInputStream(read_mode, *dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size);
LOG_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, t->segment->segmentId());
return stream;
}
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#include <Storages/DeltaMerge/ReadThread/WorkQueue.h>
#include <Storages/DeltaMerge/RowKeyRangeUtils.h>

#include "Storages/DeltaMerge/Segment.h"

namespace DB
{
namespace DM
Expand Down Expand Up @@ -133,7 +131,6 @@ enum class ReadMode
Raw,
};

void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size);
class SegmentReadTaskPool : private boost::noncopyable
{
public:
Expand Down
Loading