diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 7705ef8847a..79511dd30c8 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -183,7 +183,8 @@ void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_pt parent.exceptions[thread_num] = exception; /// can not cancel parent inputStream or the exception might be lost if (!parent.executed) - parent.processor.cancel(false); + /// kill the processor so ExchangeReceiver will be closed + parent.processor.cancel(true); } diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index 6e6e1807a9c..275261c4331 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -26,11 +26,19 @@ struct OutputData Block block; std::exception_ptr exception; +<<<<<<< HEAD OutputData() {} OutputData(Block & block_) : block(block_) {} OutputData(std::exception_ptr & exception_) +======= + OutputData() = default; + explicit OutputData(Block & block_) + : block(block_) + {} + explicit OutputData(const std::exception_ptr & exception_) +>>>>>>> 752793bd7f (fix bug that ExchangeReceiver is not cancelled if exception happens in union/agg block input stream (#4285)) : exception(exception_) {} }; @@ -43,12 +51,16 @@ struct OutputData BlockExtraInfo extra_info; std::exception_ptr exception; - OutputData() {} + OutputData() = default; OutputData(Block & block_, BlockExtraInfo & extra_info_) : block(block_) , extra_info(extra_info_) {} +<<<<<<< HEAD OutputData(std::exception_ptr & exception_) +======= + explicit OutputData(const std::exception_ptr & exception_) +>>>>>>> 752793bd7f (fix bug that ExchangeReceiver is not cancelled if exception happens in union/agg block input stream (#4285)) : exception(exception_) {} }; @@ -263,6 +275,23 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream * otherwise ParallelInputsProcessor can be blocked during insertion into the queue. */ OutputQueue output_queue; + std::mutex mu; + bool meet_exception = false; + + void handleException(const std::exception_ptr & exception) + { + std::unique_lock lock(mu); + if (meet_exception) + return; + meet_exception = true; + /// The order of the rows matters. If it is changed, then the situation is possible, + /// when before exception, an empty block (end of data) will be put into the queue, + /// and the exception is lost. + output_queue.emplace(exception); + /// can not cancel itself or the exception might be lost + /// kill the processor so ExchangeReceiver will be closed + processor.cancel(true); + } struct Handler { @@ -291,6 +320,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream void onException(std::exception_ptr & exception, size_t /*thread_num*/) { +<<<<<<< HEAD //std::cerr << "pushing exception\n"; /// The order of the rows matters. If it is changed, then the situation is possible, @@ -300,6 +330,9 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream parent.output_queue.push(exception); /// can not cancel parent inputStream or the exception might be lost parent.processor.cancel(false); /// Does not throw exceptions. +======= + parent.handleException(exception); +>>>>>>> 752793bd7f (fix bug that ExchangeReceiver is not cancelled if exception happens in union/agg block input stream (#4285)) } String getName() const