Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: use notify instead of polling for ExchangeSender #9072

Merged
merged 19 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dbms/src/Common/GRPCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class GRPCSendQueue

bool isWritable() const { return send_queue.isWritable(); }

void registerPipeReadTask(TaskPtr && task) { send_queue.registerPipeReadTask(std::move(task)); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems these registerPipeReadTasks are not used in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they are used in the exchange receiver PR, but not in this PR.
https://github.com/pingcap/tiflash/pull/9073/files

void registerPipeWriteTask(TaskPtr && task) { send_queue.registerPipeWriteTask(std::move(task)); }

private:
friend class tests::TestGRPCSendQueue;

Expand Down Expand Up @@ -297,6 +300,9 @@ class GRPCRecvQueue

bool isWritable() const { return recv_queue.isWritable(); }

void registerPipeReadTask(TaskPtr && task) { recv_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { recv_queue.registerPipeWriteTask(std::move(task)); }

private:
friend class tests::TestGRPCRecvQueue;

Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Core/Block.h>
#include <Flash/Coprocessor/WaitResult.h>
#include <common/types.h>
#include <tipb/select.pb.h>

Expand All @@ -30,12 +31,13 @@ class DAGResponseWriter
virtual void prepare(const Block &){};
virtual void write(const Block & block) = 0;

// For async writer, `isWritable` need to be called before calling `write`.
// For async writer, `waitForWritable` need to be called before calling `write`.
// ```
// while (!isWritable()) {}
// auto res = waitForWritable();
// switch (res) case...
// write(block);
// ```
virtual bool isWritable() const { throw Exception("Unsupport"); }
virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); }

/// flush cached blocks for batch writer
virtual void flush() = 0;
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Common/Exception.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/WaitResult.h>
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
Expand Down Expand Up @@ -57,7 +58,7 @@ struct CopStreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
bool isWritable() const { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
};

struct BatchCopStreamWriter
Expand All @@ -81,7 +82,7 @@ struct BatchCopStreamWriter
if (!writer->Write(resp))
throw Exception("Failed to write resp");
}
bool isWritable() const { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
};

using CopStreamWriterPtr = std::shared_ptr<CopStreamWriter>;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::flush()
}

template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::isWritable() const
WaitResult StreamingDAGResponseWriter<StreamWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class StreamWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
25 changes: 25 additions & 0 deletions dbms/src/Flash/Coprocessor/WaitResult.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

namespace DB
{
enum class WaitResult
{
Ready,
WaitForPolling,
WaitForNotify
};
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ struct MockStreamWriter
{}

void write(tipb::SelectResponse & response) { checker(response); }
static bool isWritable() { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }

private:
MockStreamWriterChecker checker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ struct MockWriter
queue->push(tracked_packet);
}
static uint16_t getPartitionNum() { return 1; }
static bool isWritable() { throw Exception("Unsupport async write"); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }

std::vector<tipb::FieldType> result_field_types;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ PipelineExecutor::PipelineExecutor(
/*query_id=*/context.getDAGContext()->isMPPTask() ? context.getDAGContext()->getMPPTaskId().toString() : "",
req_id,
memory_tracker_,
context.getDAGContext(),
auto_spill_trigger,
register_operator_spill_context,
context.getDAGContext()->getResourceGroupName())
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Operators/SharedQueue.h>

Expand Down Expand Up @@ -52,6 +55,24 @@ String PipelineExecutorContext::getExceptionMsg()
}
}

String PipelineExecutorContext::getTrimmedErrMsg()
{
try
{
auto cur_exception_ptr = getExceptionPtr();
if (!cur_exception_ptr)
return "";
std::rethrow_exception(cur_exception_ptr);
}
catch (...)
{
auto err_msg = getCurrentExceptionMessage(true, true);
if (likely(!err_msg.empty()))
trimStackTrace(err_msg);
return err_msg;
}
}

void PipelineExecutorContext::onErrorOccurred(const String & err_msg)
{
DB::Exception e(err_msg);
Expand Down Expand Up @@ -155,6 +176,12 @@ void PipelineExecutorContext::cancel()
if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release))
{
cancelSharedQueues();
if (likely(dag_context))
{
// Cancel the tunnel_set here to prevent pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified.
if (dag_context->tunnel_set)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tunnels will be always closed at the destruct of mpp task, is that too late so you need to close it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if a pipeline task is in the wait_for_notify state, and another pipeline task throws an error and cannot terminate the query, then abortMPPTunnel cannot be called, this is a deadlock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

dag_context->tunnel_set->close(getTrimmedErrMsg(), false);
}
cancelResultQueueIfNeed();
if likely (TaskScheduler::instance && !query_id.empty())
TaskScheduler::instance->cancel(query_id, resource_group_name);
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ using RegisterOperatorSpillContext = std::function<void(const std::shared_ptr<Op
class SharedQueue;
using SharedQueuePtr = std::shared_ptr<SharedQueue>;

class DAGContext;

class PipelineExecutorContext : private boost::noncopyable
{
public:
Expand All @@ -51,12 +53,14 @@ class PipelineExecutorContext : private boost::noncopyable
const String & query_id_,
const String & req_id,
const MemoryTrackerPtr & mem_tracker_,
DAGContext * dag_context_ = nullptr,
AutoSpillTrigger * auto_spill_trigger_ = nullptr,
const RegisterOperatorSpillContext & register_operator_spill_context_ = nullptr,
const String & resource_group_name_ = "")
: query_id(query_id_)
, log(Logger::get(req_id))
, mem_tracker(mem_tracker_)
, dag_context(dag_context_)
, auto_spill_trigger(auto_spill_trigger_)
, register_operator_spill_context(register_operator_spill_context_)
, resource_group_name(resource_group_name_)
Expand Down Expand Up @@ -134,6 +138,8 @@ class PipelineExecutorContext : private boost::noncopyable
private:
bool setExceptionPtr(const std::exception_ptr & exception_ptr_);

String getTrimmedErrMsg();

// Need to be called under lock.
bool isWaitMode();

Expand All @@ -149,6 +155,8 @@ class PipelineExecutorContext : private boost::noncopyable

MemoryTrackerPtr mem_tracker;

DAGContext * dag_context{nullptr};

std::mutex mu;
std::condition_variable cv;
std::exception_ptr exception_ptr;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::isWritable() const
WaitResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_);
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::isWritable() const
WaitResult FineGrainedShuffleWriter<ExchangeWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class FineGrainedShuffleWriter : public DAGResponseWriter
tipb::CompressionMode compression_mode_);
void prepare(const Block & sample_block) override;
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/HashPartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ void HashPartitionWriter<ExchangeWriterPtr>::flush()
}

template <class ExchangeWriterPtr>
bool HashPartitionWriter<ExchangeWriterPtr>::isWritable() const
WaitResult HashPartitionWriter<ExchangeWriterPtr>::waitForWritable() const
{
return writer->isWritable();
return writer->waitForWritable();
}

template <class ExchangeWriterPtr>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/HashPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class HashPartitionWriter : public DAGResponseWriter
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_);
void write(const Block & block) override;
bool isWritable() const override;
WaitResult waitForWritable() const override;
void flush() override;

private:
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Mpp/LocalRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct LocalRequestHandler

bool isWritable() const { return msg_queue->isWritable(); }

void registerPipeReadTask(TaskPtr && task) const { msg_queue->registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) const { msg_queue->registerPipeWriteTask(std::move(task)); }

void writeDone(bool meet_error, const String & local_err_msg) const
{
notify_write_done(meet_error, local_err_msg);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,13 @@ void MPPTask::abort(const String & message, AbortType abort_type)
}
else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status))
{
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// abort mpptunnels first because if others components are aborted
/// first, the mpptunnels may see an error caused by the abort, which is not
/// the original error
setErrString(message);
abortTunnels(message, false);
abortQueryExecutor();
abortReceivers();
abortQueryExecutor();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why abort receiver first

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently useless and is only used to notify exchange_receiver for the same reason as #9072 (comment).

scheduleThisTask(ScheduleState::FAILED);
/// runImpl is running, leave remaining work to runImpl
LOG_WARNING(log, "Finish abort task from running");
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock<std::mutex> & lk)
throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id));
}

bool MPPTunnel::isWritable() const
WaitResult MPPTunnel::waitForWritable() const
{
std::unique_lock lk(mu);
switch (status)
Expand All @@ -396,12 +396,17 @@ bool MPPTunnel::isWritable() const
if (unlikely(timeout_stopwatch->elapsed() > timeout_nanoseconds))
throw Exception(fmt::format("{} is timeout", tunnel_id));
}
return false;
return WaitResult::WaitForPolling;
}
case TunnelStatus::Connected:
case TunnelStatus::WaitingForSenderFinish:
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
return tunnel_sender->isWritable();
if (!tunnel_sender->isWritable())
{
setNotifyFuture(tunnel_sender);
return WaitResult::WaitForNotify;
}
return WaitResult::Ready;
case TunnelStatus::Finished:
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
throw Exception(fmt::format(
Expand Down
Loading