support fine grained shuffle for window function #5048

Merged 52 commits on Jul 11, 2022
Changes from 9 commits
Commits
52 commits
21ea8e1
support fine grained shuffle
guo-shaoge May 24, 2022
1524bef
add unit test for StreamingDAGResponseWriter
guo-shaoge Jun 6, 2022
d860c45
add perf test for window function
guo-shaoge Jun 6, 2022
119b462
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 8, 2022
1e86122
fix conflict
guo-shaoge Jun 8, 2022
6dc4eab
fix contrib change
guo-shaoge Jun 8, 2022
2260514
fix some comment
guo-shaoge Jun 9, 2022
c1585b1
fix
guo-shaoge Jun 10, 2022
8211475
fix executeOrder()
guo-shaoge Jun 10, 2022
bcdf69f
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 15, 2022
162b1e3
add interpreter unittest
guo-shaoge Jun 15, 2022
df8aef7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 16, 2022
d03f0eb
refine microbenchmark
guo-shaoge Jun 16, 2022
e5a01cf
rm exchange_perftest.cpp
guo-shaoge Jun 16, 2022
278240c
update kvproto dep
guo-shaoge Jun 20, 2022
0d4bf2e
fix fmt
guo-shaoge Jun 20, 2022
a4ee8a8
fix lint
guo-shaoge Jun 20, 2022
73e314d
Merge branch 'master' into fine_grained_shuffle
SeaRise Jun 21, 2022
eb605ce
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 23, 2022
e3d3ff3
fix conflict
guo-shaoge Jun 23, 2022
e8b9747
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jun 23, 2022
db50925
enable fine_grained_shuffle in fragment level
guo-shaoge Jun 28, 2022
8624a85
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jun 29, 2022
c20c74c
stream_count from uint32_t to uint64_t
guo-shaoge Jun 30, 2022
b5df200
fix testcase
guo-shaoge Jul 1, 2022
4df7c9a
fix minor type
guo-shaoge Jul 1, 2022
d78a055
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 1, 2022
9210111
fix comment
guo-shaoge Jul 1, 2022
e52fc32
add extra_info for stream
guo-shaoge Jul 2, 2022
6fbeb02
make enable_fine_grained_shuffle as template argument
guo-shaoge Jul 3, 2022
709ac80
change uint64_t to UInt64
guo-shaoge Jul 3, 2022
2c7ab24
fix some comment, add fullstack test
guo-shaoge Jul 4, 2022
505f49a
update tipb
guo-shaoge Jul 4, 2022
b1de8cd
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 4, 2022
383b89e
fix fmt
guo-shaoge Jul 4, 2022
db8497f
update window.test
guo-shaoge Jul 4, 2022
eda9596
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 4, 2022
fb13e0e
fix some comments
guo-shaoge Jul 5, 2022
c23487a
update kvproto
guo-shaoge Jul 5, 2022
6e9f7b9
update kvproto
guo-shaoge Jul 5, 2022
030b874
fix bunch of comments
guo-shaoge Jul 7, 2022
59e3cb8
update
guo-shaoge Jul 7, 2022
c64d8ef
move send_exec_summary_at_last to top; Add RUNTIME_CHECK in DAGQueryB…
guo-shaoge Jul 7, 2022
c9ac908
using unique_ptr for msg_channels; use min(stream_count, max_stream)
guo-shaoge Jul 7, 2022
5893b37
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 7, 2022
0f035ec
fix
guo-shaoge Jul 7, 2022
031d1ed
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 7, 2022
4f05ba7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jul 8, 2022
8c34265
update tiflash-proxy
guo-shaoge Jul 8, 2022
ec3fe0d
fix conflict(mockExecutor.cpp)
guo-shaoge Jul 8, 2022
7b35d25
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 8, 2022
920673c
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 10, 2022
10 changes: 8 additions & 2 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
@@ -58,6 +58,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

uint64_t total_rows;

// For fine grained shuffle, the sender partitions data into multiple streams by hashing.
// ExchangeReceiverBlockInputStream only needs to read its own stream, i.e., streams[stream_id].
// CoprocessorBlockInputStream doesn't need to care about this.
UInt32 stream_id;

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (const auto & execution_summary : resp.execution_summaries())
@@ -120,7 +125,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

bool fetchRemoteResult()
{
auto result = remote_reader->nextResult(block_queue, sample_block);
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id);
if (result.meet_error)
{
LOG_FMT_WARNING(log, "remote reader meets error: {}", result.error_msg);
@@ -168,13 +173,14 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

public:
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id)
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id, UInt32 stream_id_)
: remote_reader(remote_reader_)
, source_num(remote_reader->getSourceNum())
, name(fmt::format("TiRemoteBlockInputStream({})", RemoteReader::name))
, execution_summaries_inited(source_num)
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
, stream_id(stream_id_)
{
// generate sample block
ColumnsWithTypeAndName columns;
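The new `stream_id` member encodes the contract described in the comment above: with fine grained shuffle the sender hashes rows into multiple logical streams, and each `ExchangeReceiverBlockInputStream` consumes only `streams[stream_id]`. Below is a minimal standalone sketch of that contract, assuming a hypothetical one-queue-per-stream layout; `Row`, `StreamQueues`, and `ToyReceiverStream` are illustrative names, not TiFlash classes.

```cpp
#include <cstdint>
#include <optional>
#include <queue>
#include <vector>

using Row = std::vector<int64_t>;
using StreamQueues = std::vector<std::queue<Row>>; // one queue per logical stream

class ToyReceiverStream
{
public:
    ToyReceiverStream(StreamQueues & queues_, uint32_t stream_id_)
        : queues(queues_)
        , stream_id(stream_id_)
    {}

    // Pop the next row of *this* stream only; every other logical stream is
    // drained by its own ToyReceiverStream, so no cross-stream merge is needed.
    std::optional<Row> next()
    {
        auto & q = queues[stream_id];
        if (q.empty())
            return std::nullopt;
        Row row = std::move(q.front());
        q.pop();
        return row;
    }

private:
    StreamQueues & queues;
    uint32_t stream_id;
};
```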
5 changes: 3 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
@@ -290,8 +290,9 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
tipb_exchange_receiver.encoded_task_meta_size(),
10,
/*req_id=*/"",
/*executor_id=*/"");
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"");
/*executor_id=*/"",
/*fine_grained_shuffle_stream_count=*/0);
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id=*/0);
return ret;
}
else
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorReader.h
@@ -139,7 +139,7 @@ class CoprocessorReader
return detail;
}

CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header)
CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header, UInt32 /* stream_id */)
{
auto && [result, has_next] = resp_iter.next();
if (!result.error.empty())
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -228,7 +228,8 @@ void DAGContext::initExchangeReceiverIfMPP(Context & context, size_t max_streams
executor.exchange_receiver().encoded_task_meta_size(),
max_streams,
log->identifier(),
executor_id);
executor_id,
fine_grained_shuffle_stream_count);
mpp_exchange_receiver_map[executor_id] = exchange_receiver;
new_thread_count_of_exchange_receiver += exchange_receiver->computeNewThreadCount();
}
17 changes: 17 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -109,6 +109,8 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode

inline bool enableFineGrainedShuffle(uint32_t stream_count) { return stream_count > 0; }

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
@@ -125,6 +127,8 @@ class DAGContext
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, fine_grained_shuffle_stream_count(0)
, fine_grained_shuffle_batch_size(0)
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();
@@ -147,6 +151,8 @@ class DAGContext
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, fine_grained_shuffle_stream_count(0)
, fine_grained_shuffle_batch_size(0)
{
assert(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());

@@ -167,6 +173,8 @@ class DAGContext
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
, fine_grained_shuffle_stream_count(0)
, fine_grained_shuffle_batch_size(0)
, is_test(true)
{}

@@ -341,6 +349,12 @@ class DAGContext
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

bool enableFineGrainedShuffle() const { return ::DB::enableFineGrainedShuffle(fine_grained_shuffle_stream_count); }
uint32_t getFineGrainedShuffleStreamCount() const { return fine_grained_shuffle_stream_count; }
Int64 getFineGrainedShuffleBatchSize() const { return fine_grained_shuffle_batch_size; }
void setFineGrainedShuffleStreamCount(uint32_t count) { fine_grained_shuffle_stream_count = count; }
void setFineGrainedShuffleBatchSize(Int64 size) { fine_grained_shuffle_batch_size = size; }

private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();
@@ -375,6 +389,9 @@ class DAGContext
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;

uint32_t fine_grained_shuffle_stream_count;
Int64 fine_grained_shuffle_batch_size;

bool is_test = false; /// switch for test, do not use it in production.
std::unordered_map<String, ColumnsWithTypeAndName> columns_for_test_map; /// <executor_id, columns>, for multiple sources
};
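For reference, a self-contained mock of the new fine grained shuffle state in `DAGContext`: the stream count defaults to 0 (disabled) and is only turned on through the setters, presumably when the plan asks for fine grained shuffle. `MockDAGContext` is an illustrative stand-in, not the real class.

```cpp
#include <cassert>
#include <cstdint>

inline bool enableFineGrainedShuffle(uint32_t stream_count) { return stream_count > 0; }

struct MockDAGContext
{
    uint32_t fine_grained_shuffle_stream_count = 0;
    int64_t fine_grained_shuffle_batch_size = 0;

    bool enableFineGrainedShuffle() const { return ::enableFineGrainedShuffle(fine_grained_shuffle_stream_count); }
    void setFineGrainedShuffleStreamCount(uint32_t count) { fine_grained_shuffle_stream_count = count; }
    void setFineGrainedShuffleBatchSize(int64_t size) { fine_grained_shuffle_batch_size = size; }
};

int main()
{
    MockDAGContext ctx;
    assert(!ctx.enableFineGrainedShuffle()); // stream count is 0, feature is off

    ctx.setFineGrainedShuffleStreamCount(8);
    ctx.setFineGrainedShuffleBatchSize(8192);
    assert(ctx.enableFineGrainedShuffle()); // any positive stream count enables it
    return 0;
}
```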
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -137,7 +137,9 @@ try
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
true,
dag_context);
dag_context,
/*fine_grained_shuffle_stream_count=*/0,
/*fine_grained_shuffle_batch_size=*/0);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
}
73 changes: 51 additions & 22 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -351,10 +351,18 @@ void DAGQueryBlockInterpreter::executeWindow(
{
executeExpression(pipeline, window_description.before_window, "before window");

/// If there are several streams, we merge them into one
executeUnion(pipeline, max_streams, log, false, "merge into one for window input");
assert(pipeline.streams.size() == 1);
pipeline.firstStream() = std::make_shared<WindowBlockInputStream>(pipeline.firstStream(), window_description, log->identifier());
if (dagContext().enableFineGrainedShuffle())
{
// fine_grained_shuffle_stream_count is a logical stream count; it doesn't have to equal the actual number of streams.
pipeline.transform([&](auto & stream) { stream = std::make_shared<WindowBlockInputStream>(stream, window_description, log->identifier()); });
}
else
{
/// If there are several streams, we merge them into one
executeUnion(pipeline, max_streams, log, false, "merge into one for window input");
assert(pipeline.streams.size() == 1);
pipeline.firstStream() = std::make_shared<WindowBlockInputStream>(pipeline.firstStream(), window_description, log->identifier());
}
}
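A standalone sketch of the invariant that makes the per-stream branch above safe: the sender buckets rows by hashing the window's partition key, so every row of a given PARTITION BY group lands in the same logical stream and each stream can run its own `WindowBlockInputStream` without a prior union. The hash below is `std::hash`, used purely for illustration and not TiFlash's actual partitioning hash.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

uint32_t bucketOf(const std::string & partition_key, uint32_t stream_count)
{
    return static_cast<uint32_t>(std::hash<std::string>{}(partition_key) % stream_count);
}

int main()
{
    const uint32_t stream_count = 8;
    // Rows sharing a partition key always map to the same logical stream ...
    assert(bucketOf("user_42", stream_count) == bucketOf("user_42", stream_count));
    // ... so each stream holds whole partitions and can evaluate its window
    // frames independently of all other streams.
    return 0;
}
```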

void DAGQueryBlockInterpreter::executeAggregation(
Expand Down Expand Up @@ -434,16 +442,16 @@ void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const E

void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc)
{
orderStreams(pipeline, sort_desc, 0);
orderStreams(pipeline, sort_desc, 0, dagContext().enableFineGrainedShuffle());
}

void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns)
{
Int64 limit = query_block.limit_or_topn->topn().limit();
orderStreams(pipeline, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit);
orderStreams(pipeline, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, false);
}

void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit)
void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit, bool enable_fine_grained_shuffle)
{
const Settings & settings = context.getSettingsRef();

@@ -459,18 +467,33 @@ void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescript
stream = sorting_stream;
});

/// If there are several streams, we merge them into one
executeUnion(pipeline, max_streams, log, false, "for partial order");

/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(),
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
context.getTemporaryPath(),
log->identifier());
if (enable_fine_grained_shuffle)
{
pipeline.transform([&](auto & stream) { stream = std::make_shared<MergeSortingBlockInputStream>(
stream,
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
context.getTemporaryPath(),
log->identifier());
});
}
else
{
/// If there are several streams, we merge them into one
executeUnion(pipeline, max_streams, log, false, "for partial order");

/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(),
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
context.getTemporaryPath(),
log->identifier());
}
}

void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key)
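The same invariant explains why the fine grained branch above sorts each stream on its own instead of unioning and merge-sorting globally: the window only needs rows ordered within each partition, and a partition never spans streams. A trivial standalone sketch, with plain integer vectors standing in for blocks:

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

int main()
{
    // Two logical streams, each already holding whole window partitions.
    std::vector<std::vector<int>> streams = {{5, 1, 3}, {4, 2, 6}};

    for (auto & stream : streams) // sort per stream, no union step
        std::sort(stream.begin(), stream.end());

    for (const auto & stream : streams)
        assert(std::is_sorted(stream.begin(), stream.end()));
    return 0;
}
```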
@@ -486,9 +509,12 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
throw Exception("Can not find exchange receiver for " + query_block.source_name, ErrorCodes::LOGICAL_ERROR);
// todo choose a more reasonable stream number
auto & exchange_receiver_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[query_block.source_name];
size_t stream_id = 0;
for (size_t i = 0; i < max_streams; ++i)
{
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, log->identifier(), query_block.source_name);
if (dagContext().enableFineGrainedShuffle())
stream_id = i;
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, log->identifier(), query_block.source_name, stream_id);
exchange_receiver_io_input_streams.push_back(stream);
stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, log->identifier());
stream->setExtraInfo("squashing after exchange receiver");
@@ -743,6 +769,7 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
std::vector<Int64> partition_col_ids = ExchangeSenderInterpreterHelper::genPartitionColIds(exchange_sender);
TiDB::TiDBCollators partition_col_collators = ExchangeSenderInterpreterHelper::genPartitionColCollators(exchange_sender);
int stream_id = 0;

pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr>>(
@@ -753,7 +780,9 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to send execution summaries for the last response
dagContext());
dagContext(),
dagContext().getFineGrainedShuffleStreamCount(),
dagContext().getFineGrainedShuffleBatchSize());
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
});
}
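The two new arguments pass the logical stream count and the batch size down to `StreamingDAGResponseWriter`. A simplified standalone sketch of what a per-stream batch size can be used for on the sender side: buffer rows per logical stream and flush a stream once its buffer reaches the batch size. `ToyWriter` and its send callback are illustrative only and do not mirror the writer's real interface.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

class ToyWriter
{
public:
    ToyWriter(uint32_t stream_count, size_t batch_size_, std::function<void(uint32_t, std::vector<int64_t>)> send_)
        : buffers(stream_count)
        , batch_size(batch_size_)
        , send(std::move(send_))
    {}

    void write(uint32_t stream_id, int64_t row)
    {
        auto & buf = buffers[stream_id];
        buf.push_back(row);
        if (buf.size() >= batch_size) // flush only the stream whose buffer filled up
            flush(stream_id);
    }

    void finish() // flush whatever is left in every stream at end of input
    {
        for (uint32_t i = 0; i < buffers.size(); ++i)
            if (!buffers[i].empty())
                flush(i);
    }

private:
    void flush(uint32_t stream_id)
    {
        send(stream_id, std::move(buffers[stream_id]));
        buffers[stream_id].clear();
    }

    std::vector<std::vector<int64_t>> buffers;
    size_t batch_size;
    std::function<void(uint32_t, std::vector<int64_t>)> send;
};
```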
@@ -783,4 +812,4 @@ BlockInputStreams DAGQueryBlockInterpreter::execute()

return pipeline.streams;
}
} // namespace DB
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
@@ -69,7 +69,7 @@ class DAGQueryBlockInterpreter
void executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column, const String & extra_info = "");
void executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info = "");
void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc);
void orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit);
void orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit, bool enable_fine_grained_shuffle);
void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns);
void executeLimit(DAGPipeline & pipeline);
void executeWindow(
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -486,7 +486,7 @@ void DAGStorageInterpreter::buildRemoteStreams(std::vector<RemoteRequest> && rem
std::vector<pingcap::coprocessor::copTask> tasks(all_tasks.begin() + task_start, all_tasks.begin() + task_end);

auto coprocessor_reader = std::make_shared<CoprocessorReader>(schema, cluster, tasks, has_enforce_encode_type, 1);
BlockInputStreamPtr input = std::make_shared<CoprocessorBlockInputStream>(coprocessor_reader, log->identifier(), table_scan.getTableScanExecutorID());
BlockInputStreamPtr input = std::make_shared<CoprocessorBlockInputStream>(coprocessor_reader, log->identifier(), table_scan.getTableScanExecutorID(), /*stream_id=*/0);
pipeline.streams.push_back(input);
task_start = task_end;
}
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/DecodeDetail.h
@@ -21,8 +21,8 @@ namespace DB
/// Details of the packet decoded in TiRemoteInputStream.RemoteReader.decodeChunks()
struct DecodeDetail
{
// For fine grained shuffle, each ExchangeReceiver stream/thread decodes its own blocks,
// so this is the row count of only that stream's share of the original packet.
// Without fine grained shuffle, it is the row count of all blocks in the original packet.
Int64 rows = 0;
// byte size of origin packet.

// Total byte size of the original packet, even under fine grained shuffle.
Int64 packet_bytes = 0;
};
} // namespace DB
} // namespace DB
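A standalone sketch of the accounting described by the comments above: under fine grained shuffle a receiver stream decodes only its own chunks of a packet, so `rows` covers just that stream's share while `packet_bytes` still measures the whole packet. The chunk layout and helper below are hypothetical and only illustrate the bookkeeping.

```cpp
#include <cstdint>
#include <vector>

struct ToyDecodeDetail
{
    int64_t rows = 0;         // rows decoded by this stream (a subset under fine grained shuffle)
    int64_t packet_bytes = 0; // size of the whole packet, regardless of shuffle mode
};

struct ToyChunk
{
    uint32_t stream_id = 0;
    int64_t rows = 0;
    int64_t bytes = 0;
};

ToyDecodeDetail decodeForStream(const std::vector<ToyChunk> & packet, uint32_t my_stream_id)
{
    ToyDecodeDetail detail;
    for (const auto & chunk : packet)
    {
        detail.packet_bytes += chunk.bytes; // always account the full packet size
        if (chunk.stream_id == my_stream_id)
            detail.rows += chunk.rows; // but only decode rows destined for this stream
    }
    return detail;
}
```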