-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pipeline: use notify instead of polling for ExchangeReceiver
#9073
Changes from 34 commits
fa590d8
62d3add
e7d8c9b
2ffee76
f76b7bd
563573b
e1e920c
2f258ae
f8e1dc6
e2adf5e
c5655c6
da81c3c
249cfa9
2c12e6c
03dd7b6
d2814b8
c0d784e
d9640df
df23506
0c43a11
94d954b
dd1316f
22ea753
902291b
4ddcf98
95fdb7f
ee4ad65
7f878ce
d4eac21
ec2838f
bd66511
d13644d
1f2423f
8da6615
b38899f
c6033a8
bb18d78
c85801b
275134b
af512d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,8 +21,10 @@ | |
#include <Common/TiFlashMetrics.h> | ||
#include <Flash/Mpp/ReceivedMessage.h> | ||
#include <Flash/Mpp/TrackedMppDataPacket.h> | ||
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h> | ||
|
||
#include <memory> | ||
#include <utility> | ||
|
||
namespace DB | ||
{ | ||
|
@@ -55,6 +57,65 @@ enum class ReceiverMode | |
Async | ||
}; | ||
|
||
class GRPCNotifyQueue : public NotifyFuture | ||
{ | ||
public: | ||
template <typename... Args> | ||
explicit GRPCNotifyQueue(const LoggerPtr & log_, Args &&... args) | ||
: queue(log_, std::forward<Args>(args)...) | ||
{} | ||
|
||
void registerTask(TaskPtr && task) override { queue.registerPipeReadTask(std::move(task)); } | ||
|
||
MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue.pop(data); } | ||
|
||
MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue.tryPop(data); } | ||
|
||
MPMCQueueResult forcePush(ReceivedMessagePtr && data) { return queue.forcePush(std::move(data)); } | ||
|
||
MPMCQueueResult push(ReceivedMessagePtr && data) { return queue.push(std::move(data)); } | ||
|
||
MPMCQueueResult tryDequeue() { return queue.tryDequeue(); } | ||
|
||
MPMCQueueResult pushWithTag(ReceivedMessagePtr && data, GRPCKickTag * new_tag) | ||
{ | ||
return queue.pushWithTag(std::move(data), new_tag); | ||
} | ||
|
||
void setKickFuncForTest(GRPCKickFunc && func) { queue.setKickFuncForTest(std::move(func)); } | ||
|
||
bool finish() { return queue.finish(); } | ||
bool cancel() { return queue.cancel(); } | ||
|
||
bool isWritable() const { return queue.isWritable(); } | ||
|
||
void registerPipeWriteTask(TaskPtr && task) { queue.registerPipeWriteTask(std::move(task)); } | ||
|
||
private: | ||
GRPCRecvQueue<ReceivedMessagePtr> queue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about directly letting |
||
}; | ||
|
||
class MSGChannel : public NotifyFuture | ||
{ | ||
public: | ||
void registerTask(TaskPtr && task) override { queue_ref.registerPipeReadTask(std::move(task)); } | ||
|
||
MPMCQueueResult pop(ReceivedMessagePtr & data) { return queue_ref.pop(data); } | ||
|
||
MPMCQueueResult tryPop(ReceivedMessagePtr & data) { return queue_ref.tryPop(data); } | ||
|
||
MPMCQueueResult forcePush(const ReceivedMessagePtr & data) { return queue_ref.forcePush(data); } | ||
|
||
bool finish() { return queue_ref.finish(); } | ||
bool cancel() { return queue_ref.cancel(); } | ||
|
||
private: | ||
using QueueImpl = LooseBoundedMPMCQueue<ReceivedMessagePtr>; | ||
// these are unbounded queues. | ||
std::unique_ptr<QueueImpl> queue = std::make_unique<QueueImpl>(std::numeric_limits<size_t>::max()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ref #7963, |
||
QueueImpl & queue_ref = *queue; | ||
}; | ||
|
||
class ReceivedMessageQueue | ||
{ | ||
public: | ||
|
@@ -86,21 +147,20 @@ class ReceivedMessageQueue | |
grpc_recv_queue.finish(); | ||
/// msg_channels_for_fine_grained_shuffle must be finished after msg_channel is finished | ||
for (auto & channel : msg_channels_for_fine_grained_shuffle) | ||
channel->finish(); | ||
channel.finish(); | ||
} | ||
|
||
void cancel() | ||
{ | ||
grpc_recv_queue.cancel(); | ||
/// msg_channels_for_fine_grained_shuffle must be cancelled after msg_channel is cancelled | ||
for (auto & channel : msg_channels_for_fine_grained_shuffle) | ||
channel->cancel(); | ||
channel.cancel(); | ||
} | ||
|
||
bool isWritable() const { return grpc_recv_queue.isWritable(); } | ||
void notifyNextPipelineWriter() { grpc_recv_queue.notifyNextPipelineWriter(); } | ||
|
||
void registerPipeReadTask(TaskPtr && task) { grpc_recv_queue.registerPipeReadTask(std::move(task)); } | ||
void registerPipeWriteTask(TaskPtr && task) { grpc_recv_queue.registerPipeWriteTask(std::move(task)); } | ||
|
||
#ifndef DBMS_PUBLIC_GTEST | ||
|
@@ -119,8 +179,8 @@ class ReceivedMessageQueue | |
/// write: the writer first write the msg to msg_channel/grpc_recv_queue, if write success, then write msg to msg_channels_for_fine_grained_shuffle | ||
/// read: the reader read msg from msg_channels_for_fine_grained_shuffle, and reduce the `remaining_consumers` in msg, if `remaining_consumers` is 0, then | ||
/// remove the msg from msg_channel/grpc_recv_queue | ||
std::vector<std::shared_ptr<LooseBoundedMPMCQueue<ReceivedMessagePtr>>> msg_channels_for_fine_grained_shuffle; | ||
GRPCRecvQueue<ReceivedMessagePtr> grpc_recv_queue; | ||
std::vector<MSGChannel> msg_channels_for_fine_grained_shuffle; | ||
GRPCNotifyQueue grpc_recv_queue; | ||
}; | ||
|
||
} // namespace DB |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,11 +24,10 @@ struct NotifyFuture | |
virtual ~NotifyFuture() = default; | ||
virtual void registerTask(TaskPtr && task) = 0; | ||
}; | ||
using NotifyFuturePtr = std::shared_ptr<NotifyFuture>; | ||
|
||
extern thread_local NotifyFuturePtr current_notify_future; | ||
extern thread_local NotifyFuture * current_notify_future; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this change may increase the risk of using a object that have been already released, is it by design that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, |
||
|
||
void setNotifyFuture(NotifyFuturePtr new_future); | ||
void setNotifyFuture(NotifyFuture * new_future); | ||
void clearNotifyFuture(); | ||
void registerTaskToFuture(TaskPtr && task); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe rename it to
registerPipelineReadTask
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this is an override of NotifyFuture::registerTask, it cannot be renamed