Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug that ExchangeReceiver is not cancelled if exception happens in union/agg block input stream #4285

Merged
merged 8 commits into from
Mar 15, 2022
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_pt
parent.exceptions[thread_num] = exception;
/// can not cancel parent inputStream or the exception might be lost
if (!parent.executed)
parent.processor.cancel(false);
/// kill the processor so ExchangeReceiver will be closed
parent.processor.cancel(true);
}


Expand Down
33 changes: 22 additions & 11 deletions dbms/src/DataStreams/UnionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ struct OutputData<StreamUnionMode::Basic>
Block block;
std::exception_ptr exception;

OutputData() {}
OutputData() = default;
explicit OutputData(Block & block_)
: block(block_)
{}
explicit OutputData(std::exception_ptr & exception_)
explicit OutputData(const std::exception_ptr & exception_)
: exception(exception_)
{}
};
Expand All @@ -57,12 +57,12 @@ struct OutputData<StreamUnionMode::ExtraInfo>
BlockExtraInfo extra_info;
std::exception_ptr exception;

OutputData() {}
OutputData() = default;
OutputData(Block & block_, BlockExtraInfo & extra_info_)
: block(block_)
, extra_info(extra_info_)
{}
explicit OutputData(std::exception_ptr & exception_)
explicit OutputData(const std::exception_ptr & exception_)
: exception(exception_)
{}
};
Expand Down Expand Up @@ -279,6 +279,23 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
* otherwise ParallelInputsProcessor can be blocked during insertion into the queue.
*/
OutputQueue output_queue;
std::mutex mu;
bool meet_exception = false;
windtalker marked this conversation as resolved.
Show resolved Hide resolved

void handleException(const std::exception_ptr & exception)
{
std::unique_lock lock(mu);
if (meet_exception)
return;
meet_exception = true;
/// The order of the rows matters. If it is changed, then the situation is possible,
/// when before exception, an empty block (end of data) will be put into the queue,
/// and the exception is lost.
output_queue.emplace(exception);
/// can not cancel itself or the exception might be lost
/// kill the processor so ExchangeReceiver will be closed
processor.cancel(true);
}

struct Handler
{
Expand Down Expand Up @@ -309,13 +326,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream

void onException(std::exception_ptr & exception, size_t /*thread_num*/)
{
/// The order of the rows matters. If it is changed, then the situation is possible,
/// when before exception, an empty block (end of data) will be put into the queue,
/// and the exception is lost.

parent.output_queue.emplace(exception);
/// can not cancel parent inputStream or the exception might be lost
parent.processor.cancel(false); /// Does not throw exceptions.
parent.handleException(exception);
}

String getName() const
Expand Down