Skip to content

Commit

Permalink
Ping service (#12100)
Browse files Browse the repository at this point in the history
  • Loading branch information
eivanov89 authored Nov 30, 2024
1 parent c8c54d4 commit d7f99d9
Show file tree
Hide file tree
Showing 25 changed files with 974 additions and 0 deletions.
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
#include <ydb/services/backup/grpc_service.h>
#include <ydb/services/ydb/ydb_logstore.h>
#include <ydb/services/ydb/ydb_operation.h>
#include <ydb/services/ydb/ydb_debug.h>
#include <ydb/services/ydb/ydb_query.h>
#include <ydb/services/ydb/ydb_scheme.h>
#include <ydb/services/ydb/ydb_scripting.h>
Expand Down Expand Up @@ -883,6 +884,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}

server.AddService(new NGRpcService::TGRpcYdbDebugService(ActorSystem.Get(), Counters,
grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));

if (hasLogStore) {
server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters,
grpcRequestProxies[0], hasLogStore.IsRlAllowed()));
Expand Down
251 changes: 251 additions & 0 deletions ydb/core/grpc_services/rpc_ping.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
#include "service_debug.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>

#include <ydb/public/api/protos/ydb_debug.pb.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>

namespace NKikimr::NGRpcService {

using namespace Ydb;

namespace {

using namespace NActors;

////////////////////////////////////////////////////////////////////////////////

using TEvKqpProxyRequest = TGrpcRequestNoOperationCall<Debug::KqpProxyRequest, Debug::KqpProxyResponse>;

class TExecuteKqpPingRPC : public TActorBootstrapped<TExecuteKqpPingRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TExecuteKqpPingRPC(TEvKqpProxyRequest* request)
: Request_(request)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(NKqp::TEvKqp::TEvProxyPingResponse, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to KQP proxy");
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), new NKqp::TEvKqp::TEvProxyPingRequest())) {
LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping");
ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
}
}

void Handle(NKqp::TEvKqp::TEvProxyPingResponse::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response");
ReplyWithResult(StatusIds::SUCCESS, ctx);
}

private:
void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TExecuteKqpPingRPC in state " << state << " received unexpected event "
<< ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
std::shared_ptr<TEvKqpProxyRequest> Request_;
};

////////////////////////////////////////////////////////////////////////////////

using TEvSchemeCacheRequest = TGrpcRequestNoOperationCall<Debug::SchemeCacheRequest, Debug::SchemeCacheResponse>;

class TExecuteSchemeCachePingRPC : public TActorBootstrapped<TExecuteSchemeCachePingRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TExecuteSchemeCachePingRPC(TEvSchemeCacheRequest* request)
: Request_(request)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to SchemeCache");

auto* request = new TEvTxProxySchemeCache::TEvNavigateKeySet(new NSchemeCache::TSchemeCacheNavigate());
if (!ctx.Send(MakeSchemeCacheID(), request)) {
LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to SchemeCache");
ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from SchemeCache");
ReplyWithResult(StatusIds::SUCCESS, ctx);
}

private:
void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TExecuteSchemeCachePingRPC in state " << state <<
" received unexpected event " << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
std::shared_ptr<TEvSchemeCacheRequest> Request_;
};

////////////////////////////////////////////////////////////////////////////////

using TEvTxProxyRequest = TGrpcRequestNoOperationCall<Debug::TxProxyRequest, Debug::TxProxyResponse>;

class TExecuteTxProxyPingRPC : public TActorBootstrapped<TExecuteTxProxyPingRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TExecuteTxProxyPingRPC(TEvTxProxyRequest* request)
: Request_(request)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to TxProxy");
if (!ctx.Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId)) {
LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to TxProxy");
ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
}
}

void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from TxProxy");
ReplyWithResult(StatusIds::SUCCESS, ctx);
}

private:
void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TExecuteTxProxyPingRPC in state " << state << " received unexpected event "
<< ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
std::shared_ptr<TEvTxProxyRequest> Request_;
};

} // anonymous

////////////////////////////////////////////////////////////////////////////////

void DoGrpcProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
// we are in the GRPC proxy already (or in the check actor in case of auth check),
// thus ready to reply right here
using TRequest = TGrpcRequestNoOperationCall<Debug::GrpcProxyRequest, Debug::GrpcProxyResponse>;
TRequest* request = dynamic_cast<TRequest *>(p.get());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoGrpcProxyPing");
request->ReplyWithYdbStatus(StatusIds::SUCCESS);
}

void DoKqpPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvKqpProxyRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoKqpPing");
f.RegisterActor(new TExecuteKqpPingRPC(request));
}

void DoSchemeCachePing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvSchemeCacheRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoSchemeCachePing");
f.RegisterActor(new TExecuteSchemeCachePingRPC(request));
}

void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvTxProxyRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoTxProxyPing");
f.RegisterActor(new TExecuteTxProxyPingRPC(request));
}

} // namespace NKikimr::NGRpcService
16 changes: 16 additions & 0 deletions ydb/core/grpc_services/service_debug.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <memory>

namespace NKikimr::NGRpcService {

class IRequestOpCtx;
class IRequestNoOpCtx;
class IFacilityProvider;

void DoGrpcProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoKqpPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoSchemeCachePing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);

} // namespace NKikimr::NGRpcService
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ SRCS(
rpc_make_directory.cpp
rpc_modify_permissions.cpp
rpc_monitoring.cpp
rpc_ping.cpp
rpc_prepare_data_query.cpp
rpc_rate_limiter_api.cpp
rpc_read_columns.cpp
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/jaeger_tracing/request_discriminator.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ enum class ERequestType: size_t {
BSCONFIG_REPLACESTORAGECONFIG,
BSCONFIG_FETCHSTORAGECONFIG,

PING_GRPC,
PING_PROXY,
PING_KQP,
PING_SCHEME_CACHE,
PING_TX_PROXY,

REQUEST_TYPES_CNT, // Add new types above this line
};

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ struct TEvKqp {
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};

struct TEvProxyPingRequest : public TEventLocal<TEvProxyPingRequest, TKqpEvents::EvProxyPingRequest> {
};

struct TEvProxyPingResponse : public TEventLocal<TEvProxyPingResponse, TKqpEvents::EvProxyPingResponse> {
};
};

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct TKqpEvents {
EvDelayedRequestError,
EvBufferWrite,
EvBufferWriteResult,
EvProxyPingRequest,
EvProxyPingResponse,
};

static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
Send(ev->Sender, responseEv.Release(), 0, ev->Cookie);
}

void Handle(TEvKqp::TEvProxyPingRequest::TPtr& ev) {
Send(ev->Sender, new TEvKqp::TEvProxyPingResponse());
}

void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) {
if (!DatabasesCache.SetDatabaseIdOrDefer(ev, static_cast<i32>(EDelayedRequestType::QueryRequest), ActorContext())) {
return;
Expand Down Expand Up @@ -1349,6 +1353,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
hFunc(TEvKqp::TEvProxyPingRequest, Handle);
hFunc(TEvKqp::TEvQueryRequest, Handle);
hFunc(TEvKqp::TEvScriptRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
Expand Down
1 change: 1 addition & 0 deletions ydb/public/api/grpc/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SRCS(
ydb_auth_v1.proto
ydb_cms_v1.proto
ydb_coordination_v1.proto
ydb_debug_v1.proto
ydb_discovery_v1.proto
ydb_export_v1.proto
ydb_import_v1.proto
Expand Down
14 changes: 14 additions & 0 deletions ydb/public/api/grpc/ydb_debug_v1.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package Ydb.Debug.V1;
option java_package = "com.yandex.ydb.debug.v1";

import "ydb/public/api/protos/ydb_debug.proto";

service DebugService {
rpc PingPlainGrpc(Debug.PlainGrpcRequest) returns (Debug.PlainGrpcResponse);
rpc PingGrpcProxy(Debug.GrpcProxyRequest) returns (Debug.GrpcProxyResponse);
rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse);
rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse);
rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse);
}
1 change: 1 addition & 0 deletions ydb/public/api/protos/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ SRCS(
ydb_federation_discovery.proto
persqueue_error_codes_v1.proto
ydb_auth.proto
ydb_debug.proto
ydb_persqueue_v1.proto
ydb_persqueue_cluster_discovery.proto
ydb_clickhouse_internal.proto
Expand Down
Loading

0 comments on commit d7f99d9

Please sign in to comment.