Skip to content

Commit

Permalink
Pipeline: use notify instead of polling for ExchangeReceiver (#9073)
Browse files Browse the repository at this point in the history
ref #8869

Signed-off-by: gengliqi <[email protected]>

Co-authored-by: gengliqi <[email protected]>
Co-authored-by: Liqi Geng <[email protected]>
  • Loading branch information
SeaRise and gengliqi authored Sep 27, 2024
1 parent ea1105d commit 92e8dac
Show file tree
Hide file tree
Showing 20 changed files with 154 additions and 113 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ class DAGContext
UInt64 getConnectionID() const { return connection_id; }
const String & getConnectionAlias() const { return connection_alias; }

MPPReceiverSetPtr getMPPReceiverSet() const { return mpp_receiver_set; }

public:
DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
Expand Down Expand Up @@ -443,6 +445,7 @@ class DAGContext
/// warning_count is the actual warning count during the entire execution
std::atomic<UInt64> warning_count;

// `mpp_receiver_set` is always set by `MPPTask` and is used later.
MPPReceiverSetPtr mpp_receiver_set;
std::vector<CoprocessorReaderPtr> coprocessor_readers;
/// vector of SubqueriesForSets(such as join build subquery).
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Executor/PipelineExecutorContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Mpp/MPPReceiverSet.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
Expand Down Expand Up @@ -180,9 +181,12 @@ void PipelineExecutorContext::cancel()
cancelOneTimeFutures();
if (likely(dag_context))
{
// Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified.
// Cancel the tunnel_set and mpp_receiver_set here to prevent
// pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified.
if (dag_context->tunnel_set)
dag_context->tunnel_set->close(getTrimmedErrMsg(), false);
if (auto mpp_receiver_set = dag_context->getMPPReceiverSet(); mpp_receiver_set)
mpp_receiver_set->cancel();
}
cancelResultQueueIfNeed();
if likely (TaskScheduler::instance && !query_id.empty())
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Mpp/LocalRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ struct LocalRequestHandler
bool isWritable() const { return msg_queue->isWritable(); }
void notifyNextPipelineWriter() const { return msg_queue->notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); }

void writeDone(bool meet_error, const String & local_err_msg) const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ WaitResult MPPTunnel::waitForWritable() const
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
if (!tunnel_sender->isWritable())
{
setNotifyFuture(tunnel_sender);
setNotifyFuture(tunnel_sender.get());
return WaitResult::WaitForNotify;
}
return WaitResult::Ready;
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Flash/Mpp/ReceivedMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace DB
{
const std::vector<const String *> & ReceivedMessage::getChunks(size_t stream_id) const
{
if (remaining_consumers != nullptr)
if (fine_grained_consumer_size > 0)
return fine_grained_chunks[stream_id];
else
return chunks;
Expand All @@ -31,17 +31,18 @@ ReceivedMessage::ReceivedMessage(
const mpp::Error * error_ptr_,
const String * resp_ptr_,
std::vector<const String *> && chunks_,
size_t fine_grained_consumer_size)
size_t fine_grained_consumer_size_)
: source_index(source_index_)
, req_info(req_info_)
, packet(packet_)
, error_ptr(error_ptr_)
, resp_ptr(resp_ptr_)
, chunks(chunks_)
, remaining_consumers(fine_grained_consumer_size_)
, fine_grained_consumer_size(fine_grained_consumer_size_)
{
if (fine_grained_consumer_size > 0)
{
remaining_consumers = std::make_shared<std::atomic<size_t>>(fine_grained_consumer_size);
fine_grained_chunks.resize(fine_grained_consumer_size);
if (packet->packet.chunks_size() > 0)
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Mpp/ReceivedMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class ReceivedMessage
std::vector<const String *> chunks;
/// used for fine grained shuffle; for non fine grained shuffle, fine_grained_consumer_size and remaining_consumers are both 0
std::vector<std::vector<const String *>> fine_grained_chunks;
std::shared_ptr<std::atomic<size_t>> remaining_consumers;
std::atomic<size_t> remaining_consumers;
size_t fine_grained_consumer_size;

public:
// Constructor that move chunks.
Expand All @@ -50,7 +51,7 @@ class ReceivedMessage
const String & getReqInfo() const { return req_info; }
const mpp::Error * getErrorPtr() const { return error_ptr; }
const String * getRespPtr(size_t stream_id) const { return stream_id == 0 ? resp_ptr : nullptr; }
std::shared_ptr<std::atomic<size_t>> & getRemainingConsumers() { return remaining_consumers; }
std::atomic<size_t> & getRemainingConsumers() { return remaining_consumers; }
const std::vector<const String *> & getChunks(size_t stream_id) const;
const mpp::MPPDataPacket & getPacket() const { return packet->packet; }
bool containUsefulMessage() const;
Expand Down
36 changes: 25 additions & 11 deletions dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ ReceivedMessageQueue::ReceivedMessageQueue(
assert(fine_grained_channel_size > 0);
msg_channels_for_fine_grained_shuffle.reserve(fine_grained_channel_size);
for (size_t i = 0; i < fine_grained_channel_size; ++i)
/// these are unbounded queues
msg_channels_for_fine_grained_shuffle.push_back(
std::make_shared<LooseBoundedMPMCQueue<ReceivedMessagePtr>>(std::numeric_limits<size_t>::max()));
msg_channels_for_fine_grained_shuffle.emplace_back(std::make_unique<MSGUnboundedQueue>());
}
}

Expand All @@ -133,7 +131,7 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr &

if (res == MPMCQueueResult::OK)
{
if (recv_msg->getRemainingConsumers()->fetch_sub(1) == 1)
if (recv_msg->getRemainingConsumers().fetch_sub(1) == 1)
{
#ifndef NDEBUG
ReceivedMessagePtr original_msg;
Expand All @@ -145,12 +143,21 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr &
"The result of 'grpc_recv_queue->tryPop' is definitely not EMPTY.");
if likely (original_msg != nullptr)
RUNTIME_CHECK_MSG(
*original_msg->getRemainingConsumers() == 0,
original_msg->getRemainingConsumers() == 0,
"Fine grained receiver pop a message that is not full consumed, remaining consumer: {}",
*original_msg->getRemainingConsumers());
original_msg->getRemainingConsumers());
#else
grpc_recv_queue.tryDequeue();
#endif
ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong());
}
}
else
{
if constexpr (!need_wait)
{
if (res == MPMCQueueResult::EMPTY)
setNotifyFuture(msg_channels_for_fine_grained_shuffle[stream_id].get());
}
}
}
Expand All @@ -160,13 +167,20 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr &
res = grpc_recv_queue.pop(recv_msg);
else
res = grpc_recv_queue.tryPop(recv_msg);
}

if (res == MPMCQueueResult::OK)
{
ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong());
if (res == MPMCQueueResult::OK)
{
ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong());
}
else
{
if constexpr (!need_wait)
{
if (res == MPMCQueueResult::EMPTY)
setNotifyFuture(&grpc_recv_queue);
}
}
}

return res;
}

Expand Down
32 changes: 29 additions & 3 deletions dbms/src/Flash/Mpp/ReceivedMessageQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <Common/TiFlashMetrics.h>
#include <Flash/Mpp/ReceivedMessage.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>

#include <memory>
#include <utility>

namespace DB
{
Expand Down Expand Up @@ -55,6 +57,31 @@ enum class ReceiverMode
Async
};

/// A `GRPCRecvQueue<ReceivedMessagePtr>` that also implements `NotifyFuture`, so a pointer to the
/// queue object itself can be handed to `setNotifyFuture`.
class GRPCNotifyRecvQueue final
: public NotifyFuture
, public GRPCRecvQueue<ReceivedMessagePtr>
{
public:
/// Forwards `log_` and all remaining arguments unchanged to the `GRPCRecvQueue` constructor.
template <typename... Args>
explicit GRPCNotifyRecvQueue(const LoggerPtr & log_, Args &&... args)
: GRPCRecvQueue<ReceivedMessagePtr>(log_, std::forward<Args>(args)...)
{}

/// `NotifyFuture` hook: hands the task over to `registerPipeReadTask`.
/// NOTE(review): `registerPipeReadTask` is presumably inherited from `GRPCRecvQueue` and wakes the
/// task once the queue becomes readable — confirm against the queue implementation.
void registerTask(TaskPtr && task) override { registerPipeReadTask(std::move(task)); }
};

/// A `LooseBoundedMPMCQueue<ReceivedMessagePtr>` that also implements `NotifyFuture`, so a pointer
/// to the queue object itself can be handed to `setNotifyFuture`.
class MSGUnboundedQueue final
: public NotifyFuture
, public LooseBoundedMPMCQueue<ReceivedMessagePtr>
{
public:
/// The capacity passed to the base queue is the largest `size_t` value, i.e. the queue is
/// effectively unbounded.
MSGUnboundedQueue()
: LooseBoundedMPMCQueue<ReceivedMessagePtr>(std::numeric_limits<size_t>::max())
{}

/// `NotifyFuture` hook: hands the task over to `registerPipeReadTask`.
/// NOTE(review): `registerPipeReadTask` is presumably inherited from `LooseBoundedMPMCQueue` and
/// wakes the task once the queue becomes readable — confirm against the queue implementation.
void registerTask(TaskPtr && task) override { registerPipeReadTask(std::move(task)); }
};

class ReceivedMessageQueue
{
public:
Expand Down Expand Up @@ -100,7 +127,6 @@ class ReceivedMessageQueue
bool isWritable() const { return grpc_recv_queue.isWritable(); }
void notifyNextPipelineWriter() { grpc_recv_queue.notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) { grpc_recv_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { grpc_recv_queue.registerPipeWriteTask(std::move(task)); }

#ifndef DBMS_PUBLIC_GTEST
Expand All @@ -119,8 +145,8 @@ class ReceivedMessageQueue
/// write: the writer first writes the msg to msg_channel/grpc_recv_queue; if that write succeeds, it then writes the msg to msg_channels_for_fine_grained_shuffle
/// read: the reader reads the msg from msg_channels_for_fine_grained_shuffle and decrements `remaining_consumers` in the msg; once `remaining_consumers` reaches 0, it
/// removes the msg from msg_channel/grpc_recv_queue
std::vector<std::shared_ptr<LooseBoundedMPMCQueue<ReceivedMessagePtr>>> msg_channels_for_fine_grained_shuffle;
GRPCRecvQueue<ReceivedMessagePtr> grpc_recv_queue;
std::vector<std::unique_ptr<MSGUnboundedQueue>> msg_channels_for_fine_grained_shuffle;
GRPCNotifyRecvQueue grpc_recv_queue;
};

} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ExecTaskStatus StreamRestoreTask::tryFlush()
t_block.clear();
return ExecTaskStatus::IO_IN;
case MPMCQueueResult::FULL:
setNotifyFuture(sink);
setNotifyFuture(sink.get());
return ExecTaskStatus::WAIT_FOR_NOTIFY;
case MPMCQueueResult::CANCELLED:
return ExecTaskStatus::CANCELLED;
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@

namespace DB
{
thread_local NotifyFuturePtr current_notify_future = nullptr;
thread_local NotifyFuture * current_notify_future = nullptr;

void setNotifyFuture(NotifyFuturePtr new_future)
/// Records `new_future` in the thread-local `current_notify_future` so that a later
/// `registerTaskToFuture` call on the same thread registers the task to it.
///
/// The pointer is stored as-is and is not owned here.
/// NOTE(review): callers are presumably responsible for keeping the pointee alive until the
/// slot is consumed or cleared — confirm against the call sites.
void setNotifyFuture(NotifyFuture * new_future)
{
    // The slot must be empty: only one pending future may be recorded per thread at a time.
    assert(current_notify_future == nullptr);
    // Plain assignment: `std::move` on a raw pointer is a no-op cast and only adds noise.
    current_notify_future = new_future;
}

void clearNotifyFuture()
{
current_notify_future.reset();
current_notify_future = nullptr;
}

void registerTaskToFuture(TaskPtr && task)
{
assert(current_notify_future != nullptr);
current_notify_future->registerTask(std::move(task));
current_notify_future.reset();
current_notify_future = nullptr;
}
} // namespace DB
5 changes: 2 additions & 3 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ struct NotifyFuture
virtual ~NotifyFuture() = default;
virtual void registerTask(TaskPtr && task) = 0;
};
using NotifyFuturePtr = std::shared_ptr<NotifyFuture>;

extern thread_local NotifyFuturePtr current_notify_future;
extern thread_local NotifyFuture * current_notify_future;

void setNotifyFuture(NotifyFuturePtr new_future);
void setNotifyFuture(NotifyFuture * new_future);
void clearNotifyFuture();
void registerTaskToFuture(TaskPtr && task);

Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ namespace DB
* CANCELLED/ERROR/FINISHED
* ▲
* │
* ───────────────────────────────┐
* │ ┌──►RUNNING◄──┐ │
* INIT───►│ │ │ │
* ▼ │
* │ WATITING◄────────►IO_IN/OUT │
* ───────────────────────────────┘
* ┌───────────────────────────────────────────────┐
* │ ┌──────────►RUNNING◄──────────┐ │
* │ │ │ │
* ▼ │
* │ WAITING/WAIT_FOR_NOTIFY◄─────────►IO_IN/OUT │
* └───────────────────────────────────────────────┘
*/
enum class ExecTaskStatus
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2157,7 +2157,7 @@ bool Join::isProbeFinishedForPipeline() const
{
if (!probe_finished)
{
setNotifyFuture(wait_probe_finished_future);
setNotifyFuture(wait_probe_finished_future.get());
return false;
}
return true;
Expand All @@ -2167,7 +2167,7 @@ bool Join::isBuildFinishedForPipeline() const
{
if (!build_finished)
{
setNotifyFuture(wait_build_finished_future);
setNotifyFuture(wait_build_finished_future.get());
return false;
}
return true;
Expand Down
Loading

0 comments on commit 92e8dac

Please sign in to comment.