Skip to content

Commit

Permalink
disagg: Fix error when there are empty partitions (#8221)
Browse files Browse the repository at this point in the history
close #8220
  • Loading branch information
JaySon-Huang authored Oct 19, 2023
1 parent 429d27b commit fea463e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 10 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class MPMCQueue
using Result = MPMCQueueResult;
using ElementAuxiliaryMemoryUsageFunc = std::function<Int64(const T & element)>;

MPMCQueue(
explicit MPMCQueue(
const CapacityLimits & capacity_limits_,
ElementAuxiliaryMemoryUsageFunc && get_auxiliary_memory_usage_ = [](const T &) { return 0; })
: capacity_limits(capacity_limits_)
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ DAGContext::DAGContext(
const String & resource_group_name_,
LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_query_string(dag_request->ShortDebugString())
, dummy_ast(makeDummyQuery())
, tidb_host(tidb_host_)
, collect_execution_summaries(
Expand All @@ -79,7 +79,7 @@ DAGContext::DAGContext(
// for mpp
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_query_string(dag_request->ShortDebugString())
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(
dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
Expand Down Expand Up @@ -109,7 +109,7 @@ DAGContext::DAGContext(
const String & compute_node_host_,
LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_query_string(dag_request->ShortDebugString())
, dummy_ast(makeDummyQuery())
, tidb_host(compute_node_host_)
, collect_execution_summaries(
Expand Down Expand Up @@ -149,7 +149,7 @@ DAGContext::DAGContext(UInt64 max_error_count_)
// for tests need to run query tasks.
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_query_string(dag_request->ShortDebugString())
, dummy_ast(makeDummyQuery())
, initialize_concurrency(concurrency)
, collect_execution_summaries(
Expand Down
23 changes: 18 additions & 5 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ RNWorkers::RNWorkers(const Context & context, const Options & options, size_t nu
{
RUNTIME_CHECK(num_streams > 0, num_streams);
size_t n = options.read_task->segment_read_tasks.size();
RUNTIME_CHECK(n > 0, n);
if (n == 0)
{
empty_channel = std::make_shared<Channel>(0);
empty_channel->finish();
return;
}

auto fetch_pages_concurrency = n;
auto prepare_streams_concurrency = n;
Expand Down Expand Up @@ -69,18 +74,26 @@ RNWorkers::RNWorkers(const Context & context, const Options & options, size_t nu

void RNWorkers::startInBackground()
{
worker_fetch_pages->startInBackground();
worker_prepare_streams->startInBackground();
if (!empty_channel)
{
worker_fetch_pages->startInBackground();
worker_prepare_streams->startInBackground();
}
}

void RNWorkers::wait()
{
worker_fetch_pages->wait();
worker_prepare_streams->wait();
if (!empty_channel)
{
worker_fetch_pages->wait();
worker_prepare_streams->wait();
}
}

RNWorkers::ChannelPtr RNWorkers::getReadyChannel() const
{
if (empty_channel)
return empty_channel;
return worker_prepare_streams->result_queue;
}
} // namespace DB::DM::Remote
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class RNWorkers : private boost::noncopyable
}

private:
ChannelPtr empty_channel;

RNWorkerFetchPagesPtr worker_fetch_pages;
RNWorkerPrepareStreamsPtr worker_prepare_streams;
};
Expand Down

0 comments on commit fea463e

Please sign in to comment.