Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug that ExchangeReceiver is not cancelled if exception happens in union/agg block input stream #4285

Merged
merged 8 commits into from
Mar 15, 2022
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_pt
parent.exceptions[thread_num] = exception;
/// can not cancel parent inputStream or the exception might be lost
if (!parent.executed)
parent.processor.cancel(false);
/// kill the processor so ExchangeReceiver will be closed
parent.processor.cancel(true);
}


Expand Down
15 changes: 13 additions & 2 deletions dbms/src/DataStreams/UnionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
* otherwise ParallelInputsProcessor can be blocked during insertion into the queue.
*/
OutputQueue output_queue;
std::mutex mu;
bool meet_exception = false;
windtalker marked this conversation as resolved.
Show resolved Hide resolved
void handleException(std::exception_ptr & exception)
windtalker marked this conversation as resolved.
Show resolved Hide resolved
{
std::unique_lock<std::mutex> lock(mu);
windtalker marked this conversation as resolved.
Show resolved Hide resolved
if (meet_exception)
return;
meet_exception = true;
output_queue.emplace(exception);
}

struct Handler
{
Expand Down Expand Up @@ -313,9 +323,10 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
/// when before exception, an empty block (end of data) will be put into the queue,
/// and the exception is lost.

parent.output_queue.emplace(exception);
parent.handleException(exception);
/// can not cancel parent inputStream or the exception might be lost
parent.processor.cancel(false); /// Does not throw exceptions.
/// kill the processor so ExchangeReceiver will be closed
parent.processor.cancel(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not also do parent.processor.cancel(true) in handleException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

}

String getName() const
Expand Down