-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pipeline: use notify instead of polling for ExchangeSender
#9072
Changes from 17 commits
fa590d8
62d3add
e7d8c9b
2ffee76
f76b7bd
563573b
e1e920c
c5655c6
249cfa9
d2814b8
d9640df
df23506
0c43a11
94d954b
dd1316f
4ddcf98
9c9ffe6
f39562e
a328e54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
// Copyright 2024 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#pragma once | ||
|
||
namespace DB | ||
{ | ||
enum class WaitResult | ||
{ | ||
Ready, | ||
WaitForPolling, | ||
WaitForNotify | ||
}; | ||
} // namespace DB |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,8 +12,11 @@ | |
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#include <Flash/Coprocessor/DAGContext.h> | ||
#include <Flash/Executor/PipelineExecutorContext.h> | ||
#include <Flash/Executor/ResultQueue.h> | ||
#include <Flash/Mpp/MPPTunnelSet.h> | ||
#include <Flash/Mpp/Utils.h> | ||
#include <Flash/Pipeline/Schedule/TaskScheduler.h> | ||
#include <Operators/SharedQueue.h> | ||
|
||
|
@@ -52,6 +55,24 @@ String PipelineExecutorContext::getExceptionMsg() | |
} | ||
} | ||
|
||
String PipelineExecutorContext::getTrimmedErrMsg() | ||
{ | ||
try | ||
{ | ||
auto cur_exception_ptr = getExceptionPtr(); | ||
if (!cur_exception_ptr) | ||
return ""; | ||
std::rethrow_exception(cur_exception_ptr); | ||
} | ||
catch (...) | ||
{ | ||
auto err_msg = getCurrentExceptionMessage(true, true); | ||
if (likely(!err_msg.empty())) | ||
trimStackTrace(err_msg); | ||
return err_msg; | ||
} | ||
} | ||
|
||
void PipelineExecutorContext::onErrorOccurred(const String & err_msg) | ||
{ | ||
DB::Exception e(err_msg); | ||
|
@@ -155,6 +176,12 @@ void PipelineExecutorContext::cancel() | |
if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) | ||
{ | ||
cancelSharedQueues(); | ||
if (likely(dag_context)) | ||
{ | ||
// Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified. | ||
if (dag_context->tunnel_set) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tunnels will be always closed at the destruct of mpp task, is that too late so you need to close it here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, if a pipeline task is in the wait_for_notify state, and another pipeline task throws an error and cannot terminate the query, then abortMPPTunnel cannot be called, this is a deadlock. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
dag_context->tunnel_set->close(getTrimmedErrMsg(), false); | ||
} | ||
cancelResultQueueIfNeed(); | ||
if likely (TaskScheduler::instance && !query_id.empty()) | ||
TaskScheduler::instance->cancel(query_id, resource_group_name); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -759,13 +759,13 @@ void MPPTask::abort(const String & message, AbortType abort_type) | |
} | ||
else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status)) | ||
{ | ||
/// abort the components from top to bottom because if bottom components are aborted | ||
/// first, the top components may see an error caused by the abort, which is not | ||
/// abort mpptunnels first because if others components are aborted | ||
/// first, the mpptunnels may see an error caused by the abort, which is not | ||
/// the original error | ||
setErrString(message); | ||
abortTunnels(message, false); | ||
abortQueryExecutor(); | ||
abortReceivers(); | ||
abortQueryExecutor(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why abort receiver first There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is currently useless and is only used to notify exchange_receiver for the same reason as #9072 (comment). |
||
scheduleThisTask(ScheduleState::FAILED); | ||
/// runImpl is running, leave remaining work to runImpl | ||
LOG_WARNING(log, "Finish abort task from running"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems these
registerPipeReadTask
s are not used in this PR?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they are used in the exchange receiver PR, but not in this PR.
https://github.com/pingcap/tiflash/pull/9073/files