Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stable async setting #4485

Merged
merged 7 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ ::grpc::Status FlashService::DispatchMPPTask(
return status;
}

MPPHandler mpp_handler(*request);
MPPHandler mpp_handler(*request, is_async_enabled);
return mpp_handler.execute(context, response);
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class FlashService : public tikvpb::Tikv::Service
IServer & server;
const TiFlashSecurityConfig & security_config;
Poco::Logger * log;
bool is_async_enabled = false;

// Put thread pool member(s) at the end so that ensure it will be destroyed firstly.
std::unique_ptr<ThreadPool> cop_pool, batch_cop_pool;
Expand All @@ -97,6 +98,7 @@ class AsyncFlashService final : public FlashService
explicit AsyncFlashService(IServer & server)
: FlashService(server)
{
is_async_enabled = true;
::grpc::Service::MarkMethodAsync(EstablishMPPConnectionApiID);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ grpc::Status MPPHandler::execute(const ContextPtr & context, mpp::DispatchTaskRe
try
{
Stopwatch stopwatch;
task = MPPTask::newTask(task_request.meta(), context);
task = MPPTask::newTask(task_request.meta(), context, is_async_enabled);

task->prepare(task_request);
for (const auto & table_region_info : context->getDAGContext()->tables_regions_info.getTableRegionsInfoMap())
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ class MPPHandler
const mpp::DispatchTaskRequest & task_request;

Poco::Logger * log;
bool is_async_enabled;

public:
MPPHandler(const mpp::DispatchTaskRequest & task_request_)
MPPHandler(const mpp::DispatchTaskRequest & task_request_, bool is_async_enabled)
: task_request(task_request_)
, log(&Poco::Logger::get("MPPHandler"))
, is_async_enabled(is_async_enabled)
{}
grpc::Status execute(const ContextPtr & context, mpp::DispatchTaskResponse * response);
void handleError(const MPPTaskPtr & task, String error);
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ extern const char exception_during_mpp_write_err_to_tunnel[];
extern const char force_no_local_region_for_mpp_task[];
} // namespace FailPoints

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_, bool is_async_enabled)
: context(context_)
, meta(meta_)
, id(meta.start_ts(), meta.task_id())
, log(Logger::get("MPPTask", id.toString()))
, mpp_task_statistics(id, meta.address())
, schedule_state(ScheduleState::WAITING)
, is_async_enabled(is_async_enabled)
{}

MPPTask::~MPPTask()
Expand Down Expand Up @@ -222,7 +223,7 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
if (!task_meta.ParseFromString(exchange_sender.encoded_task_meta(i)))
throw TiFlashException("Failed to decode task meta info in ExchangeSender", Errors::Coprocessor::BadRequest);
bool is_local = context->getSettingsRef().enable_local_tunnel && meta.address() == task_meta.address();
bool is_async = !is_local && context->getSettingsRef().enable_async_server;
bool is_async = !is_local && is_async_enabled;
MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(task_meta, task_request.meta(), timeout, context->getSettingsRef().max_threads, is_local, is_async, log->identifier());
LOG_FMT_DEBUG(log, "begin to register the tunnel {}", tunnel->id());
registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
~MPPTask();

private:
MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_);
MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_, bool is_async_enabled);

void runImpl();

Expand Down Expand Up @@ -141,6 +141,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
std::mutex schedule_mu;
std::condition_variable schedule_cv;
ScheduleState schedule_state;
bool is_async_enabled;
};

using MPPTaskPtr = std::shared_ptr<MPPTask>;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class Server::FlashGrpcServerHolder
}

/// Init and register flash service.
bool enable_async_server = server.context().getSettingsRef().enable_async_server;
enable_async_server = server.context().getSettingsRef().enable_async_server;
if (enable_async_server)
flash_service = std::make_unique<AsyncFlashService>(server);
else
Expand Down Expand Up @@ -672,6 +672,7 @@ class Server::FlashGrpcServerHolder
private:
Poco::Logger * log;
std::shared_ptr<std::atomic<bool>> is_shutdown;
bool enable_async_server;
std::unique_ptr<FlashService> flash_service = nullptr;
std::unique_ptr<DiagnosticsService> diagnostics_service = nullptr;
std::unique_ptr<grpc::Server> flash_grpc_server = nullptr;
Expand Down