diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index f77d76647e9..c52b0d63e23 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -351,6 +351,8 @@ class DAGContext UInt64 getConnectionID() const { return connection_id; } const String & getConnectionAlias() const { return connection_alias; } + MPPReceiverSetPtr getMPPReceiverSet() const { return mpp_receiver_set; } + public: DAGRequest dag_request; /// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast, @@ -443,6 +445,7 @@ class DAGContext /// warning_count is the actual warning count during the entire execution std::atomic warning_count; + // `mpp_receiver_set` is always set by `MPPTask` and is used later. MPPReceiverSetPtr mpp_receiver_set; std::vector coprocessor_readers; /// vector of SubqueriesForSets(such as join build subquery). diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index 97010827f2d..32c1ca37e4c 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -180,9 +181,12 @@ void PipelineExecutorContext::cancel() cancelOneTimeFutures(); if (likely(dag_context)) { - // Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. + // Cancel the tunnel_set and mpp_receiver_set here to prevent + // pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. if (dag_context->tunnel_set) dag_context->tunnel_set->close(getTrimmedErrMsg(), false); + if (auto mpp_receiver_set = dag_context->getMPPReceiverSet(); mpp_receiver_set) + mpp_receiver_set->cancel(); } cancelResultQueueIfNeed(); if likely (TaskScheduler::instance && !query_id.empty()) diff --git a/dbms/src/Flash/Mpp/LocalRequestHandler.h b/dbms/src/Flash/Mpp/LocalRequestHandler.h index e598f67bae8..bafcc180a53 100644 --- a/dbms/src/Flash/Mpp/LocalRequestHandler.h +++ b/dbms/src/Flash/Mpp/LocalRequestHandler.h @@ -43,7 +43,6 @@ struct LocalRequestHandler bool isWritable() const { return msg_queue->isWritable(); } void notifyNextPipelineWriter() const { return msg_queue->notifyNextPipelineWriter(); } - void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); } void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); } void writeDone(bool meet_error, const String & local_err_msg) const diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 9a16322d241..be692c7d00d 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -403,7 +403,7 @@ WaitResult MPPTunnel::waitForWritable() const RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); if (!tunnel_sender->isWritable()) { - setNotifyFuture(tunnel_sender); + setNotifyFuture(tunnel_sender.get()); return WaitResult::WaitForNotify; } return WaitResult::Ready; diff --git a/dbms/src/Flash/Mpp/ReceivedMessage.cpp b/dbms/src/Flash/Mpp/ReceivedMessage.cpp index 732e603616a..eb1116e7481 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessage.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessage.cpp @@ -18,7 +18,7 @@ namespace DB { const std::vector & ReceivedMessage::getChunks(size_t stream_id) const { - if (remaining_consumers != nullptr) + if (fine_grained_consumer_size > 0) return fine_grained_chunks[stream_id]; else return chunks; @@ -31,17 +31,18 @@ ReceivedMessage::ReceivedMessage( const mpp::Error * error_ptr_, const String * resp_ptr_, std::vector && chunks_, - size_t fine_grained_consumer_size) + size_t fine_grained_consumer_size_) : source_index(source_index_) , req_info(req_info_) , packet(packet_) , error_ptr(error_ptr_) , resp_ptr(resp_ptr_) , chunks(chunks_) + , remaining_consumers(fine_grained_consumer_size_) + , fine_grained_consumer_size(fine_grained_consumer_size_) { if (fine_grained_consumer_size > 0) { - remaining_consumers = std::make_shared>(fine_grained_consumer_size); fine_grained_chunks.resize(fine_grained_consumer_size); if (packet->packet.chunks_size() > 0) { diff --git a/dbms/src/Flash/Mpp/ReceivedMessage.h b/dbms/src/Flash/Mpp/ReceivedMessage.h index a94531de0b8..664a1dd4a64 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessage.h +++ b/dbms/src/Flash/Mpp/ReceivedMessage.h @@ -33,7 +33,8 @@ class ReceivedMessage std::vector chunks; /// used for fine grained shuffle, remaining_consumers will be nullptr for non fine grained shuffle std::vector> fine_grained_chunks; - std::shared_ptr> remaining_consumers; + std::atomic remaining_consumers; + size_t fine_grained_consumer_size; public: // Constructor that move chunks. @@ -50,7 +51,7 @@ class ReceivedMessage const String & getReqInfo() const { return req_info; } const mpp::Error * getErrorPtr() const { return error_ptr; } const String * getRespPtr(size_t stream_id) const { return stream_id == 0 ? resp_ptr : nullptr; } - std::shared_ptr> & getRemainingConsumers() { return remaining_consumers; } + std::atomic & getRemainingConsumers() { return remaining_consumers; } const std::vector & getChunks(size_t stream_id) const; const mpp::MPPDataPacket & getPacket() const { return packet->packet; } bool containUsefulMessage() const; diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp index 63096695eb8..f89dc988826 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.cpp @@ -114,9 +114,7 @@ ReceivedMessageQueue::ReceivedMessageQueue( assert(fine_grained_channel_size > 0); msg_channels_for_fine_grained_shuffle.reserve(fine_grained_channel_size); for (size_t i = 0; i < fine_grained_channel_size; ++i) - /// these are unbounded queues - msg_channels_for_fine_grained_shuffle.push_back( - std::make_shared>(std::numeric_limits::max())); + msg_channels_for_fine_grained_shuffle.emplace_back(std::make_unique()); } } @@ -133,7 +131,7 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & if (res == MPMCQueueResult::OK) { - if (recv_msg->getRemainingConsumers()->fetch_sub(1) == 1) + if (recv_msg->getRemainingConsumers().fetch_sub(1) == 1) { #ifndef NDEBUG ReceivedMessagePtr original_msg; @@ -145,12 +143,21 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & "The result of 'grpc_recv_queue->tryPop' is definitely not EMPTY."); if likely (original_msg != nullptr) RUNTIME_CHECK_MSG( - *original_msg->getRemainingConsumers() == 0, + original_msg->getRemainingConsumers() == 0, "Fine grained receiver pop a message that is not full consumed, remaining consumer: {}", - *original_msg->getRemainingConsumers()); + original_msg->getRemainingConsumers()); #else grpc_recv_queue.tryDequeue(); #endif + ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); + } + } + else + { + if constexpr (!need_wait) + { + if (res == MPMCQueueResult::EMPTY) + setNotifyFuture(msg_channels_for_fine_grained_shuffle[stream_id].get()); } } } @@ -160,13 +167,20 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & res = grpc_recv_queue.pop(recv_msg); else res = grpc_recv_queue.tryPop(recv_msg); - } - if (res == MPMCQueueResult::OK) - { - ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); + if (res == MPMCQueueResult::OK) + { + ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); + } + else + { + if constexpr (!need_wait) + { + if (res == MPMCQueueResult::EMPTY) + setNotifyFuture(&grpc_recv_queue); + } + } } - return res; } diff --git a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h index e160dfcc49c..d0abb200455 100644 --- a/dbms/src/Flash/Mpp/ReceivedMessageQueue.h +++ b/dbms/src/Flash/Mpp/ReceivedMessageQueue.h @@ -21,8 +21,10 @@ #include #include #include +#include #include +#include namespace DB { @@ -55,6 +57,31 @@ enum class ReceiverMode Async }; +class GRPCNotifyRecvQueue final + : public NotifyFuture + , public GRPCRecvQueue +{ +public: + template + explicit GRPCNotifyRecvQueue(const LoggerPtr & log_, Args &&... args) + : GRPCRecvQueue(log_, std::forward(args)...) + {} + + void registerTask(TaskPtr && task) override { registerPipeReadTask(std::move(task)); } +}; + +class MSGUnboundedQueue final + : public NotifyFuture + , public LooseBoundedMPMCQueue +{ +public: + MSGUnboundedQueue() + : LooseBoundedMPMCQueue(std::numeric_limits::max()) + {} + + void registerTask(TaskPtr && task) override { registerPipeReadTask(std::move(task)); } +}; + class ReceivedMessageQueue { public: @@ -100,7 +127,6 @@ class ReceivedMessageQueue bool isWritable() const { return grpc_recv_queue.isWritable(); } void notifyNextPipelineWriter() { grpc_recv_queue.notifyNextPipelineWriter(); } - void registerPipeReadTask(TaskPtr && task) { grpc_recv_queue.registerPipeReadTask(std::move(task)); } void registerPipeWriteTask(TaskPtr && task) { grpc_recv_queue.registerPipeWriteTask(std::move(task)); } #ifndef DBMS_PUBLIC_GTEST @@ -119,8 +145,8 @@ class ReceivedMessageQueue /// write: the writer first write the msg to msg_channel/grpc_recv_queue, if write success, then write msg to msg_channels_for_fine_grained_shuffle /// read: the reader read msg from msg_channels_for_fine_grained_shuffle, and reduce the `remaining_consumers` in msg, if `remaining_consumers` is 0, then /// remove the msg from msg_channel/grpc_recv_queue - std::vector>> msg_channels_for_fine_grained_shuffle; - GRPCRecvQueue grpc_recv_queue; + std::vector> msg_channels_for_fine_grained_shuffle; + GRPCNotifyRecvQueue grpc_recv_queue; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp index a08a1a9aff6..105435896f4 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp @@ -48,7 +48,7 @@ ExecTaskStatus StreamRestoreTask::tryFlush() t_block.clear(); return ExecTaskStatus::IO_IN; case MPMCQueueResult::FULL: - setNotifyFuture(sink); + setNotifyFuture(sink.get()); return ExecTaskStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::CANCELLED: return ExecTaskStatus::CANCELLED; diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp index b0a4fef9b05..587c20c2ff5 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp @@ -16,9 +16,9 @@ namespace DB { -thread_local NotifyFuturePtr current_notify_future = nullptr; +thread_local NotifyFuture * current_notify_future = nullptr; -void setNotifyFuture(NotifyFuturePtr new_future) +void setNotifyFuture(NotifyFuture * new_future) { assert(current_notify_future == nullptr); current_notify_future = std::move(new_future); @@ -26,13 +26,13 @@ void setNotifyFuture(NotifyFuturePtr new_future) void clearNotifyFuture() { - current_notify_future.reset(); + current_notify_future = nullptr; } void registerTaskToFuture(TaskPtr && task) { assert(current_notify_future != nullptr); current_notify_future->registerTask(std::move(task)); - current_notify_future.reset(); + current_notify_future = nullptr; } } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h index 5ba6c977f02..ca86dad7587 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h @@ -24,11 +24,10 @@ struct NotifyFuture virtual ~NotifyFuture() = default; virtual void registerTask(TaskPtr && task) = 0; }; -using NotifyFuturePtr = std::shared_ptr; -extern thread_local NotifyFuturePtr current_notify_future; +extern thread_local NotifyFuture * current_notify_future; -void setNotifyFuture(NotifyFuturePtr new_future); +void setNotifyFuture(NotifyFuture * new_future); void clearNotifyFuture(); void registerTaskToFuture(TaskPtr && task); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index 59b74e0ffcb..c15299e6f32 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -26,12 +26,12 @@ namespace DB * CANCELLED/ERROR/FINISHED * ▲ * │ - * ┌───────────────────────────────┐ - * │ ┌──►RUNNING◄──┐ │ - * INIT───►│ │ │ │ - * │ ▼ ▼ │ - * │ WATITING◄────────►IO_IN/OUT │ - * └───────────────────────────────┘ + * ┌───────────────────────────────────────────────┐ + * │ ┌──────────►RUNNING◄──────────┐ │ + * │ │ │ │ + * │ ▼ ▼ │ + * │ WATITING/WAIT_FOR_NOTIFY◄────────►IO_IN/OUT │ + * └───────────────────────────────────────────────┘ */ enum class ExecTaskStatus { diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 466868041be..873772014f8 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -2157,7 +2157,7 @@ bool Join::isProbeFinishedForPipeline() const { if (!probe_finished) { - setNotifyFuture(wait_probe_finished_future); + setNotifyFuture(wait_probe_finished_future.get()); return false; } return true; @@ -2167,7 +2167,7 @@ bool Join::isBuildFinishedForPipeline() const { if (!build_finished) { - setNotifyFuture(wait_build_finished_future); + setNotifyFuture(wait_build_finished_future.get()); return false; } return true; diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp index 8e017fa497c..968d55f3425 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.cpp +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.cpp @@ -29,6 +29,37 @@ Block ExchangeReceiverSourceOp::popFromBlockQueue() return block; } +void ExchangeReceiverSourceOp::recordDecodeDetail( + const DecodeDetail & decode_detail, + size_t index, + const String & req_info) +{ + auto & connection_profile_info = io_profile_info->connection_profile_infos[index]; + connection_profile_info.packets += decode_detail.packets; + connection_profile_info.bytes += decode_detail.packet_bytes; + total_rows += decode_detail.rows; + LOG_TRACE( + log, + "recv {} rows from exchange receiver for {}, total recv row num: {}", + decode_detail.rows, + req_info, + total_rows); +} + +void ExchangeReceiverSourceOp::handleError(const ExchangeReceiverResult & result) const +{ + if (result.meet_error) + { + LOG_WARNING(log, "exchange receiver meets error: {}", result.error_msg); + throw Exception(result.error_msg); + } + if (result.resp != nullptr && result.resp->has_error()) + { + LOG_WARNING(log, "exchange receiver meets error: {}", result.resp->error().DebugString()); + throw Exception(result.resp->error().DebugString()); + } +} + OperatorStatus ExchangeReceiverSourceOp::readImpl(Block & block) { if (!block_queue.empty()) @@ -40,77 +71,41 @@ OperatorStatus ExchangeReceiverSourceOp::readImpl(Block & block) while (true) { assert(block_queue.empty()); - auto await_status = awaitImpl(); - if (await_status == OperatorStatus::HAS_OUTPUT) + ReceivedMessagePtr recv_msg = nullptr; + auto receive_status = exchange_receiver->tryReceive(stream_id, recv_msg); + switch (receive_status) { - assert(receive_status != ReceiveStatus::empty); - auto result - = exchange_receiver - ->toExchangeReceiveResult(stream_id, receive_status, recv_msg, block_queue, header, decoder_ptr); - recv_msg = nullptr; - receive_status = ReceiveStatus::empty; - - if (result.meet_error) - { - LOG_WARNING(log, "exchange receiver meets error: {}", result.error_msg); - throw Exception(result.error_msg); - } - if (result.resp != nullptr && result.resp->has_error()) - { - LOG_WARNING(log, "exchange receiver meets error: {}", result.resp->error().DebugString()); - throw Exception(result.resp->error().DebugString()); - } - if (result.eof) - { - LOG_DEBUG(log, "exchange receiver meets eof"); - return OperatorStatus::HAS_OUTPUT; - } - - /// only the last response contains execution summaries - if (result.resp != nullptr) - io_profile_info->remote_execution_summary.add(*result.resp); - - const auto & decode_detail = result.decode_detail; - auto & connection_profile_info = io_profile_info->connection_profile_infos[result.call_index]; - connection_profile_info.packets += decode_detail.packets; - connection_profile_info.bytes += decode_detail.packet_bytes; - - total_rows += decode_detail.rows; - LOG_TRACE( - log, - "recv {} rows from exchange receiver for {}, total recv row num: {}", - decode_detail.rows, - result.req_info, - total_rows); + case ReceiveStatus::empty: + assert(!recv_msg); + assert(current_notify_future); + return OperatorStatus::WAIT_FOR_NOTIFY; + case ReceiveStatus::ok: + assert(recv_msg); + case ReceiveStatus::eof: + break; + } - if (decode_detail.rows <= 0) - continue; + auto result + = exchange_receiver + ->toExchangeReceiveResult(stream_id, receive_status, recv_msg, block_queue, header, decoder_ptr); - block = popFromBlockQueue(); + handleError(result); + if (result.eof) + { + LOG_DEBUG(log, "exchange receiver meets eof"); return OperatorStatus::HAS_OUTPUT; } - return await_status; - } -} -OperatorStatus ExchangeReceiverSourceOp::awaitImpl() -{ - if unlikely (!block_queue.empty()) - return OperatorStatus::HAS_OUTPUT; - if unlikely (receive_status != ReceiveStatus::empty) - return OperatorStatus::HAS_OUTPUT; + /// only the last response contains execution summaries + if (result.resp != nullptr) + io_profile_info->remote_execution_summary.add(*result.resp); - assert(!recv_msg); - receive_status = exchange_receiver->tryReceive(stream_id, recv_msg); - switch (receive_status) - { - case ReceiveStatus::ok: - assert(recv_msg); - return OperatorStatus::HAS_OUTPUT; - case ReceiveStatus::empty: - assert(!recv_msg); - return OperatorStatus::WAITING; - case ReceiveStatus::eof: + const auto & decode_detail = result.decode_detail; + recordDecodeDetail(result.decode_detail, result.call_index, result.req_info); + if (decode_detail.rows <= 0) + continue; + + block = popFromBlockQueue(); return OperatorStatus::HAS_OUTPUT; } } diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.h b/dbms/src/Operators/ExchangeReceiverSourceOp.h index f7f1e86c99f..9185f51bf0d 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.h +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.h @@ -48,20 +48,19 @@ class ExchangeReceiverSourceOp : public SourceOp OperatorStatus readImpl(Block & block) override; - OperatorStatus awaitImpl() override; - private: Block popFromBlockQueue(); + void recordDecodeDetail(const DecodeDetail & decode_detail, size_t index, const String & req_info); + + void handleError(const ExchangeReceiverResult & result) const; + private: std::shared_ptr exchange_receiver; std::unique_ptr decoder_ptr; uint64_t total_rows{}; std::queue block_queue; - ReceivedMessagePtr recv_msg = nullptr; - ReceiveStatus receive_status = ReceiveStatus::empty; - size_t stream_id; IOProfileInfoPtr io_profile_info; diff --git a/dbms/src/Operators/GetResultSinkOp.cpp b/dbms/src/Operators/GetResultSinkOp.cpp index bc904676b6b..db52776aec7 100644 --- a/dbms/src/Operators/GetResultSinkOp.cpp +++ b/dbms/src/Operators/GetResultSinkOp.cpp @@ -38,7 +38,7 @@ OperatorStatus GetResultSinkOp::tryFlush() switch (queue_result) { case MPMCQueueResult::FULL: - setNotifyFuture(result_queue); + setNotifyFuture(result_queue.get()); return OperatorStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::OK: t_block.reset(); diff --git a/dbms/src/Operators/HashProbeTransformExec.cpp b/dbms/src/Operators/HashProbeTransformExec.cpp index fe1bf03521a..bc156cfed92 100644 --- a/dbms/src/Operators/HashProbeTransformExec.cpp +++ b/dbms/src/Operators/HashProbeTransformExec.cpp @@ -131,7 +131,7 @@ bool HashProbeTransformExec::prepareProbeRestoredBlock() case MPMCQueueResult::OK: return true; case MPMCQueueResult::EMPTY: - setNotifyFuture(probe_source_holder); + setNotifyFuture(probe_source_holder.get()); return false; case MPMCQueueResult::FINISHED: case MPMCQueueResult::CANCELLED: diff --git a/dbms/src/Operators/SharedAggregateRestorer.cpp b/dbms/src/Operators/SharedAggregateRestorer.cpp index c726823d49a..741f1f72c6b 100644 --- a/dbms/src/Operators/SharedAggregateRestorer.cpp +++ b/dbms/src/Operators/SharedAggregateRestorer.cpp @@ -203,7 +203,7 @@ bool SharedAggregateRestorer::tryPop(Block & block) case SharedLoadResult::FINISHED: return true; case SharedLoadResult::WAIT: - setNotifyFuture(loader); + setNotifyFuture(loader.get()); return false; } } diff --git a/dbms/src/Operators/SharedQueue.cpp b/dbms/src/Operators/SharedQueue.cpp index 11269cd21e5..12ba634d8c8 100644 --- a/dbms/src/Operators/SharedQueue.cpp +++ b/dbms/src/Operators/SharedQueue.cpp @@ -96,7 +96,7 @@ OperatorStatus SharedQueueSinkOp::tryFlush() switch (queue_result) { case MPMCQueueResult::FULL: - setNotifyFuture(shared_queue); + setNotifyFuture(shared_queue.get()); return OperatorStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::OK: buffer.reset(); @@ -118,7 +118,7 @@ OperatorStatus SharedQueueSourceOp::readImpl(Block & block) switch (queue_result) { case MPMCQueueResult::EMPTY: - setNotifyFuture(shared_queue); + setNotifyFuture(shared_queue.get()); return OperatorStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::OK: return OperatorStatus::HAS_OUTPUT; diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index d7bd2d6188d..09533826dc6 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -45,7 +45,7 @@ OperatorStatus UnorderedSourceOp::readImpl(Block & block) { if (!task_pool->tryPopBlock(block)) { - setNotifyFuture(task_pool); + setNotifyFuture(task_pool.get()); return OperatorStatus::WAIT_FOR_NOTIFY; }