Skip to content

Commit

Permalink
stable async setting
Browse files Browse the repository at this point in the history
Signed-off-by: bestwoody <[email protected]>
  • Loading branch information
bestwoody committed Mar 29, 2022
1 parent e49339b commit a8c726e
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 7 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ ::grpc::Status FlashService::DispatchMPPTask(
return status;
}

MPPHandler mpp_handler(*request);
MPPHandler mpp_handler(*request, is_async_enabled);
return mpp_handler.execute(context, response);
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class FlashService : public tikvpb::Tikv::Service
IServer & server;
const TiFlashSecurityConfig & security_config;
Poco::Logger * log;
bool is_async_enabled = false;

// Put thread pool member(s) at the end so that ensure it will be destroyed firstly.
std::unique_ptr<ThreadPool> cop_pool, batch_cop_pool;
Expand All @@ -97,6 +98,7 @@ class AsyncFlashService final : public FlashService
explicit AsyncFlashService(IServer & server)
: FlashService(server)
{
is_async_enabled = true;
::grpc::Service::MarkMethodAsync(EstablishMPPConnectionApiID);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ grpc::Status MPPHandler::execute(const ContextPtr & context, mpp::DispatchTaskRe
try
{
Stopwatch stopwatch;
task = MPPTask::newTask(task_request.meta(), context);
task = MPPTask::newTask(task_request.meta(), context, is_async_enabled);

task->prepare(task_request);
for (const auto & table_region_info : context->getDAGContext()->tables_regions_info.getTableRegionsInfoMap())
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ class MPPHandler
const mpp::DispatchTaskRequest & task_request;

Poco::Logger * log;
bool is_async_enabled;

public:
MPPHandler(const mpp::DispatchTaskRequest & task_request_)
MPPHandler(const mpp::DispatchTaskRequest & task_request_, bool is_async_enabled)
: task_request(task_request_)
, log(&Poco::Logger::get("MPPHandler"))
, is_async_enabled(is_async_enabled)
{}
grpc::Status execute(const ContextPtr & context, mpp::DispatchTaskResponse * response);
void handleError(const MPPTaskPtr & task, String error);
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ extern const char exception_during_mpp_write_err_to_tunnel[];
extern const char force_no_local_region_for_mpp_task[];
} // namespace FailPoints

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_, bool is_async_enabled)
: context(context_)
, meta(meta_)
, id(meta.start_ts(), meta.task_id())
, log(Logger::get("MPPTask", id.toString()))
, mpp_task_statistics(id, meta.address())
, schedule_state(ScheduleState::WAITING)
, is_async_enabled(is_async_enabled)
{}

MPPTask::~MPPTask()
Expand Down Expand Up @@ -222,7 +223,7 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
if (!task_meta.ParseFromString(exchange_sender.encoded_task_meta(i)))
throw TiFlashException("Failed to decode task meta info in ExchangeSender", Errors::Coprocessor::BadRequest);
bool is_local = context->getSettingsRef().enable_local_tunnel && meta.address() == task_meta.address();
bool is_async = !is_local && context->getSettingsRef().enable_async_server;
bool is_async = !is_local && is_async_enabled;
MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(task_meta, task_request.meta(), timeout, context->getSettingsRef().max_threads, is_local, is_async, log->identifier());
LOG_FMT_DEBUG(log, "begin to register the tunnel {}", tunnel->id());
registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
~MPPTask();

private:
MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_);
MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_, bool is_async_enabled);

void runImpl();

Expand Down Expand Up @@ -141,6 +141,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
std::mutex schedule_mu;
std::condition_variable schedule_cv;
ScheduleState schedule_state;
bool is_async_enabled;
};

using MPPTaskPtr = std::shared_ptr<MPPTask>;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class Server::FlashGrpcServerHolder
}

/// Init and register flash service.
bool enable_async_server = server.context().getSettingsRef().enable_async_server;
enable_async_server = server.context().getSettingsRef().enable_async_server;
if (enable_async_server)
flash_service = std::make_unique<AsyncFlashService>(server);
else
Expand Down Expand Up @@ -672,6 +672,7 @@ class Server::FlashGrpcServerHolder
private:
Poco::Logger * log;
std::shared_ptr<std::atomic<bool>> is_shutdown;
bool enable_async_server;
std::unique_ptr<FlashService> flash_service = nullptr;
std::unique_ptr<DiagnosticsService> diagnostics_service = nullptr;
std::unique_ptr<grpc::Server> flash_grpc_server = nullptr;
Expand Down

0 comments on commit a8c726e

Please sign in to comment.