diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index 9781b951e2d5..385e4c7eded8 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -14,24 +14,24 @@ namespace NKikimr::NKqp::NWorkload { struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal { - TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId) - : Database(database) + TEvSubscribeOnPoolChanges(const TString& databaseId, const TString& poolId) + : DatabaseId(databaseId) , PoolId(poolId) {} - const TString Database; + const TString DatabaseId; const TString PoolId; }; struct TEvPlaceRequestIntoPool : public NActors::TEventLocal { - TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr userToken) - : Database(database) + TEvPlaceRequestIntoPool(const TString& databaseId, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr userToken) + : DatabaseId(databaseId) , SessionId(sessionId) , PoolId(poolId) , UserToken(userToken) {} - const TString Database; + const TString DatabaseId; const TString SessionId; TString PoolId; // Can be changed to default pool id TIntrusiveConstPtr UserToken; @@ -52,15 +52,15 @@ struct TEvContinueRequest : public NActors::TEventLocal { - TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed) - : Database(database) + TEvCleanupRequest(const TString& databaseId, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed) + : DatabaseId(databaseId) , SessionId(sessionId) , PoolId(poolId) , Duration(duration) , CpuConsumed(cpuConsumed) {} - const TString Database; + const TString DatabaseId; const TString SessionId; const TString PoolId; const TDuration Duration; @@ -78,23 +78,24 @@ struct TEvCleanupResponse : public NActors::TEventLocal { - TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional& config, const std::optional& securityObject) - : Database(database) + TEvUpdatePoolInfo(const TString& databaseId, const TString& poolId, const std::optional& config, const std::optional& securityObject) + : DatabaseId(databaseId) , PoolId(poolId) , Config(config) , SecurityObject(securityObject) {} - const TString Database; + const TString DatabaseId; const TString PoolId; const std::optional Config; const std::optional SecurityObject; }; struct TEvFetchDatabaseResponse : public NActors::TEventLocal { - TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues) + TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& databaseId, bool serverless, TPathId pathId, NYql::TIssues issues) : Status(status) , Database(database) + , DatabaseId(databaseId) , Serverless(serverless) , PathId(pathId) , Issues(std::move(issues)) @@ -102,6 +103,7 @@ struct TEvFetchDatabaseResponse : public NActors::TEventLocal& resultMap) { resultMap["TraceId"] = ctx.TraceId; resultMap["Database"] = ctx.Database; + resultMap["DatabaseId"] = ctx.DatabaseId; resultMap["SessionId"] = ctx.SessionId; resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId; resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId; diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h index 1aa4a0574d67..9a17252fccf7 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.h +++ b/ydb/core/kqp/common/kqp_user_request_context.h @@ -11,6 +11,7 @@ namespace NKikimr::NKqp { struct TUserRequestContext : public TAtomicRefCount { TString TraceId; TString Database; + TString DatabaseId; TString SessionId; TString CurrentExecutionId; TString CustomerSuppliedId; diff --git a/ydb/core/kqp/common/simple/query_id.cpp b/ydb/core/kqp/common/simple/query_id.cpp index b2066bbd8b88..241abd3cf82c 100644 --- a/ydb/core/kqp/common/simple/query_id.cpp +++ b/ydb/core/kqp/common/simple/query_id.cpp @@ -10,11 +10,12 @@ namespace NKikimr::NKqp { -TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text, +TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text, const TKqpQuerySettings& settings, std::shared_ptr> queryParameterTypes, const TGUCSettings& gUCSettings) : Cluster(cluster) , Database(database) + , DatabaseId(databaseId) , Text(text) , Settings(settings) , QueryParameterTypes(queryParameterTypes) @@ -42,6 +43,7 @@ bool TKqpQueryId::IsSql() const { bool TKqpQueryId::operator==(const TKqpQueryId& other) const { if (!(Cluster == other.Cluster && Database == other.Database && + DatabaseId == other.DatabaseId && UserSid == other.UserSid && Text == other.Text && Settings == other.Settings && @@ -79,6 +81,7 @@ TString TKqpQueryId::SerializeToString() const { TStringBuilder result = TStringBuilder() << "{" << "Cluster: " << Cluster << ", " << "Database: " << Database << ", " + << "DatabaseId: " << DatabaseId << ", " << "UserSid: " << UserSid << ", " << "Text: " << EscapeC(Text) << ", " << "Settings: " << Settings.SerializeToString() << ", "; diff --git a/ydb/core/kqp/common/simple/query_id.h b/ydb/core/kqp/common/simple/query_id.h index 7dd7ef0dc5a7..120d524d9808 100644 --- a/ydb/core/kqp/common/simple/query_id.h +++ b/ydb/core/kqp/common/simple/query_id.h @@ -13,6 +13,7 @@ namespace NKikimr::NKqp { struct TKqpQueryId { TString Cluster; TString Database; + TString DatabaseId; TString UserSid; TString Text; TKqpQuerySettings Settings; @@ -21,7 +22,7 @@ struct TKqpQueryId { TGUCSettings GUCSettings; public: - TKqpQueryId(const TString& cluster, const TString& database, const TString& text, + TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text, const TKqpQuerySettings& settings, std::shared_ptr> queryParameterTypes, const TGUCSettings& gUCSettings); diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index c41e850595ef..81697e4693b4 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -266,7 +266,7 @@ class TKqpCompileActor : public TActorBootstrapped { std::shared_ptr loader = std::make_shared( QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState); - Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader), + Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, QueryId.DatabaseId, std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig); Gateway->SetToken(QueryId.Cluster, UserToken); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 5c94153437ab..d95b9a7b6a30 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -153,7 +153,7 @@ class TKqpQueryCache { } } } - return TKqpQueryId{query.Cluster, query.Database, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings}; + return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings}; } TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 4f872c6ae904..d6807db0802d 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -235,6 +235,7 @@ std::unique_ptr TKqpPlanner::SerializeReque request.SetSchedulerGroup(UserRequestContext->PoolId); request.SetDatabase(Database); + request.SetDatabaseId(UserRequestContext->DatabaseId); if (UserRequestContext->PoolConfig.has_value()) { request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode); request.SetPoolMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0); diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 15835cf536e5..746c51b748b1 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -72,6 +72,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { , RequestType(requestType) , KqpTempTablesAgentActor(kqpTempTablesAgentActor) { + YQL_ENSURE(RequestContext); YQL_ENSURE(PhyTx); YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME); @@ -403,6 +404,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { NMetadata::NModifications::IOperationsManager::TExternalModificationContext context; context.SetDatabase(Database); + context.SetDatabaseId(RequestContext->DatabaseId); context.SetActorSystem(actorSystem); if (UserToken) { context.SetUserToken(*UserToken); diff --git a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp index b358f0efcf1c..4efa22e7e83d 100644 --- a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp +++ b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp @@ -42,7 +42,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr [[maybe_unused]] TIntrusivePtr MakeIcGateway(const TKikimrRunner& kikimr) { auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem(); - return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", TKqpGatewaySettings(), + return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", "/Root", TKqpGatewaySettings(), actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0), TAlignedPagePoolCounters(), kikimr.GetTestServer().GetSettings().AppConfig->GetQueryServiceConfig()); } diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp index 7200999c7ca1..10f4d6b1f54c 100644 --- a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp +++ b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp @@ -48,13 +48,13 @@ class TRanksCheckerActor : public NKikimr::TQueryBase { using TBase = NKikimr::TQueryBase; public: - TRanksCheckerActor(const TString& database, const TString& sessionId, const TString& transactionId, const std::unordered_map& ranksToCheck) + TRanksCheckerActor(const TString& databaseId, const TString& sessionId, const TString& transactionId, const std::unordered_map& ranksToCheck) : TBase(NKikimrServices::KQP_GATEWAY, sessionId) - , Database(database) + , DatabaseId(databaseId) , RanksToCheck(ranksToCheck) { TxId = transactionId; - SetOperationInfo(__func__, Database); + SetOperationInfo(__func__, DatabaseId); } void OnRunQuery() override { @@ -62,13 +62,13 @@ class TRanksCheckerActor : public NKikimr::TQueryBase { TStringBuilder sql = TStringBuilder() << R"( -- TRanksCheckerActor::OnRunQuery - DECLARE $database AS Text; + DECLARE $database_id AS Text; )"; NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(CanonizePath(Database)) + .AddParam("$database_id") + .Utf8(CanonizePath(DatabaseId)) .Build(); if (!RanksToCheck.empty()) { @@ -79,7 +79,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase { SELECT rank, name FROM `)" << tablePath << R"(` - WHERE database = $database + WHERE database = $database_id AND rank IN $ranks; )"; @@ -97,7 +97,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase { MAX(rank) AS MaxRank, COUNT(*) AS NumberClassifiers FROM `)" << tablePath << R"(` - WHERE database = $database; + WHERE database = $database_id; )"; RunDataQuery(sql, ¶ms, TTxControl::ContinueTx()); @@ -149,7 +149,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase { } private: - const TString Database; + const TString DatabaseId; const std::unordered_map RanksToCheck; ui64 ExpectedResultSets = 1; @@ -235,8 +235,8 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrappedGetClassifierConfig(CanonizePath(object.GetDatabase()), object.GetName())) { - FailAndPassAway(TStringBuilder() << "Classifier with name " << object.GetName() << " not found in database " << object.GetDatabase()); + if (!snapshot->GetClassifierConfig(object.GetDatabase(), object.GetName())) { + FailAndPassAway(TStringBuilder() << "Classifier with name " << object.GetName() << " not found in database with id " << object.GetDatabase()); return; } } @@ -279,7 +279,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped>( - SelfId(), Context.GetExternalData().GetDatabase(), AlterContext.GetSessionId(), AlterContext.GetTransactionId(), ranksToNames + SelfId(), Context.GetExternalData().GetDatabaseId(), AlterContext.GetSessionId(), AlterContext.GetTransactionId(), ranksToNames )); } diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/manager.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/manager.cpp index dc884cd029ba..5791ab5befa7 100644 --- a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/manager.cpp @@ -13,7 +13,7 @@ using namespace NResourcePool; NMetadata::NInternal::TTableRecord GetResourcePoolClassifierRecord(const NYql::TObjectSettingsImpl& settings, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context) { NMetadata::NInternal::TTableRecord result; - result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Database, NMetadata::NInternal::TYDBValue::Utf8(CanonizePath(context.GetExternalData().GetDatabase()))); + result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Database, NMetadata::NInternal::TYDBValue::Utf8(context.GetExternalData().GetDatabaseId())); result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Name, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId())); return result; } diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 0ab6d63d96c3..1ebd527bfe42 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -189,6 +189,7 @@ class IKqpGateway : public NYql::IKikimrGateway { public: virtual TString GetDatabase() = 0; + virtual TString GetDatabaseId() = 0; virtual bool GetDomainLoginOnly() = 0; virtual TMaybe GetDomainName() = 0; @@ -232,7 +233,7 @@ class IKqpGateway : public NYql::IKikimrGateway { const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe& traceId) = 0; }; -TIntrusivePtr CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, +TIntrusivePtr CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, const TString& databaseId, std::shared_ptr&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig()); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index ee8698df4acf..bbc6360eaf81 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -538,10 +538,11 @@ class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped& requestType, const TString& database, - TIntrusiveConstPtr userToken, TPromise promise) + const TString& databaseId, TIntrusiveConstPtr userToken, TPromise promise) : PhyTx(std::move(phyTx)) , QueryType(queryType) , Database(database) + , DatabaseId(databaseId) , UserToken(std::move(userToken)) , Promise(promise) , RequestType(requestType) @@ -549,6 +550,7 @@ class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped(); + ctx->DatabaseId = DatabaseId; IActor* actor = CreateKqpSchemeExecuter(PhyTx, QueryType, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx); Register(actor); Become(&TThis::WaitState); @@ -580,6 +582,7 @@ class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped UserToken; TPromise Promise; const TMaybe RequestType; @@ -702,11 +705,12 @@ class TKikimrIcGateway : public IKqpGateway { using TNavigate = NSchemeCache::TSchemeCacheNavigate; public: - TKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr&& metadataLoader, + TKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, const TString& databaseId, std::shared_ptr&& metadataLoader, TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) : Cluster(cluster) , QueryType(queryType) , Database(database) + , DatabaseId(databaseId) , ActorSystem(actorSystem) , NodeId(nodeId) , Counters(counters) @@ -729,6 +733,10 @@ class TKikimrIcGateway : public IKqpGateway { return Database; } + TString GetDatabaseId() override { + return DatabaseId; + } + TMaybe GetSetting(const TString& cluster, const TString& name) override { Y_UNUSED(cluster); Y_UNUSED(name); @@ -1473,6 +1481,7 @@ class TKikimrIcGateway : public IKqpGateway { context.SetUserToken(*GetUserToken()); } context.SetDatabase(Owner.Database); + context.SetDatabaseId(Owner.DatabaseId); context.SetActorSystem(Owner.ActorSystem); return DoExecute(cBehaviour, settings, context).Apply([](const NThreading::TFuture& f) { if (f.HasValue() && !f.HasException() && f.GetValue().Ok()) { @@ -2244,7 +2253,7 @@ class TKikimrIcGateway : public IKqpGateway { TFuture SendSchemeExecuterRequest(const TString&, const TMaybe& requestType, const std::shared_ptr& phyTx) override { auto promise = NewPromise(); - IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, QueryType, requestType, Database, UserToken, promise); + IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, QueryType, requestType, Database, DatabaseId, UserToken, promise); RegisterActor(requestHandler); return promise.GetFuture(); } @@ -2324,6 +2333,7 @@ class TKikimrIcGateway : public IKqpGateway { TString Cluster; const NKikimrKqp::EQueryType QueryType; TString Database; + TString DatabaseId; TActorSystem* ActorSystem; ui32 NodeId; TKqpRequestCounters::TPtr Counters; @@ -2335,11 +2345,11 @@ class TKikimrIcGateway : public IKqpGateway { } // namespace -TIntrusivePtr CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, +TIntrusivePtr CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, const TString& databaseId, std::shared_ptr&& metadataLoader, TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) { - return MakeIntrusive(cluster, queryType, database, std::move(metadataLoader), actorSystem, nodeId, + return MakeIntrusive(cluster, queryType, database, databaseId, std::move(metadataLoader), actorSystem, nodeId, counters, queryServiceConfig); } diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 1df1ebd46a2b..442914cc67d1 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1255,6 +1255,7 @@ class TKqpGatewayProxy : public IKikimrGateway { NMetadata::NModifications::IOperationsManager::TExternalModificationContext context; context.SetDatabase(SessionCtx->GetDatabase()); + context.SetDatabaseId(SessionCtx->GetDatabaseId()); context.SetActorSystem(ActorSystem); if (SessionCtx->GetUserToken()) { context.SetUserToken(*SessionCtx->GetUserToken()); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index c98dc7453a20..c7da79a66566 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1063,6 +1063,7 @@ class TKqpHost : public IKqpHost { } SessionCtx->SetDatabase(database); + SessionCtx->SetDatabaseId(Gateway->GetDatabaseId()); SessionCtx->SetCluster(cluster); if (tempTablesState) { SessionCtx->SetSessionId(tempTablesState->SessionId); diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index f3fa1611b8e8..7f993d150981 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -208,7 +208,7 @@ class TKqpNodeService : public TActorBootstrapped { } if (share > 0) { Scheduler->UpdateGroupShare(schedulerGroup, share, schedulerNow); - Send(SchedulerActorId, new TEvSchedulerNewPool(msg.GetDatabase(), schedulerGroup)); + Send(SchedulerActorId, new TEvSchedulerNewPool(msg.GetDatabaseId(), schedulerGroup)); } else { schedulerGroup = ""; } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 79777b77307f..a5422321bcf8 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -74,7 +74,7 @@ TIntrusivePtr GetIcGateway(Tests::TServer& server) { counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); std::shared_ptr loader = std::make_shared(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr(nullptr), false); - return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), + return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig()); } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 59660484b455..2c14581e15f0 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -493,6 +493,10 @@ class TKikimrSessionContext : public TThrRefBase { return Database; } + TString GetDatabaseId() const { + return DatabaseId; + } + const TString& GetSessionId() const { return SessionId; } @@ -505,6 +509,10 @@ class TKikimrSessionContext : public TThrRefBase { Database = database; } + void SetDatabaseId(const TString& databaseId) { + DatabaseId = databaseId; + } + void SetSessionId(const TString& sessionId) { SessionId = sessionId; } @@ -543,6 +551,7 @@ class TKikimrSessionContext : public TThrRefBase { TString UserName; TString Cluster; TString Database; + TString DatabaseId; TString SessionId; TKikimrConfiguration::TPtr Configuration; TIntrusivePtr TablesData; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index ebac128f8f9e..4b949031032c 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -94,7 +94,9 @@ class TDatabaseSubscriberActor : public TActor { } databaseStateIt->FetchRequestIsRunning = false; - UpdateDatabaseState(*databaseStateIt, ev->Get()->PathId, ev->Get()->Serverless); + databaseStateIt->LastUpdateTime = TInstant::Now(); + databaseStateIt->DatabaseId = ev->Get()->DatabaseId; + databaseStateIt->Serverless = ev->Get()->Serverless; SendSubscriberInfo(*databaseStateIt, ev->Get()->Status, ev->Get()->Issues); if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { @@ -151,12 +153,6 @@ class TDatabaseSubscriberActor : public TActor { ) private: - static void UpdateDatabaseState(TDatabaseState& databaseState, TPathId pathId, bool serverless) { - databaseState.LastUpdateTime = TInstant::Now(); - databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << databaseState.Database; - databaseState.Serverless = serverless; - } - void UnsubscribeFromSchemeCache(TDatabaseState& databaseState) const { if (databaseState.WatchKey) { Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(databaseState.WatchKey)); @@ -198,13 +194,6 @@ TDatabasesCache::TDatabasesCache(TDuration idleTimeout) : IdleTimeout(idleTimeout) {} -const TString& TDatabasesCache::GetTenantName() { - if (!TenantName) { - TenantName = CanonizePath(AppData()->TenantName); - } - return TenantName; -} - void TDatabasesCache::UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { auto it = DatabasesCache.find(event->Get()->Database); if (it == DatabasesCache.end()) { diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 022ddadc58c8..bf782ec16021 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1586,19 +1586,19 @@ class TKqpProxyService : public TActorBootstrapped { bool TryFillPoolInfoFromCache(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 requestId) { ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext()); - const auto& database = ev->Get()->GetDatabase(); - if (!ResourcePoolsCache.ResourcePoolsEnabled(database)) { + const auto& databaseId = ev->Get()->GetDatabaseId(); + if (!ResourcePoolsCache.ResourcePoolsEnabled(databaseId)) { ev->Get()->SetPoolId(""); return true; } const auto& userToken = ev->Get()->GetUserToken(); if (!ev->Get()->GetPoolId()) { - ev->Get()->SetPoolId(ResourcePoolsCache.GetPoolId(database, userToken, ActorContext())); + ev->Get()->SetPoolId(ResourcePoolsCache.GetPoolId(databaseId, userToken, ActorContext())); } const auto& poolId = ev->Get()->GetPoolId(); - const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(database, poolId, ActorContext()); + const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(databaseId, poolId, ActorContext()); if (!poolInfo) { return true; } @@ -1857,12 +1857,12 @@ class TKqpProxyService : public TActorBootstrapped { } void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) { - ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); + ResourcePoolsCache.UpdatePoolInfo(ev->Get()->DatabaseId, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); } void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { - ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->DatabaseId, ev->Get()->Serverless); } DatabasesCache.UpdateDatabaseInfo(ev, ActorContext()); } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index 3d40621124e4..94f8f0526dff 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -432,9 +432,9 @@ class TResourcePoolsCache { }; struct TDatabaseInfo { - std::unordered_map ResourcePoolsClassifiers = {}; - std::map RankToClassifierInfo = {}; - std::unordered_map> UserToResourcePool = {}; + std::unordered_map ResourcePoolsClassifiers = {}; // Classifier name to config + std::map RankToClassifierInfo = {}; // Classifier rank to config + std::unordered_map> UserToResourcePool = {}; // UserSID to (resource pool, classifier rank) bool Serverless = false; }; @@ -445,7 +445,7 @@ class TResourcePoolsCache { }; public: - bool ResourcePoolsEnabled(const TString& database) const { + bool ResourcePoolsEnabled(const TString& databaseId) const { if (!EnableResourcePools) { return false; } @@ -454,19 +454,19 @@ class TResourcePoolsCache { return true; } - const auto databaseInfo = GetDatabaseInfo(database); + const auto databaseInfo = GetDatabaseInfo(databaseId); return !databaseInfo || !databaseInfo->Serverless; } - TString GetPoolId(const TString& database, const TIntrusiveConstPtr& userToken, TActorContext actorContext) { + TString GetPoolId(const TString& databaseId, const TIntrusiveConstPtr& userToken, TActorContext actorContext) { if (!userToken || userToken->GetUserSID().empty()) { return NResourcePool::DEFAULT_POOL_ID; } - TDatabaseInfo& databaseInfo = *GetOrCreateDatabaseInfo(database); - auto [resultPoolId, resultRank] = GetPoolIdFromClassifiers(database, userToken->GetUserSID(), databaseInfo, userToken, actorContext); + TDatabaseInfo& databaseInfo = *GetOrCreateDatabaseInfo(databaseId); + auto [resultPoolId, resultRank] = GetPoolIdFromClassifiers(databaseId, userToken->GetUserSID(), databaseInfo, userToken, actorContext); for (const auto& userSID : userToken->GetGroupSIDs()) { - const auto& [poolId, rank] = GetPoolIdFromClassifiers(database, userSID, databaseInfo, userToken, actorContext); + const auto& [poolId, rank] = GetPoolIdFromClassifiers(databaseId, userSID, databaseInfo, userToken, actorContext); if (poolId && (!resultPoolId || resultRank > rank)) { resultPoolId = poolId; resultRank = rank; @@ -476,10 +476,10 @@ class TResourcePoolsCache { return resultPoolId ? resultPoolId : NResourcePool::DEFAULT_POOL_ID; } - std::optional GetPoolInfo(const TString& database, const TString& poolId, TActorContext actorContext) const { - auto it = PoolsCache.find(GetPoolKey(database, poolId)); + std::optional GetPoolInfo(const TString& databaseId, const TString& poolId, TActorContext actorContext) const { + auto it = PoolsCache.find(GetPoolKey(databaseId, poolId)); if (it == PoolsCache.end()) { - actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, poolId)); + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(databaseId, poolId)); return std::nullopt; } return it->second; @@ -491,14 +491,14 @@ class TResourcePoolsCache { UpdateResourcePoolClassifiersSubscription(actorContext); } - void UpdateDatabaseInfo(const TString& database, bool serverless) { - GetOrCreateDatabaseInfo(database)->Serverless = serverless; + void UpdateDatabaseInfo(const TString& databaseId, bool serverless) { + GetOrCreateDatabaseInfo(databaseId)->Serverless = serverless; } - void UpdatePoolInfo(const TString& database, const TString& poolId, const std::optional& config, const std::optional& securityObject, TActorContext actorContext) { + void UpdatePoolInfo(const TString& databaseId, const TString& poolId, const std::optional& config, const std::optional& securityObject, TActorContext actorContext) { bool clearClassifierCache = false; - const TString& poolKey = GetPoolKey(database, poolId); + const TString& poolKey = GetPoolKey(databaseId, poolId); if (!config) { auto it = PoolsCache.find(poolKey); if (it == PoolsCache.end()) { @@ -511,7 +511,7 @@ class TResourcePoolsCache { } else { // Refresh pool subscription it->second.Expired = true; - actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, poolId)); + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(databaseId, poolId)); } } else { auto& poolInfo = PoolsCache[poolKey]; @@ -522,16 +522,16 @@ class TResourcePoolsCache { } if (clearClassifierCache) { - GetOrCreateDatabaseInfo(database)->UserToResourcePool.clear(); + GetOrCreateDatabaseInfo(databaseId)->UserToResourcePool.clear(); } } void UpdateResourcePoolClassifiersInfo(const TResourcePoolClassifierSnapshot* snapsot, TActorContext actorContext) { auto resourcePoolClassifierConfigs = snapsot->GetResourcePoolClassifierConfigs(); - for (auto& [database, databaseInfo] : DatabasesCache) { - auto it = resourcePoolClassifierConfigs.find(database); + for (auto& [databaseId, databaseInfo] : DatabasesCache) { + auto it = resourcePoolClassifierConfigs.find(databaseId); if (it != resourcePoolClassifierConfigs.end()) { - UpdateDatabaseResourcePoolClassifiers(database, databaseInfo, std::move(it->second), actorContext); + UpdateDatabaseResourcePoolClassifiers(databaseId, databaseInfo, std::move(it->second), actorContext); resourcePoolClassifierConfigs.erase(it); } else if (!databaseInfo.ResourcePoolsClassifiers.empty()) { databaseInfo.ResourcePoolsClassifiers.clear(); @@ -539,8 +539,8 @@ class TResourcePoolsCache { databaseInfo.UserToResourcePool.clear(); } } - for (auto& [database, configsMap] : resourcePoolClassifierConfigs) { - UpdateDatabaseResourcePoolClassifiers(database, *GetOrCreateDatabaseInfo(database), std::move(configsMap), actorContext); + for (auto& [databaseId, configsMap] : resourcePoolClassifierConfigs) { + UpdateDatabaseResourcePoolClassifiers(databaseId, *GetOrCreateDatabaseInfo(databaseId), std::move(configsMap), actorContext); } } @@ -567,7 +567,7 @@ class TResourcePoolsCache { } } - void UpdateDatabaseResourcePoolClassifiers(const TString& database, TDatabaseInfo& databaseInfo, std::unordered_map&& configsMap, TActorContext actorContext) { + void UpdateDatabaseResourcePoolClassifiers(const TString& databaseId, TDatabaseInfo& databaseInfo, std::unordered_map&& configsMap, TActorContext actorContext) { if (databaseInfo.ResourcePoolsClassifiers == configsMap) { return; } @@ -579,12 +579,12 @@ class TResourcePoolsCache { const auto& classifierSettings = classifier.GetClassifierSettings(); databaseInfo.RankToClassifierInfo.insert({classifier.GetRank(), TClassifierInfo(classifierSettings)}); if (!PoolsCache.contains(classifierSettings.ResourcePool)) { - actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, classifierSettings.ResourcePool)); + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(databaseId, classifierSettings.ResourcePool)); } } } - std::pair GetPoolIdFromClassifiers(const TString& database, const TString& userSID, TDatabaseInfo& databaseInfo, const TIntrusiveConstPtr& userToken, TActorContext actorContext) const { + std::pair GetPoolIdFromClassifiers(const TString& databaseId, const TString& userSID, TDatabaseInfo& databaseInfo, const TIntrusiveConstPtr& userToken, TActorContext actorContext) const { auto& usersMap = databaseInfo.UserToResourcePool; if (const auto it = usersMap.find(userSID); it != usersMap.end()) { return it->second; @@ -597,9 +597,9 @@ class TResourcePoolsCache { continue; } - auto it = PoolsCache.find(GetPoolKey(database, classifier.PoolId)); + auto it = PoolsCache.find(GetPoolKey(databaseId, classifier.PoolId)); if (it == PoolsCache.end()) { - actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, classifier.PoolId)); + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(databaseId, classifier.PoolId)); continue; } @@ -616,21 +616,20 @@ class TResourcePoolsCache { return {poolId, rank}; } - TDatabaseInfo* GetOrCreateDatabaseInfo(const TString& database) { - const TString& path = CanonizePath(database); - if (const auto it = DatabasesCache.find(path); it != DatabasesCache.end()) { + TDatabaseInfo* GetOrCreateDatabaseInfo(const TString& databaseId) { + if (const auto it = DatabasesCache.find(databaseId); it != DatabasesCache.end()) { return &it->second; } - return &DatabasesCache.insert({path, TDatabaseInfo{}}).first->second; + return &DatabasesCache.insert({databaseId, TDatabaseInfo{}}).first->second; } - const TDatabaseInfo* GetDatabaseInfo(const TString& database) const { - const auto it = DatabasesCache.find(CanonizePath(database)); + const TDatabaseInfo* GetDatabaseInfo(const TString& databaseId) const { + const auto it = DatabasesCache.find(databaseId); return it != DatabasesCache.end() ? &it->second : nullptr; } - static TString GetPoolKey(const TString& database, const TString& poolId) { - return CanonizePath(TStringBuilder() << database << "/" << poolId); + static TString GetPoolKey(const TString& databaseId, const TString& poolId) { + return TStringBuilder() << databaseId << "/" << poolId; } private: @@ -660,13 +659,10 @@ class TDatabasesCache { template bool SetDatabaseIdOrDefer(TEvent& event, i32 requestType, TActorContext actorContext) { - if (!event->Get()->GetDatabaseId().empty()) { - return true; - } - const auto& database = CanonizePath(event->Get()->GetDatabase()); - if (database.empty() || database == GetTenantName()) { - event->Get()->SetDatabaseId(GetTenantName()); + const auto& tenantName = CanonizePath(AppData()->TenantName); + if (database.empty() || database == tenantName) { + event->Get()->SetDatabaseId(tenantName); return true; } @@ -690,7 +686,6 @@ class TDatabasesCache { void StopSubscriberActor(TActorContext actorContext) const; private: - const TString& GetTenantName(); void SubscribeOnDatabase(const TString& database, TActorContext actorContext); void PingDatabaseSubscription(const TString& database, TActorContext actorContext) const; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp index db3a9b844400..a29f837a7564 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp @@ -734,11 +734,11 @@ ::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCou struct TEvPingPool : public TEventLocal { - TString Database; + TString DatabaseId; TString Pool; - TEvPingPool(TString database, TString pool) - : Database(database) + TEvPingPool(TString databaseId, TString pool) + : DatabaseId(databaseId) , Pool(pool) { } @@ -805,11 +805,11 @@ class TSchedulerActor : public TActorBootstrapped { } void Handle(TEvSchedulerNewPool::TPtr& ev) { - Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->Database, ev->Get()->Pool)); + Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->DatabaseId, ev->Get()->Pool)); } void Handle(TEvPingPool::TPtr& ev) { - Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->Database, ev->Get()->Pool)); + Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->DatabaseId, ev->Get()->Pool)); } void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) { diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h index 37bb1b2008a0..4c1f5f8c972f 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.h +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h @@ -117,11 +117,11 @@ struct TEvSchedulerDeregister : public TEventLocal { - TString Database; + TString DatabaseId; TString Pool; - TEvSchedulerNewPool(TString database, TString pool) - : Database(database) + TEvSchedulerNewPool(TString databaseId, TString pool) + : DatabaseId(databaseId) , Pool(pool) { } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index f45f214b044f..984957857bd3 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -199,12 +199,12 @@ std::unique_ptr TKqpQueryState::BuildCompileRequest(s TGUCSettings gUCSettings = gUCSettingsPtr ? *gUCSettingsPtr : TGUCSettings(); switch (GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Cluster, Database, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); keepInCache = GetQueryKeepInCache() && query->IsSql(); break; case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Cluster, Database, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); keepInCache = query->IsSql(); break; @@ -214,7 +214,7 @@ std::unique_ptr TKqpQueryState::BuildCompileRequest(s break; case NKikimrKqp::QUERY_ACTION_EXPLAIN: - query = TKqpQueryId(Cluster, Database, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); keepInCache = false; break; @@ -255,11 +255,11 @@ std::unique_ptr TKqpQueryState::BuildReCompileReque switch (GetAction()) { case NKikimrKqp::QUERY_ACTION_EXPLAIN: case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Cluster, Database, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); break; case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Cluster, Database, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); break; case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: @@ -299,7 +299,7 @@ std::unique_ptr TKqpQueryState::BuildCompileSplittedR switch (GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Cluster, Database, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); break; default: YQL_ENSURE(false); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 7a6befd64f0e..6a095cb748d8 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -91,6 +91,7 @@ class TKqpQueryState : public TNonCopyable { } UserRequestContext->PoolId = RequestEv->GetPoolId(); UserRequestContext->PoolConfig = RequestEv->GetPoolConfig(); + UserRequestContext->DatabaseId = RequestEv->GetDatabaseId(); } // the monotonously growing counter, the ordinal number of the query, diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c97381ba82e4..c88d854859d6 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -250,7 +250,7 @@ class TKqpSessionActor : public TActorBootstrapped { } Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvPlaceRequestIntoPool( - QueryState->Database, + QueryState->UserRequestContext->DatabaseId, SessionId, QueryState->UserRequestContext->PoolId, QueryState->UserToken @@ -417,6 +417,7 @@ class TKqpSessionActor : public TActorBootstrapped { << " text: " << QueryState->GetQuery() << " rpcActor: " << QueryState->RequestActorId << " database: " << QueryState->GetDatabase() + << " databaseId: " << QueryState->UserRequestContext->DatabaseId << " pool id: " << QueryState->UserRequestContext->PoolId ); @@ -2136,7 +2137,7 @@ class TKqpSessionActor : public TActorBootstrapped { const auto& stats = QueryState->QueryStats; auto event = std::make_unique( - QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId, + QueryState->UserRequestContext->DatabaseId, SessionId, QueryState->UserRequestContext->PoolId, TDuration::MicroSeconds(stats.DurationUs), TDuration::MicroSeconds(stats.WorkerCpuTimeUs) ); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 587bf6d010d4..721f821e0e55 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -188,7 +188,7 @@ class TKqpWorkerActor : public TActorBootstrapped { std::shared_ptr loader = std::make_shared( Settings.Cluster, TlsActivationContext->ActorSystem(), Config, false, nullptr); - Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader), + Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, QueryState->RequestEv->GetDatabaseId(), std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, QueryServiceConfig); Config->FeatureFlags = AppData(ctx)->FeatureFlags; diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 2074b571ba59..6a06bc69f861 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -35,7 +35,7 @@ TIntrusivePtr GetIcGateway(Tests::TServer& server) { counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters); counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); std::shared_ptr loader = std::make_shared(TestCluster, server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr(nullptr),false); - return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), + return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), server.GetRuntime()->GetNodeId(0), counters, server.GetSettings().AppConfig->GetQueryServiceConfig()); } diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 95c9bb6eb3d7..271d5baab58b 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -6944,7 +6945,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { // DROP RESOURCE POOL CLASSIFIER checkQuery("DROP RESOURCE POOL CLASSIFIER MyResourcePoolClassifier;", EStatus::GENERIC_ERROR, - "Classifier with name MyResourcePoolClassifier not found in database /Root"); + "Classifier with name MyResourcePoolClassifier not found in database with id /Root"); } Y_UNIT_TEST(DisableResourcePoolClassifiersOnServerless) { @@ -7125,16 +7126,19 @@ Y_UNIT_TEST_SUITE(KqpScheme) { UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "The rank could not be set automatically, the maximum rank of the resource pool classifier is too high: 9223372036854775807"); } - TString FetchResourcePoolClassifiers(TKikimrRunner& kikimr) { - auto& runtime = *kikimr.GetTestServer().GetRuntime(); - const TActorId edgeActor = runtime.AllocateEdgeActor(); - runtime.Send(NMetadata::NProvider::MakeServiceId(runtime.GetNodeId()), edgeActor, new NMetadata::NProvider::TEvAskSnapshot(std::make_shared())); + TString FetchResourcePoolClassifiers(TTestActorRuntime& runtime, ui32 nodeIndex) { + const TActorId edgeActor = runtime.AllocateEdgeActor(nodeIndex); + runtime.Send(NMetadata::NProvider::MakeServiceId(runtime.GetNodeId(nodeIndex)), edgeActor, new NMetadata::NProvider::TEvAskSnapshot(std::make_shared()), nodeIndex); const auto response = runtime.GrabEdgeEvent(edgeActor); UNIT_ASSERT(response); return response->Get()->GetSnapshotAs()->SerializeToString(); } + TString FetchResourcePoolClassifiers(TKikimrRunner& kikimr) { + return FetchResourcePoolClassifiers(*kikimr.GetTestServer().GetRuntime(), 0); + } + Y_UNIT_TEST(CreateResourcePoolClassifier) { NKikimrConfig::TAppConfig config; config.MutableFeatureFlags()->SetEnableResourcePools(true); @@ -7168,6 +7172,31 @@ Y_UNIT_TEST_SUITE(KqpScheme) { UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"member_name\":\"test@user\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"},{\"rank\":1020,\"name\":\"AnotherResourcePoolClassifier\",\"config\":{\"member_name\":\"another@user\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"}]}"); } + Y_UNIT_TEST(CreateResourcePoolClassifierOnServerless) { + auto ydb = NWorkload::TYdbSetupSettings() + .CreateSampleTenants(true) + .EnableResourcePoolsOnServerless(true) + .Create(); + + const auto& serverlessTenant = ydb->GetSettings().GetServerlessTenantName(); + NWorkload::TSampleQueries::CheckSuccess(ydb->ExecuteQuery(R"( + CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH ( + RANK=20, + RESOURCE_POOL="test_pool" + );)", + NWorkload::TQueryRunnerSettings() + .PoolId("") + .Database(serverlessTenant) + .NodeIndex(1) + )); + + const auto pathId = ydb->FetchDatabase(serverlessTenant)->Get()->PathId; + UNIT_ASSERT_VALUES_EQUAL( + FetchResourcePoolClassifiers(*ydb->GetRuntime(), 1), + TStringBuilder() << "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"resource_pool\":\"test_pool\"},\"database\":\"" << pathId.OwnerId << ":" << pathId.LocalPathId << ":\\/Root\\/test-serverless\"}]}" + ); + } + Y_UNIT_TEST(DoubleCreateResourcePoolClassifier) { NKikimrConfig::TAppConfig config; config.MutableFeatureFlags()->SetEnableResourcePools(true); @@ -7277,7 +7306,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { )"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Classifier with name MyResourcePoolClassifier not found in database /Root"); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Classifier with name MyResourcePoolClassifier not found in database with id /Root"); } Y_UNIT_TEST(DropResourcePoolClassifier) { @@ -7324,7 +7353,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { auto query = "DROP RESOURCE POOL CLASSIFIER MyResourcePoolClassifier;"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Classifier with name MyResourcePoolClassifier not found in database /Root"); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Classifier with name MyResourcePoolClassifier not found in database with id /Root"); } Y_UNIT_TEST(DisableMetadataObjectsOnServerless) { diff --git a/ydb/core/kqp/workload_service/actors/actors.h b/ydb/core/kqp/workload_service/actors/actors.h index 6e55f7553f55..d21706f3df7b 100644 --- a/ydb/core/kqp/workload_service/actors/actors.h +++ b/ydb/core/kqp/workload_service/actors/actors.h @@ -6,16 +6,16 @@ namespace NKikimr::NKqp::NWorkload { // Pool state holder -NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters); +NActors::IActor* CreatePoolHandlerActor(const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters); // Fetch pool and create default pool if needed NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists); // Fetch and create pool in scheme shard -NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken); -NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl); +NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& databaseId, const TString& poolId, TIntrusiveConstPtr userToken); +NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl); -// Checks that database is serverless +// Checks that database is serverless and return database id NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database, TIntrusiveConstPtr userToken = nullptr, NACLib::EAccessRights checkAccess = NACLib::EAccessRights::NoAccess); // Cpu load fetcher actor diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index 77512a4d529f..63fb1adbe068 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -46,9 +46,9 @@ class TPoolHandlerActorBase : public TActor { NMonitoring::TDynamicCounters::TCounterPtr QueueSizeLimit; NMonitoring::TDynamicCounters::TCounterPtr LoadCpuThreshold; - TCommonCounters(NMonitoring::TDynamicCounterPtr counters, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) + TCommonCounters(NMonitoring::TDynamicCounterPtr counters, const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) : CountersRoot(counters) - , CountersSubgroup(counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId))) + , CountersSubgroup(counters->GetSubgroup("pool", TStringBuilder() << databaseId << "/" << poolId)) { Register(); UpdateConfigCounters(poolConfig); @@ -125,10 +125,10 @@ class TPoolHandlerActorBase : public TActor { }; public: - TPoolHandlerActorBase(void (TDerived::* requestFunc)(TAutoPtr& ev), const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) + TPoolHandlerActorBase(void (TDerived::* requestFunc)(TAutoPtr& ev), const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) : TBase(requestFunc) - , Counters(counters, database, poolId, poolConfig) - , Database(database) + , Counters(counters, databaseId, poolId, poolConfig) + , DatabaseId(databaseId) , PoolId(poolId) , QueueSizeLimit(GetMaxQueueSize(poolConfig)) , InFlightLimit(GetMaxInFlight(poolConfig)) @@ -161,7 +161,7 @@ class TPoolHandlerActorBase : public TActor { } SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers); - this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvStopPoolHandlerResponse(Database, PoolId)); + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvStopPoolHandlerResponse(DatabaseId, PoolId)); Counters.OnCleanup(ResetCountersOnStrop); @@ -187,7 +187,7 @@ class TPoolHandlerActorBase : public TActor { void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { auto event = std::move(ev->Get()->Event); const TString& sessionId = event->Get()->SessionId; - this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId, sessionId)); + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(DatabaseId, PoolId, sessionId)); const TActorId& workerActorId = event->Sender; if (!InFlightLimit) { @@ -360,7 +360,7 @@ class TPoolHandlerActorBase : public TActor { void SendPoolInfoUpdate(const std::optional& config, const std::optional& securityObject, const std::unordered_set& subscribers) const { for (const auto& subscriber : subscribers) { - this->Send(subscriber, new TEvUpdatePoolInfo(Database, PoolId, config, securityObject)); + this->Send(subscriber, new TEvUpdatePoolInfo(DatabaseId, PoolId, config, securityObject)); } } @@ -390,7 +390,7 @@ class TPoolHandlerActorBase : public TActor { void RemoveRequest(TRequest* request) { auto event = std::make_unique( - Database, PoolId, request->Duration, request->CpuConsumed, request->UsedCpuQuota + DatabaseId, PoolId, request->Duration, request->CpuConsumed, request->UsedCpuQuota ); this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), event.release()); @@ -424,7 +424,7 @@ class TPoolHandlerActorBase : public TActor { } TString LogPrefix() const { - return TStringBuilder() << "[TPoolHandlerActorBase] ActorId: " << this->SelfId() << ", Database: " << Database << ", PoolId: " << PoolId << ", "; + return TStringBuilder() << "[TPoolHandlerActorBase] ActorId: " << this->SelfId() << ", DatabaseId: " << DatabaseId << ", PoolId: " << PoolId << ", "; } private: @@ -482,8 +482,8 @@ class TPoolHandlerActorBase : public TActor { RefreshState(true); if (ShouldResign()) { - const TActorId& newHandler = this->RegisterWithSameMailbox(CreatePoolHandlerActor(Database, PoolId, poolConfig, Counters.CountersRoot)); - this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvResignPoolHandler(Database, PoolId, newHandler)); + const TActorId& newHandler = this->RegisterWithSameMailbox(CreatePoolHandlerActor(DatabaseId, PoolId, poolConfig, Counters.CountersRoot)); + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvResignPoolHandler(DatabaseId, PoolId, newHandler)); } } @@ -501,7 +501,7 @@ class TPoolHandlerActorBase : public TActor { TCommonCounters Counters; // Configuration - const TString Database; + const TString DatabaseId; const TString PoolId; ui64 QueueSizeLimit = std::numeric_limits::max(); ui64 InFlightLimit = std::numeric_limits::max(); @@ -527,8 +527,8 @@ class TUnlimitedPoolHandlerActor : public TPoolHandlerActorBase; public: - TUnlimitedPoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) - : TBase(&TBase::StateFuncBase, database, poolId, poolConfig, counters) + TUnlimitedPoolHandlerActor(const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) + : TBase(&TBase::StateFuncBase, databaseId, poolId, poolConfig, counters) { Y_ENSURE(!ShouldResign()); } @@ -591,8 +591,8 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseInc(); if (!PreparingFinished) { - this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPrepareTablesRequest(Database, PoolId)); + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPrepareTablesRequest(DatabaseId, PoolId)); } RefreshState(); @@ -679,7 +679,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateRefreshPoolStateActor(this->SelfId(), Database, PoolId, LEASE_DURATION, Counters.CountersSubgroup)); + this->Register(CreateRefreshPoolStateActor(this->SelfId(), DatabaseId, PoolId, LEASE_DURATION, Counters.CountersSubgroup)); } } @@ -813,7 +813,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup)); + this->Register(CreateStartRequestActor(this->SelfId(), DatabaseId, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup)); GetRequest(sessionId)->CleanupRequired = true; } break; @@ -846,7 +846,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase(); event->Record.SetPoolId(PoolId); - event->Record.SetDatabase(Database); + event->Record.SetDatabase(DatabaseId); this->Send(MakeKqpWorkloadServiceId(nodeId), std::move(event)); RefreshState(); return; @@ -911,7 +911,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup)); + this->Register(CreateStartRequestActor(this->SelfId(), DatabaseId, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup)); GetRequest(sessionId)->CleanupRequired = true; } } @@ -928,7 +928,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateStartRequestActor(this->SelfId(), Database, PoolId, std::nullopt, LEASE_DURATION, Counters.CountersSubgroup)); + this->Register(CreateStartRequestActor(this->SelfId(), DatabaseId, PoolId, std::nullopt, LEASE_DURATION, Counters.CountersSubgroup)); } } } @@ -943,7 +943,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateDelayRequestActor(this->SelfId(), Database, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, Counters.CountersSubgroup)); + this->Register(CreateDelayRequestActor(this->SelfId(), DatabaseId, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, Counters.CountersSubgroup)); DelayedRequests.emplace_back(sessionId); request->CleanupRequired = true; } @@ -956,7 +956,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateCleanupRequestsActor(this->SelfId(), Database, PoolId, FinishedRequests, Counters.CountersSubgroup)); + this->Register(CreateCleanupRequestsActor(this->SelfId(), DatabaseId, PoolId, FinishedRequests, Counters.CountersSubgroup)); FinishedRequests.clear(); FifoCounters.FinishingRequestsCount->Set(0); } @@ -1054,11 +1054,11 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase { void StartPoolFetchRequest() const { LOG_D("Start pool fetching"); - Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken)); + Register(CreatePoolFetcherActor(SelfId(), Event->Get()->DatabaseId, Event->Get()->PoolId, Event->Get()->UserToken)); } void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) { @@ -74,7 +74,7 @@ class TPoolResolverActor : public TActorBootstrapped { diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT); auto token = MakeIntrusive(BUILTIN_ACL_METADATA, TVector{}); - Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl)); + Register(CreatePoolCreatorActor(SelfId(), Event->Get()->DatabaseId, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl)); } void Handle(TEvPrivate::TEvCreatePoolResponse::TPtr& ev) { @@ -96,7 +96,7 @@ class TPoolResolverActor : public TActorBootstrapped { private: TString LogPrefix() const { - return TStringBuilder() << "[TPoolResolverActor] ActorId: " << SelfId() << ", Database: " << Event->Get()->Database << ", PoolId: " << Event->Get()->PoolId << ", SessionId: " << Event->Get()->SessionId << ", "; + return TStringBuilder() << "[TPoolResolverActor] ActorId: " << SelfId() << ", DatabaseId: " << Event->Get()->DatabaseId << ", PoolId: " << Event->Get()->PoolId << ", SessionId: " << Event->Get()->SessionId << ", "; } void Reply(NResourcePool::TPoolSettings poolConfig, TPathId pathId) { @@ -122,9 +122,9 @@ class TPoolResolverActor : public TActorBootstrapped { class TPoolFetcherActor : public TSchemeActorBase { public: - TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) + TPoolFetcherActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, TIntrusiveConstPtr userToken) : ReplyActorId(replyActorId) - , Database(database) + , DatabaseId(databaseId) , PoolId(poolId) , UserToken(userToken) {} @@ -180,7 +180,7 @@ class TPoolFetcherActor : public TSchemeActorBase { LOG_D("Start pool fetching"); auto event = NTableCreator::BuildSchemeCacheNavigateRequest( {{".metadata/workload_manager/pools", PoolId}}, - Database ? Database : AppData()->TenantName, + DatabaseIdToDatabase(DatabaseId), UserToken ); event->ResultSet[0].Access |= NACLib::SelectRow; @@ -193,7 +193,7 @@ class TPoolFetcherActor : public TSchemeActorBase { } TString LogPrefix() const override { - return TStringBuilder() << "[TPoolFetcherActor] ActorId: " << SelfId() << ", Database: " << Database << ", PoolId: " << PoolId << ", "; + return TStringBuilder() << "[TPoolFetcherActor] ActorId: " << SelfId() << ", DatabaseId: " << DatabaseId << ", PoolId: " << PoolId << ", "; } private: @@ -221,13 +221,13 @@ class TPoolFetcherActor : public TSchemeActorBase { } Issues.AddIssues(std::move(issues)); - Send(ReplyActorId, new TEvPrivate::TEvFetchPoolResponse(status, Database, PoolId, PoolConfig, PathIdFromPathId(PathId), std::move(Issues))); + Send(ReplyActorId, new TEvPrivate::TEvFetchPoolResponse(status, DatabaseId, PoolId, PoolConfig, PathIdFromPathId(PathId), std::move(Issues))); PassAway(); } private: const TActorId ReplyActorId; - const TString Database; + const TString DatabaseId; const TString PoolId; const TIntrusiveConstPtr UserToken; @@ -240,9 +240,9 @@ class TPoolCreatorActor : public TSchemeActorBase { using TBase = TSchemeActorBase; public: - TPoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl) + TPoolCreatorActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl) : ReplyActorId(replyActorId) - , Database(database) + , DatabaseId(databaseId) , PoolId(poolId) , UserToken(userToken) , DiffAcl(diffAcl) @@ -326,7 +326,7 @@ class TPoolCreatorActor : public TSchemeActorBase { auto event = std::make_unique(); auto& schemeTx = *event->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(JoinPath({Database ? Database : AppData()->TenantName, ".metadata/workload_manager/pools"})); + schemeTx.SetWorkingDir(JoinPath({DatabaseIdToDatabase(DatabaseId), ".metadata/workload_manager/pools"})); schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool); schemeTx.SetInternal(true); @@ -345,7 +345,7 @@ class TPoolCreatorActor : public TSchemeActorBase { } TString LogPrefix() const override { - return TStringBuilder() << "[TPoolCreatorActor] ActorId: " << SelfId() << ", Database: " << Database << ", PoolId: " << PoolId << ", "; + return TStringBuilder() << "[TPoolCreatorActor] ActorId: " << SelfId() << ", DatabaseId: " << DatabaseId << ", PoolId: " << PoolId << ", "; } private: @@ -432,7 +432,7 @@ class TPoolCreatorActor : public TSchemeActorBase { private: const TActorId ReplyActorId; - const TString Database; + const TString DatabaseId; const TString PoolId; const TIntrusiveConstPtr UserToken; const NACLibProto::TDiffACL DiffAcl; @@ -532,13 +532,13 @@ class TDatabaseFetcherActor : public TSchemeActorBase { void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { if (status == Ydb::StatusIds::SUCCESS) { - LOG_D("Database info successfully fetched"); + LOG_D("Database info successfully fetched, serverless: " << Serverless); } else { LOG_W("Failed to fetch database info, " << status << ", issues: " << issues.ToOneLineString()); } Issues.AddIssues(std::move(issues)); - Send(ReplyActorId, new TEvFetchDatabaseResponse(status, Database, Serverless, PathId, std::move(Issues))); + Send(ReplyActorId, new TEvFetchDatabaseResponse(status, Database, CreateDatabaseId(Database, Serverless, PathId), Serverless, PathId, std::move(Issues))); PassAway(); } @@ -570,12 +570,12 @@ IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaul return new TPoolResolverActor(std::move(event), defaultPoolExists); } -IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) { - return new TPoolFetcherActor(replyActorId, database, poolId, userToken); +IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, TIntrusiveConstPtr userToken) { + return new TPoolFetcherActor(replyActorId, databaseId, poolId, userToken); } -IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl) { - return new TPoolCreatorActor(replyActorId, database, poolId, poolConfig, userToken, diffAcl); +IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl) { + return new TPoolCreatorActor(replyActorId, databaseId, poolId, poolConfig, userToken, diffAcl); } IActor* CreateDatabaseFetcherActor(const TActorId& replyActorId, const TString& database, TIntrusiveConstPtr userToken, NACLib::EAccessRights checkAccess) { diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index 57e332ac66b3..911dccd36f0e 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -74,9 +74,9 @@ struct TEvPrivate { }; struct TEvFetchPoolResponse : public NActors::TEventLocal { - TEvFetchPoolResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TPathId pathId, NYql::TIssues issues) + TEvFetchPoolResponse(Ydb::StatusIds::StatusCode status, const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TPathId pathId, NYql::TIssues issues) : Status(status) - , Database(database) + , DatabaseId(databaseId) , PoolId(poolId) , PoolConfig(poolConfig) , PathId(pathId) @@ -84,7 +84,7 @@ struct TEvPrivate { {} const Ydb::StatusIds::StatusCode Status; - const TString Database; + const TString DatabaseId; const TString PoolId; const NResourcePool::TPoolSettings PoolConfig; const TPathId PathId; @@ -102,37 +102,37 @@ struct TEvPrivate { }; struct TEvPrepareTablesRequest : public NActors::TEventLocal { - TEvPrepareTablesRequest(const TString& database, const TString& poolId) - : Database(database) + TEvPrepareTablesRequest(const TString& databaseId, const TString& poolId) + : DatabaseId(databaseId) , PoolId(poolId) {} - const TString Database; + const TString DatabaseId; const TString PoolId; }; struct TEvPlaceRequestIntoPoolResponse : public NActors::TEventLocal { - TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId, const TString& sessionId) - : Database(database) + TEvPlaceRequestIntoPoolResponse(const TString& databaseId, const TString& poolId, const TString& sessionId) + : DatabaseId(databaseId) , PoolId(poolId) , SessionId(sessionId) {} - const TString Database; + const TString DatabaseId; const TString PoolId; const TString SessionId; }; struct TEvFinishRequestInPool : public NActors::TEventLocal { - TEvFinishRequestInPool(const TString& database, const TString& poolId, TDuration duration, TDuration cpuConsumed, bool adjustCpuQuota) - : Database(database) + TEvFinishRequestInPool(const TString& databaseId, const TString& poolId, TDuration duration, TDuration cpuConsumed, bool adjustCpuQuota) + : DatabaseId(databaseId) , PoolId(poolId) , Duration(duration) , CpuConsumed(cpuConsumed) , AdjustCpuQuota(adjustCpuQuota) {} - const TString Database; + const TString DatabaseId; const TString PoolId; const TDuration Duration; const TDuration CpuConsumed; @@ -140,13 +140,13 @@ struct TEvPrivate { }; struct TEvResignPoolHandler : public NActors::TEventLocal { - TEvResignPoolHandler(const TString& database, const TString& poolId, const TActorId& newHandler) - : Database(database) + TEvResignPoolHandler(const TString& databaseId, const TString& poolId, const TActorId& newHandler) + : DatabaseId(databaseId) , PoolId(poolId) , NewHandler(newHandler) {} - const TString Database; + const TString DatabaseId; const TString PoolId; const TActorId NewHandler; }; @@ -160,12 +160,12 @@ struct TEvPrivate { }; struct TEvStopPoolHandlerResponse : public NActors::TEventLocal { - TEvStopPoolHandlerResponse(const TString& database, const TString& poolId) - : Database(database) + TEvStopPoolHandlerResponse(const TString& databaseId, const TString& poolId) + : DatabaseId(databaseId) , PoolId(poolId) {} - const TString Database; + const TString DatabaseId; const TString PoolId; }; diff --git a/ydb/core/kqp/workload_service/common/helpers.cpp b/ydb/core/kqp/workload_service/common/helpers.cpp index 79fd3fff5c69..b9ad774c6f84 100644 --- a/ydb/core/kqp/workload_service/common/helpers.cpp +++ b/ydb/core/kqp/workload_service/common/helpers.cpp @@ -1,8 +1,32 @@ #include "helpers.h" +#include +#include + namespace NKikimr::NKqp::NWorkload { +TString CreateDatabaseId(const TString& database, bool serverless, TPathId pathId) { + TString databasePath = CanonizePath(database); + TString tennantPath = CanonizePath(AppData()->TenantName); + if (databasePath.empty() || databasePath == tennantPath) { + return tennantPath; + } + + if (serverless) { + databasePath = TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" << databasePath; + } + return databasePath; +} + +TString DatabaseIdToDatabase(TStringBuf databaseId) { + TStringBuf id; + TStringBuf database; + return databaseId.TrySplit("/", id, database) + ? CanonizePath(TString(database)) // Serverless + : CanonizePath(TString(databaseId)); // Dedicated +} + NYql::TIssues GroupIssues(const NYql::TIssues& issues, const TString& message) { NYql::TIssue rootIssue(message); for (const NYql::TIssue& issue : issues) { diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index 163b2d765ed1..a4c933b92356 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -99,6 +99,9 @@ class TSchemeActorBase : public NActors::TActorBootstrapped { }; +TString CreateDatabaseId(const TString& database, bool serverless, TPathId pathId); +TString DatabaseIdToDatabase(TStringBuf databaseId); + NYql::TIssues GroupIssues(const NYql::TIssues& issues, const TString& message); void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& description, NResourcePool::TPoolSettings& poolConfig); diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 9bcfe49f4a4e..af4c0536b45f 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -143,15 +143,15 @@ class TKqpWorkloadService : public TActorBootstrapped { } void Handle(TEvSubscribeOnPoolChanges::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; if (!EnabledResourcePools) { - Send(ev->Sender, new TEvUpdatePoolInfo(database, poolId, std::nullopt, std::nullopt)); + Send(ev->Sender, new TEvUpdatePoolInfo(databaseId, poolId, std::nullopt, std::nullopt)); return; } - LOG_D("Recieved subscription request, Database: " << database << ", PoolId: " << poolId); - GetOrCreateDatabaseState(database)->DoSubscribeRequest(std::move(ev)); + LOG_D("Recieved subscription request, DatabaseId: " << databaseId << ", PoolId: " << poolId); + GetOrCreateDatabaseState(databaseId)->DoSubscribeRequest(std::move(ev)); } void Handle(TEvPlaceRequestIntoPool::TPtr& ev) { @@ -161,28 +161,28 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - const TString& database = ev->Get()->Database; - LOG_D("Recieved new request from " << workerActorId << ", Database: " << database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); - GetOrCreateDatabaseState(database)->DoPlaceRequest(std::move(ev)); + const TString& databaseId = ev->Get()->DatabaseId; + LOG_D("Recieved new request from " << workerActorId << ", DatabaseId: " << databaseId << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); + GetOrCreateDatabaseState(databaseId)->DoPlaceRequest(std::move(ev)); } void Handle(TEvCleanupRequest::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; const TString& sessionId = ev->Get()->SessionId; - if (GetOrCreateDatabaseState(database)->PendingSessionIds.contains(sessionId)) { - LOG_D("Finished request with worker actor " << ev->Sender << ", wait for place request, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId); - GetOrCreateDatabaseState(database)->PendingCancelRequests[sessionId].emplace_back(std::move(ev)); + if (GetOrCreateDatabaseState(databaseId)->PendingSessionIds.contains(sessionId)) { + LOG_D("Finished request with worker actor " << ev->Sender << ", wait for place request, DatabaseId: " << databaseId << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId); + GetOrCreateDatabaseState(databaseId)->PendingCancelRequests[sessionId].emplace_back(std::move(ev)); return; } - auto poolState = GetPoolState(database, poolId); + auto poolState = GetPoolState(databaseId, poolId); if (!poolState) { ReplyCleanupError(ev->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << poolId << " not found"); return; } - LOG_D("Finished request with worker actor " << ev->Sender << ", Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId); + LOG_D("Finished request with worker actor " << ev->Sender << ", DatabaseId: " << databaseId << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId); poolState->DoCleanupRequest(std::move(ev)); } @@ -232,28 +232,33 @@ class TKqpWorkloadService : public TActorBootstrapped { private: void Handle(TEvFetchDatabaseResponse::TPtr& ev) { - GetOrCreateDatabaseState(ev->Get()->Database)->UpdateDatabaseInfo(ev); + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + LOG_D("Successfully fetched database info, DatabaseId: " << ev->Get()->DatabaseId << ", Serverless: " << ev->Get()->Serverless); + } else { + LOG_D("Failed to fetch database info, DatabaseId: " << ev->Get()->DatabaseId << ", Status: " << ev->Get()->Status << ", Issues: " << ev->Get()->Issues.ToOneLineString()); + } + GetOrCreateDatabaseState(ev->Get()->DatabaseId)->UpdateDatabaseInfo(ev); } void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; TActorId poolHandler; if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { - LOG_D("Successfully fetched pool " << poolId << ", Database: " << database); - poolHandler = GetOrCreatePoolState(database, poolId, ev->Get()->PoolConfig)->PoolHandler; + LOG_D("Successfully fetched pool " << poolId << ", DatabaseId: " << databaseId); + poolHandler = GetOrCreatePoolState(databaseId, poolId, ev->Get()->PoolConfig)->PoolHandler; } else { - LOG_W("Failed to fetch pool " << poolId << ", Database: " << database << ", status: " << ev->Get()->Status << ", issues: " << ev->Get()->Issues.ToOneLineString()); + LOG_W("Failed to fetch pool " << poolId << ", DatabaseId: " << databaseId << ", status: " << ev->Get()->Status << ", issues: " << ev->Get()->Issues.ToOneLineString()); } - GetOrCreateDatabaseState(database)->UpdatePoolInfo(ev, poolHandler); + GetOrCreateDatabaseState(databaseId)->UpdatePoolInfo(ev, poolHandler); } void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { const auto& event = ev->Get()->Event; - const TString& database = event->Get()->Database; - auto databaseState = GetOrCreateDatabaseState(database); + const TString& databaseId = event->Get()->DatabaseId; + auto databaseState = GetOrCreateDatabaseState(databaseId); if (ev->Get()->DefaultPoolCreated) { databaseState->HasDefaultPool = true; } @@ -267,21 +272,21 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - LOG_D("Successfully fetched pool " << poolId << ", Database: " << database << ", SessionId: " << event->Get()->SessionId); + LOG_D("Successfully fetched pool " << poolId << ", DatabaseId: " << databaseId << ", SessionId: " << event->Get()->SessionId); - auto poolState = GetOrCreatePoolState(database, poolId, ev->Get()->PoolConfig); + auto poolState = GetOrCreatePoolState(databaseId, poolId, ev->Get()->PoolConfig); poolState->PendingRequests.emplace(std::move(ev)); poolState->StartPlaceRequest(); } void Handle(TEvPrivate::TEvPlaceRequestIntoPoolResponse::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; const TString& sessionId = ev->Get()->SessionId; - LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << sessionId); + LOG_T("Request placed into pool, DatabaseId: " << databaseId << ", PoolId: " << poolId << ", SessionId: " << sessionId); - auto poolState = GetPoolState(database, poolId); - GetOrCreateDatabaseState(database)->RemovePendingSession(sessionId, [this, poolState](TEvCleanupRequest::TPtr event) { + auto poolState = GetPoolState(databaseId, poolId); + GetOrCreateDatabaseState(databaseId)->RemovePendingSession(sessionId, [this, poolState](TEvCleanupRequest::TPtr event) { if (poolState) { poolState->DoCleanupRequest(std::move(event)); } else { @@ -302,11 +307,11 @@ class TKqpWorkloadService : public TActorBootstrapped { void Handle(TEvPrivate::TEvRefreshPoolState::TPtr& ev) { const auto& event = ev->Get()->Record; - const TString& database = event.GetDatabase(); + const TString& databaseId = event.GetDatabase(); const TString& poolId = event.GetPoolId(); - LOG_T("Got remote refresh request, Database: " << database << ", PoolId: " << poolId << ", NodeId: " << ev->Sender.NodeId()); + LOG_T("Got remote refresh request, DatabaseId: " << databaseId << ", PoolId: " << poolId << ", NodeId: " << ev->Sender.NodeId()); - if (auto poolState = GetPoolState(database, poolId)) { + if (auto poolState = GetPoolState(databaseId, poolId)) { Send(ev->Forward(poolState->PoolHandler)); } } @@ -321,11 +326,11 @@ class TKqpWorkloadService : public TActorBootstrapped { } void Handle(TEvPrivate::TEvFinishRequestInPool::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; - LOG_T("Request finished in pool, Database: " << database << ", PoolId: " << poolId << ", Duration: " << ev->Get()->Duration << ", CpuConsumed: " << ev->Get()->CpuConsumed << ", AdjustCpuQuota: " << ev->Get()->AdjustCpuQuota); + LOG_T("Request finished in pool, DatabaseId: " << databaseId << ", PoolId: " << poolId << ", Duration: " << ev->Get()->Duration << ", CpuConsumed: " << ev->Get()->CpuConsumed << ", AdjustCpuQuota: " << ev->Get()->AdjustCpuQuota); - if (auto poolState = GetPoolState(database, poolId)) { + if (auto poolState = GetPoolState(databaseId, poolId)) { poolState->OnRequestFinished(); } if (ev->Get()->AdjustCpuQuota) { @@ -335,11 +340,11 @@ class TKqpWorkloadService : public TActorBootstrapped { } void Handle(TEvPrivate::TEvPrepareTablesRequest::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; - LOG_T("Got create teables request, Database: " << database << ", PoolId: " << poolId); + LOG_T("Got create teables request, DatabaseId: " << databaseId << ", PoolId: " << poolId); - auto poolState = GetPoolState(database, poolId); + auto poolState = GetPoolState(databaseId, poolId); if (!poolState) { return; } @@ -348,7 +353,7 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(poolState->PoolHandler, new TEvPrivate::TEvTablesCreationFinished(true, {})); } else { poolState->WaitingInitialization = true; - PendingHandlers.emplace(GetPoolKey(database, poolId)); + PendingHandlers.emplace(GetPoolKey(databaseId, poolId)); PrepareWorkloadServiceTables(); } } @@ -397,11 +402,11 @@ class TKqpWorkloadService : public TActorBootstrapped { } void Handle(TEvPrivate::TEvResignPoolHandler::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; - LOG_T("Got resign request, Database: " << database << ", PoolId: " << poolId); + LOG_T("Got resign request, DatabaseId: " << databaseId << ", PoolId: " << poolId); - if (auto poolState = GetPoolState(database, poolId)) { + if (auto poolState = GetPoolState(databaseId, poolId)) { if (poolState->NewPoolHandler) { Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler(false)); } @@ -411,12 +416,12 @@ class TKqpWorkloadService : public TActorBootstrapped { } void Handle(TEvPrivate::TEvStopPoolHandlerResponse::TPtr& ev) { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; const TString& poolId = ev->Get()->PoolId; - LOG_T("Got stop pool handler response, Database: " << database << ", PoolId: " << poolId); + LOG_T("Got stop pool handler response, DatabaseId: " << databaseId << ", PoolId: " << poolId); Counters.ActivePools->Dec(); - if (auto poolState = GetPoolState(database, poolId)) { + if (auto poolState = GetPoolState(databaseId, poolId)) { poolState->PreviousPoolHandlers.erase(ev->Sender); } } @@ -544,24 +549,24 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } - TDatabaseState* GetOrCreateDatabaseState(TString database) { - database = CanonizePath(database); - auto databaseIt = DatabaseToState.find(database); + TDatabaseState* GetOrCreateDatabaseState(TString databaseId) { + auto databaseIt = DatabaseToState.find(databaseId); if (databaseIt != DatabaseToState.end()) { return &databaseIt->second; } - return &DatabaseToState.insert({database, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second; + LOG_I("Creating new database state for id " << databaseId); + return &DatabaseToState.insert({databaseId, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second; } - TPoolState* GetOrCreatePoolState(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) { - const auto& poolKey = GetPoolKey(database, poolId); + TPoolState* GetOrCreatePoolState(const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) { + const auto& poolKey = GetPoolKey(databaseId, poolId); if (auto poolState = GetPoolState(poolKey)) { return poolState; } LOG_I("Creating new handler for pool " << poolKey); - const auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, poolConfig, EnableResourcePoolsCounters ? Counters.Counters : MakeIntrusive())); + const auto poolHandler = Register(CreatePoolHandlerActor(databaseId, poolId, poolConfig, EnableResourcePoolsCounters ? Counters.Counters : MakeIntrusive())); const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second; Counters.ActivePools->Inc(); @@ -570,8 +575,8 @@ class TKqpWorkloadService : public TActorBootstrapped { return poolState; } - TPoolState* GetPoolState(const TString& database, const TString& poolId) { - return GetPoolState(GetPoolKey(database, poolId)); + TPoolState* GetPoolState(const TString& databaseId, const TString& poolId) { + return GetPoolState(GetPoolKey(databaseId, poolId)); } TPoolState* GetPoolState(const TString& key) { @@ -582,8 +587,8 @@ class TKqpWorkloadService : public TActorBootstrapped { return nullptr; } - static TString GetPoolKey(const TString& database, const TString& poolId) { - return CanonizePath(TStringBuilder() << database << "/" << poolId); + static TString GetPoolKey(const TString& databaseId, const TString& poolId) { + return CanonizePath(TStringBuilder() << databaseId << "/" << poolId); } TString LogPrefix() const { @@ -599,10 +604,10 @@ class TKqpWorkloadService : public TActorBootstrapped { bool ServiceInitialized = false; bool IdleChecksStarted = false; ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup; - std::unordered_set PendingHandlers; + std::unordered_set PendingHandlers; // DatabaseID/PoolID - std::unordered_map DatabaseToState; - std::unordered_map PoolIdToState; + std::unordered_map DatabaseToState; // DatabaseID to state + std::unordered_map PoolIdToState; // DatabaseID/PoolID to state std::unique_ptr CpuQuotaManager; ui32 NodeCount = 0; }; diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index e9e292d81dfc..5fd22dbb032b 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -20,8 +21,8 @@ struct TDatabaseState { std::vector PendingRequersts = {}; std::unordered_set PendingSessionIds = {}; - std::unordered_map> PendingCancelRequests = {}; - std::unordered_map> PendingSubscriptions = {}; + std::unordered_map> PendingCancelRequests = {}; // Session ID to requests + std::unordered_map> PendingSubscriptions = {}; // Pool ID to subscribers bool HasDefaultPool = false; bool Serverless = false; bool DatabaseUnsupported = false; @@ -32,23 +33,23 @@ struct TDatabaseState { const TString& poolId = ev->Get()->PoolId; auto& subscribers = PendingSubscriptions[poolId]; if (subscribers.empty()) { - ActorContext.Register(CreatePoolFetcherActor(ActorContext.SelfID, ev->Get()->Database, poolId, nullptr)); + ActorContext.Register(CreatePoolFetcherActor(ActorContext.SelfID, ev->Get()->DatabaseId, poolId, nullptr)); } subscribers.emplace(ev->Sender); } void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) { - TString database = ev->Get()->Database; + TString databaseId = ev->Get()->DatabaseId; PendingSessionIds.emplace(ev->Get()->SessionId); PendingRequersts.emplace_back(std::move(ev)); if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) { - ActorContext.Register(CreateDatabaseFetcherActor(ActorContext.SelfID, database)); + ActorContext.Register(CreateDatabaseFetcherActor(ActorContext.SelfID, DatabaseIdToDatabase(databaseId))); } else if (!DatabaseUnsupported) { StartPendingRequests(); } else { - ReplyContinueError(Ydb::StatusIds::UNSUPPORTED, {NYql::TIssue(TStringBuilder() << "Unsupported database: " << database)}); + ReplyContinueError(Ydb::StatusIds::UNSUPPORTED, {NYql::TIssue(TStringBuilder() << "Unsupported database: " << databaseId)}); } } @@ -62,9 +63,9 @@ struct TDatabaseState { if (ev->Get()->Status == Ydb::StatusIds::SUCCESS && poolHandler) { ActorContext.Send(poolHandler, new TEvPrivate::TEvUpdatePoolSubscription(ev->Get()->PathId, subscribers)); } else { - const TString& database = ev->Get()->Database; + const TString& databaseId = ev->Get()->DatabaseId; for (const auto& subscriber : subscribers) { - ActorContext.Send(subscriber, new TEvUpdatePoolInfo(database, poolId, std::nullopt, std::nullopt)); + ActorContext.Send(subscriber, new TEvUpdatePoolInfo(databaseId, poolId, std::nullopt, std::nullopt)); } } subscribers.clear(); @@ -77,6 +78,10 @@ struct TDatabaseState { return; } + if (Serverless != ev->Get()->Serverless) { + ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvKqp::TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->DatabaseId, ev->Get()->Serverless)); + } + LastUpdateTime = TInstant::Now(); Serverless = ev->Get()->Serverless; StartPendingRequests(); @@ -159,8 +164,8 @@ struct TPoolState { void DoCleanupRequest(TEvCleanupRequest::TPtr event) { for (const auto& poolHandler : PreviousPoolHandlers) { ActorContext.Send(poolHandler, new TEvCleanupRequest( - event->Get()->Database, event->Get()->SessionId, event->Get()->PoolId, - event->Get()->Duration, event->Get()->CpuConsumed + event->Get()->DatabaseId, event->Get()->SessionId, + event->Get()->PoolId, event->Get()->Duration, event->Get()->CpuConsumed )); } ActorContext.Send(event->Forward(PoolHandler)); diff --git a/ydb/core/kqp/workload_service/tables/table_queries.cpp b/ydb/core/kqp/workload_service/tables/table_queries.cpp index ba6457288c94..2a18cadaaa6e 100644 --- a/ydb/core/kqp/workload_service/tables/table_queries.cpp +++ b/ydb/core/kqp/workload_service/tables/table_queries.cpp @@ -28,17 +28,17 @@ class TQueryBase : public NKikimr::TQueryBase { SetOperationInfo(operationName, traceId, counters); } - TQueryBase(const TString& operationName, const TString& traceId, const TString& database, const TString& sessionId, NMonitoring::TDynamicCounterPtr counters) - : TQueryBase(operationName, ComposeTraceId(traceId, database, sessionId), counters) + TQueryBase(const TString& operationName, const TString& traceId, const TString& databaseId, const TString& sessionId, NMonitoring::TDynamicCounterPtr counters) + : TQueryBase(operationName, ComposeTraceId(traceId, databaseId, sessionId), counters) {} - void UpdateLogInfo(const TString& traceId, const TString& database, const TString& sessionId) { - SetOperationInfo(OperationName, ComposeTraceId(traceId, database, sessionId), nullptr); + void UpdateLogInfo(const TString& traceId, const TString& databaseId, const TString& sessionId) { + SetOperationInfo(OperationName, ComposeTraceId(traceId, databaseId, sessionId), nullptr); } private: - static TString ComposeTraceId(const TString& traceId, const TString& database, const TString& sessionId) { - return TStringBuilder() << traceId << ", RequestDatabase: " << database << ", RequestSessionId: " << sessionId; + static TString ComposeTraceId(const TString& traceId, const TString& databaseId, const TString& sessionId) { + return TStringBuilder() << traceId << ", RequestDatabase: " << databaseId << ", RequestSessionId: " << sessionId; } }; @@ -306,9 +306,9 @@ class TCleanupTablesActor : public TSchemeActorBase { class TRefreshPoolStateQuery : public TQueryBase { public: - TRefreshPoolStateQuery(const TString& database, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) - : TQueryBase(__func__, poolId, database, "", counters) - , Database(database) + TRefreshPoolStateQuery(const TString& databaseId, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) + : TQueryBase(__func__, poolId, databaseId, "", counters) + , DatabaseId(databaseId) , PoolId(poolId) , LeaseDuration(leaseDuration) {} @@ -321,14 +321,14 @@ class TRefreshPoolStateQuery : public TQueryBase { TString sql = TStringBuilder() << R"( -- TRefreshPoolStateQuery::OnRunQuery - DECLARE $database AS Text; + DECLARE $database_id AS Text; DECLARE $pool_id AS Text; DECLARE $node_id AS Uint32; DECLARE $lease_duration AS Interval; UPDATE `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` SET lease_deadline = CurrentUtcTimestamp() + $lease_duration - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND node_id = $node_id AND (wait_deadline IS NULL OR wait_deadline >= CurrentUtcTimestamp()) @@ -336,7 +336,7 @@ class TRefreshPoolStateQuery : public TQueryBase { UPDATE `)" << TTablesCreator::GetRunningRequestsPath() << R"(` SET lease_deadline = CurrentUtcTimestamp() + $lease_duration - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND node_id = $node_id AND lease_deadline >= CurrentUtcTimestamp(); @@ -344,8 +344,8 @@ class TRefreshPoolStateQuery : public TQueryBase { NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(Database) + .AddParam("$database_id") + .Utf8(DatabaseId) .Build() .AddParam("$pool_id") .Utf8(PoolId) @@ -364,27 +364,27 @@ class TRefreshPoolStateQuery : public TQueryBase { void OnLeaseUpdated() { TString sql = TStringBuilder() << R"( -- TRefreshPoolStateQuery::OnLeaseUpdated - DECLARE $database AS Text; + DECLARE $database_id AS Text; DECLARE $pool_id AS Text; SELECT COUNT(*) AS delayed_requests FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND (wait_deadline IS NULL OR wait_deadline >= CurrentUtcTimestamp()) AND lease_deadline >= CurrentUtcTimestamp(); SELECT COUNT(*) AS running_requests FROM `)" << TTablesCreator::GetRunningRequestsPath() << R"(` - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND lease_deadline >= CurrentUtcTimestamp(); )"; NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(Database) + .AddParam("$database_id") + .Utf8(DatabaseId) .Build() .AddParam("$pool_id") .Utf8(PoolId) @@ -428,7 +428,7 @@ class TRefreshPoolStateQuery : public TQueryBase { } private: - const TString Database; + const TString DatabaseId; const TString PoolId; const TDuration LeaseDuration; @@ -438,9 +438,9 @@ class TRefreshPoolStateQuery : public TQueryBase { class TDelayRequestQuery : public TQueryBase { public: - TDelayRequestQuery(const TString& database, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) - : TQueryBase(__func__, poolId, database, sessionId, counters) - , Database(database) + TDelayRequestQuery(const TString& databaseId, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) + : TQueryBase(__func__, poolId, databaseId, sessionId, counters) + , DatabaseId(databaseId) , PoolId(poolId) , SessionId(sessionId) , StartTime(startTime) @@ -451,7 +451,7 @@ class TDelayRequestQuery : public TQueryBase { void OnRunQuery() override { TString sql = TStringBuilder() << R"( -- TDelayRequestQuery::OnRunQuery - DECLARE $database AS Text; + DECLARE $database_id AS Text; DECLARE $pool_id AS Text; DECLARE $start_time AS Timestamp; DECLARE $session_id AS Text; @@ -462,15 +462,15 @@ class TDelayRequestQuery : public TQueryBase { UPSERT INTO `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` (database, pool_id, start_time, session_id, node_id, wait_deadline, lease_deadline) VALUES ( - $database, $pool_id, $start_time, $session_id, $node_id, $wait_deadline, + $database_id, $pool_id, $start_time, $session_id, $node_id, $wait_deadline, CurrentUtcTimestamp() + $lease_duration ); )"; NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(Database) + .AddParam("$database_id") + .Utf8(DatabaseId) .Build() .AddParam("$pool_id") .Utf8(PoolId) @@ -503,7 +503,7 @@ class TDelayRequestQuery : public TQueryBase { } private: - const TString Database; + const TString DatabaseId; const TString PoolId; const TString SessionId; const TInstant StartTime; @@ -514,9 +514,9 @@ class TDelayRequestQuery : public TQueryBase { class TStartFirstDelayedRequestQuery : public TQueryBase { public: - TStartFirstDelayedRequestQuery(const TString& database, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) - : TQueryBase(__func__, poolId, database, "", counters) - , Database(database) + TStartFirstDelayedRequestQuery(const TString& databaseId, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) + : TQueryBase(__func__, poolId, databaseId, "", counters) + , DatabaseId(databaseId) , PoolId(poolId) , LeaseDuration(leaseDuration) {} @@ -526,12 +526,12 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { TString sql = TStringBuilder() << R"( -- TStartFirstDelayedRequestQuery::OnRunQuery - DECLARE $database AS Text; + DECLARE $database_id AS Text; DECLARE $pool_id AS Text; SELECT database, pool_id, start_time, session_id, node_id FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND (wait_deadline IS NULL OR wait_deadline >= CurrentUtcTimestamp()) AND lease_deadline >= CurrentUtcTimestamp() @@ -541,8 +541,8 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(Database) + .AddParam("$database_id") + .Utf8(DatabaseId) .Build() .AddParam("$pool_id") .Utf8(PoolId) @@ -583,7 +583,7 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { } RequestSessionId = *sessionId; - UpdateLogInfo(PoolId, Database, RequestSessionId); + UpdateLogInfo(PoolId, DatabaseId, RequestSessionId); TMaybe startTime = result.ColumnParser("start_time").GetOptionalTimestamp(); if (!startTime) { @@ -598,7 +598,7 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { void StartQueuedRequest() { TString sql = TStringBuilder() << R"( -- TStartFirstDelayedRequestQuery::StartQueuedRequest - DECLARE $database AS Text; + DECLARE $database_id AS Text; DECLARE $pool_id AS Text; DECLARE $start_time AS Timestamp; DECLARE $session_id AS Text; @@ -606,7 +606,7 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { DECLARE $lease_duration AS Interval; DELETE FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND node_id = $node_id AND start_time = $start_time @@ -615,15 +615,15 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { UPSERT INTO `)" << TTablesCreator::GetRunningRequestsPath() << R"(` (database, pool_id, session_id, node_id, lease_deadline) VALUES ( - $database, $pool_id, $session_id, $node_id, + $database_id, $pool_id, $session_id, $node_id, CurrentUtcTimestamp() + $lease_duration ); )"; NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(Database) + .AddParam("$database_id") + .Utf8(DatabaseId) .Build() .AddParam("$pool_id") .Utf8(PoolId) @@ -654,7 +654,7 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { } private: - const TString Database; + const TString DatabaseId; const TString PoolId; const TDuration LeaseDuration; @@ -665,9 +665,9 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { class TStartRequestQuery : public TQueryBase { public: - TStartRequestQuery(const TString& database, const TString& poolId, const TString& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) - : TQueryBase(__func__, poolId, database, sessionId, counters) - , Database(database) + TStartRequestQuery(const TString& databaseId, const TString& poolId, const TString& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) + : TQueryBase(__func__, poolId, databaseId, sessionId, counters) + , DatabaseId(databaseId) , PoolId(poolId) , SessionId(sessionId) , LeaseDuration(leaseDuration) @@ -676,7 +676,7 @@ class TStartRequestQuery : public TQueryBase { void OnRunQuery() override { TString sql = TStringBuilder() << R"( -- TStartRequestQuery::OnRunQuery - DECLARE $database AS Text; + DECLARE $database_id AS Text; DECLARE $pool_id AS Text; DECLARE $session_id AS Text; DECLARE $node_id AS Uint32; @@ -685,15 +685,15 @@ class TStartRequestQuery : public TQueryBase { UPSERT INTO `)" << TTablesCreator::GetRunningRequestsPath() << R"(` (database, pool_id, session_id, node_id, lease_deadline) VALUES ( - $database, $pool_id, $session_id, $node_id, + $database_id, $pool_id, $session_id, $node_id, CurrentUtcTimestamp() + $lease_duration ); )"; NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(Database) + .AddParam("$database_id") + .Utf8(DatabaseId) .Build() .AddParam("$pool_id") .Utf8(PoolId) @@ -720,7 +720,7 @@ class TStartRequestQuery : public TQueryBase { } private: - const TString Database; + const TString DatabaseId; const TString PoolId; const TString SessionId; const TDuration LeaseDuration; @@ -731,9 +731,9 @@ class TStartRequestActor : public TActorBootstrapped { using TStartRequestRetryQuery = TQueryRetryActor; public: - TStartRequestActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) + TStartRequestActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) : ReplyActorId(replyActorId) - , Database(database) + , DatabaseId(databaseId) , PoolId(poolId) , SessionId(sessionId) , LeaseDuration(leaseDuration) @@ -744,9 +744,9 @@ class TStartRequestActor : public TActorBootstrapped { Become(&TStartRequestActor::StateFunc); if (!SessionId) { - Register(new TStartFirstDelayedRequestRetryQuery(SelfId(), Database, PoolId, LeaseDuration, Counters)); + Register(new TStartFirstDelayedRequestRetryQuery(SelfId(), DatabaseId, PoolId, LeaseDuration, Counters)); } else { - Register(new TStartRequestRetryQuery(SelfId(), Database, PoolId, *SessionId, LeaseDuration, Counters)); + Register(new TStartRequestRetryQuery(SelfId(), DatabaseId, PoolId, *SessionId, LeaseDuration, Counters)); } } @@ -760,7 +760,7 @@ class TStartRequestActor : public TActorBootstrapped { private: const TActorId ReplyActorId; - const TString Database; + const TString DatabaseId; const TString PoolId; const std::optional SessionId; const TDuration LeaseDuration; @@ -770,9 +770,9 @@ class TStartRequestActor : public TActorBootstrapped { class TCleanupRequestsQuery : public TQueryBase { public: - TCleanupRequestsQuery(const TString& database, const TString& poolId, const std::vector& sessionIds, NMonitoring::TDynamicCounterPtr counters) - : TQueryBase(__func__, poolId, database, "", counters) - , Database(database) + TCleanupRequestsQuery(const TString& databaseId, const TString& poolId, const std::vector& sessionIds, NMonitoring::TDynamicCounterPtr counters) + : TQueryBase(__func__, poolId, databaseId, "", counters) + , DatabaseId(databaseId) , PoolId(poolId) , SessionIds(sessionIds) {} @@ -782,19 +782,19 @@ class TCleanupRequestsQuery : public TQueryBase { -- TCleanupRequestsQuery::OnRunQuery PRAGMA AnsiInForEmptyOrNullableItemsCollections; - DECLARE $database AS Text; + DECLARE $database_id AS Text; DECLARE $pool_id AS Text; DECLARE $node_id AS Uint32; DECLARE $session_ids AS List; DELETE FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND node_id = $node_id AND session_id IN $session_ids; DELETE FROM `)" << TTablesCreator::GetRunningRequestsPath() << R"(` - WHERE database = $database + WHERE database = $database_id AND pool_id = $pool_id AND node_id = $node_id AND session_id IN $session_ids; @@ -802,8 +802,8 @@ class TCleanupRequestsQuery : public TQueryBase { NYdb::TParamsBuilder params; params - .AddParam("$database") - .Utf8(Database) + .AddParam("$database_id") + .Utf8(DatabaseId) .Build() .AddParam("$pool_id") .Utf8(PoolId) @@ -832,11 +832,11 @@ class TCleanupRequestsQuery : public TQueryBase { private: TString LogPrefix() const { - return TStringBuilder() << "[TCleanupRequestsQuery] ActorId: " << SelfId() << ", Database: " << Database << ", PoolId: " << PoolId << ", "; + return TStringBuilder() << "[TCleanupRequestsQuery] ActorId: " << SelfId() << ", DatabaseId: " << DatabaseId << ", PoolId: " << PoolId << ", "; } private: - const TString Database; + const TString DatabaseId; const TString PoolId; const std::vector SessionIds; }; @@ -851,20 +851,20 @@ IActor* CreateCleanupTablesActor() { return new TCleanupTablesActor(); } -IActor* CreateRefreshPoolStateActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { - return new TQueryRetryActor(replyActorId, database, poolId, leaseDuration, counters); +IActor* CreateRefreshPoolStateActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { + return new TQueryRetryActor(replyActorId, databaseId, poolId, leaseDuration, counters); } -IActor* CreateDelayRequestActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { - return new TQueryRetryActor, TDuration, NMonitoring::TDynamicCounterPtr>(replyActorId, database, poolId, sessionId, startTime, waitDeadline, leaseDuration, counters); +IActor* CreateDelayRequestActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { + return new TQueryRetryActor, TDuration, NMonitoring::TDynamicCounterPtr>(replyActorId, databaseId, poolId, sessionId, startTime, waitDeadline, leaseDuration, counters); } -IActor* CreateStartRequestActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { - return new TStartRequestActor(replyActorId, database, poolId, sessionId, leaseDuration, counters); +IActor* CreateStartRequestActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { + return new TStartRequestActor(replyActorId, databaseId, poolId, sessionId, leaseDuration, counters); } -IActor* CreateCleanupRequestsActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const std::vector& sessionIds, NMonitoring::TDynamicCounterPtr counters) { - return new TQueryRetryActor, NMonitoring::TDynamicCounterPtr>(replyActorId, database, poolId, sessionIds, counters); +IActor* CreateCleanupRequestsActor(const TActorId& replyActorId, const TString& databaseId, const TString& poolId, const std::vector& sessionIds, NMonitoring::TDynamicCounterPtr counters) { + return new TQueryRetryActor, NMonitoring::TDynamicCounterPtr>(replyActorId, databaseId, poolId, sessionIds, counters); } } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/tables/table_queries.h b/ydb/core/kqp/workload_service/tables/table_queries.h index f44afac3ba74..b7727bbff28a 100644 --- a/ydb/core/kqp/workload_service/tables/table_queries.h +++ b/ydb/core/kqp/workload_service/tables/table_queries.h @@ -12,11 +12,11 @@ NActors::IActor* CreateTablesCreator(); NActors::IActor* CreateCleanupTablesActor(); // Updates pool lease and returns pool description -NActors::IActor* CreateRefreshPoolStateActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); +NActors::IActor* CreateRefreshPoolStateActor(const NActors::TActorId& replyActorId, const TString& databaseId, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); // Push / Start / Finish requests in pool -NActors::IActor* CreateDelayRequestActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); -NActors::IActor* CreateStartRequestActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); -NActors::IActor* CreateCleanupRequestsActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const std::vector& sessionIds, NMonitoring::TDynamicCounterPtr counters); +NActors::IActor* CreateDelayRequestActor(const NActors::TActorId& replyActorId, const TString& databaseId, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); +NActors::IActor* CreateStartRequestActor(const NActors::TActorId& replyActorId, const TString& databaseId, const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); +NActors::IActor* CreateCleanupRequestsActor(const NActors::TActorId& replyActorId, const TString& databaseId, const TString& poolId, const std::vector& sessionIds, NMonitoring::TDynamicCounterPtr counters); } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index 43dbad70c660..7ff63e246426 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -452,7 +452,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { TPoolStateDescription GetPoolDescription(TDuration leaseDuration = FUTURE_WAIT_TIMEOUT, const TString& poolId = "") const override { const auto& edgeActor = GetRuntime()->AllocateEdgeActor(); - GetRuntime()->Register(CreateRefreshPoolStateActor(edgeActor, Settings_.DomainName_, poolId ? poolId : Settings_.PoolId_, leaseDuration, GetRuntime()->GetAppData().Counters)); + GetRuntime()->Register(CreateRefreshPoolStateActor(edgeActor, CanonizePath(Settings_.DomainName_), poolId ? poolId : Settings_.PoolId_, leaseDuration, GetRuntime()->GetAppData().Counters)); auto response = GetRuntime()->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); @@ -498,6 +498,15 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { } } + TEvFetchDatabaseResponse::TPtr FetchDatabase(const TString& database) const override { + const TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); + GetRuntime()->Register(CreateDatabaseFetcherActor(edgeActor, database)); + const auto response = GetRuntime()->GrabEdgeEvent(edgeActor); + UNIT_ASSERT_C(response, "Got empty response from DatabaseFetcherActor"); + UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); + return response; + } + // Coomon helpers TTestActorRuntime* GetRuntime() const override { return Server_->GetRuntime(); diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h index bb0cd7d17938..ceca1831cd61 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h @@ -116,6 +116,7 @@ class IYdbSetup : public TThrRefBase { virtual void WaitPoolHandlersCount(i64 finalCount, std::optional initialCount = std::nullopt, TDuration timeout = FUTURE_WAIT_TIMEOUT) const = 0; virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0; virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0; + virtual TEvFetchDatabaseResponse::TPtr FetchDatabase(const TString& database) const = 0; // Coomon helpers virtual TTestActorRuntime* GetRuntime() const = 0; diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp index a004917a2a03..f5972e873f4e 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp @@ -27,7 +27,7 @@ void DelayRequest(TIntrusivePtr ydb, const TString& sessionId, TDurat auto runtime = ydb->GetRuntime(); const auto& edgeActor = runtime->AllocateEdgeActor(); - runtime->Register(CreateDelayRequestActor(edgeActor, settings.DomainName_, settings.PoolId_, sessionId, TInstant::Now(), Nothing(), leaseDuration, runtime->GetAppData().Counters)); + runtime->Register(CreateDelayRequestActor(edgeActor, CanonizePath(settings.DomainName_), settings.PoolId_, sessionId, TInstant::Now(), Nothing(), leaseDuration, runtime->GetAppData().Counters)); auto response = runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); UNIT_ASSERT_VALUES_EQUAL(response->Get()->SessionId, sessionId); @@ -38,7 +38,7 @@ void StartRequest(TIntrusivePtr ydb, const TString& sessionId, TDurat auto runtime = ydb->GetRuntime(); const auto& edgeActor = runtime->AllocateEdgeActor(); - runtime->Register(CreateStartRequestActor(edgeActor, settings.DomainName_, settings.PoolId_, sessionId, leaseDuration, runtime->GetAppData().Counters)); + runtime->Register(CreateStartRequestActor(edgeActor, CanonizePath(settings.DomainName_), settings.PoolId_, sessionId, leaseDuration, runtime->GetAppData().Counters)); auto response = runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); UNIT_ASSERT_VALUES_EQUAL(response->Get()->SessionId, sessionId); diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index bb0ee347e6be..1677cfc5d079 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -422,6 +422,56 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult()); } + Y_UNIT_TEST(TestCreateResourcePoolOnServerless) { + auto ydb = TYdbSetupSettings() + .CreateSampleTenants(true) + .EnableResourcePoolsOnServerless(true) + .Create(); + + const auto& serverlessTenant = ydb->GetSettings().GetServerlessTenantName(); + auto settings = TQueryRunnerSettings() + .PoolId("") + .Database(serverlessTenant) + .NodeIndex(1); + + const TString& poolId = "my_pool"; + TSampleQueries::CheckSuccess(ydb->ExecuteQuery(TStringBuilder() << R"( + CREATE RESOURCE POOL )" << poolId << R"( WITH ( + CONCURRENT_QUERY_LIMIT=1, + QUEUE_SIZE=0 + ); + )", settings)); + settings.PoolId(poolId); + + auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings.HangUpDuringExecution(true)); + ydb->WaitQueryExecution(hangingRequest); + + settings.HangUpDuringExecution(false); + + { // Rejected result + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.PoolId(poolId)); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << poolId); + } + + { // Check tables + auto result = ydb->ExecuteQuery(R"( + SELECT * FROM `.metadata/workload_manager/running_requests` + )", settings.PoolId(NResourcePool::DEFAULT_POOL_ID).Database(ydb->GetSettings().GetSharedTenantName())); + TSampleQueries::CheckSuccess(result); + + NYdb::TResultSetParser resultSet(result.GetResultSet(0)); + UNIT_ASSERT_C(resultSet.TryNextRow(), "Unexpected row count"); + + const auto& databaseId = resultSet.ColumnParser("database").GetOptionalUtf8(); + UNIT_ASSERT_C(databaseId, "Unexpected database response"); + UNIT_ASSERT_VALUES_EQUAL_C(*databaseId, ydb->FetchDatabase(serverlessTenant)->Get()->DatabaseId, "Unexpected database id"); + } + + ydb->ContinueQueryExecution(hangingRequest); + TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult()); + } + Y_UNIT_TEST(TestDefaultPoolRestrictions) { auto ydb = TYdbSetupSettings().Create(); @@ -615,22 +665,27 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { UNIT_ASSERT_STRING_CONTAINS(alterResult.GetIssues().ToOneLineString(), "You don't have access permissions for database Root"); } - void CreateSampleResourcePoolClassifier(TIntrusivePtr ydb, const TString& classifierId, const TString& userSID, const TString& poolId) { - ydb->ExecuteSchemeQuery(TStringBuilder() << R"( - GRANT ALL ON `/Root` TO `)" << userSID << R"(`; + void CreateSampleResourcePoolClassifier(TIntrusivePtr ydb, const TString& classifierId, const TQueryRunnerSettings& settings, const TString& poolId) { + TSampleQueries::CheckSuccess(ydb->ExecuteQuery(TStringBuilder() << R"( + GRANT ALL ON `)" << CanonizePath(settings.Database_) << R"(` TO `)" << settings.UserSID_ << R"(`; CREATE RESOURCE POOL )" << poolId << R"( WITH ( CONCURRENT_QUERY_LIMIT=0 ); CREATE RESOURCE POOL CLASSIFIER )" << classifierId << R"( WITH ( RESOURCE_POOL=")" << poolId << R"(", - MEMBER_NAME=")" << userSID << R"(" + MEMBER_NAME=")" << settings.UserSID_ << R"(" ); - )"); + )", TQueryRunnerSettings() + .UserSID(BUILTIN_ACL_METADATA) + .Database(settings.Database_) + .NodeIndex(settings.NodeIndex_) + .PoolId(NResourcePool::DEFAULT_POOL_ID) + )); } TString CreateSampleResourcePoolClassifier(TIntrusivePtr ydb, const TQueryRunnerSettings& settings, const TString& poolId) { const TString& classifierId = "my_pool_classifier"; - CreateSampleResourcePoolClassifier(ydb, classifierId, settings.UserSID_, poolId); + CreateSampleResourcePoolClassifier(ydb, classifierId, settings, poolId); return classifierId; } @@ -662,6 +717,24 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { WaitForFail(ydb, settings, poolId); } + Y_UNIT_TEST(TestCreateResourcePoolClassifierOnServerless) { + auto ydb = TYdbSetupSettings() + .CreateSampleTenants(true) + .EnableResourcePoolsOnServerless(true) + .Create(); + + auto settings = TQueryRunnerSettings() + .PoolId("") + .UserSID("test@user") + .Database(ydb->GetSettings().GetServerlessTenantName()) + .NodeIndex(1); + + const TString& poolId = "my_pool"; + CreateSampleResourcePoolClassifier(ydb, settings, poolId); + + WaitForFail(ydb, settings, poolId); + } + Y_UNIT_TEST(TestAlterResourcePoolClassifier) { auto ydb = TYdbSetupSettings().Create(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index a4c6906783b5..57a8fb52aa74 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -553,6 +553,7 @@ message TEvStartKqpTasksRequest { optional string SchedulerGroup = 9; optional double MemoryPoolPercent = 10 [default = 100]; optional string Database = 11; + optional string DatabaseId = 17; optional uint64 LockTxId = 13; optional uint32 LockNodeId = 14; diff --git a/ydb/services/metadata/manager/abstract.h b/ydb/services/metadata/manager/abstract.h index 74a34997a1c9..65b6759628bc 100644 --- a/ydb/services/metadata/manager/abstract.h +++ b/ydb/services/metadata/manager/abstract.h @@ -76,6 +76,7 @@ class IOperationsManager { private: YDB_ACCESSOR_DEF(std::optional, UserToken); YDB_ACCESSOR_DEF(TString, Database); + YDB_ACCESSOR_DEF(TString, DatabaseId); using TActorSystemPtr = TActorSystem*; YDB_ACCESSOR_DEF(TActorSystemPtr, ActorSystem); YDB_ACCESSOR_DEF(NSQLTranslation::TTranslationSettings, TranslationSettings); diff --git a/ydb/tools/query_replay/query_compiler.cpp b/ydb/tools/query_replay/query_compiler.cpp index 40046ca100ff..fc613017a7dc 100644 --- a/ydb/tools/query_replay/query_compiler.cpp +++ b/ydb/tools/query_replay/query_compiler.cpp @@ -254,9 +254,11 @@ class TReplayCompileActor: public TActorBootstrapped { } TKqpQuerySettings settings(queryType); + const auto& database = ReplayDetails["query_database"].GetStringSafe(); Query = std::make_unique( ReplayDetails["query_cluster"].GetStringSafe(), - ReplayDetails["query_database"].GetStringSafe(), + database, + database, queryText, settings, !queryParameterTypes.empty() @@ -288,7 +290,7 @@ class TReplayCompileActor: public TActorBootstrapped { counters->Counters = new TKqpCounters(c); counters->TxProxyMon = new NTxProxy::TTxProxyMon(c); - Gateway = CreateKikimrIcGateway(Query->Cluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, Query->Database, std::move(loader), + Gateway = CreateKikimrIcGateway(Query->Cluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, Query->Database, Query->DatabaseId, std::move(loader), TlsActivationContext->ExecutorThread.ActorSystem, SelfId().NodeId(), counters); auto federatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr, {}}); KqpHost = CreateKqpHost(Gateway, Query->Cluster, Query->Database, Config, ModuleResolverState->ModuleResolver, diff --git a/ydb/tools/query_replay_yt/query_compiler.cpp b/ydb/tools/query_replay_yt/query_compiler.cpp index 68a67de79611..4c9ae292e7cf 100644 --- a/ydb/tools/query_replay_yt/query_compiler.cpp +++ b/ydb/tools/query_replay_yt/query_compiler.cpp @@ -591,9 +591,11 @@ class TReplayCompileActor: public TActorBootstrapped { QueryId = ReplayDetails["query_id"].GetStringSafe(); TKqpQuerySettings settings(queryType); + const auto& database = ReplayDetails["query_database"].GetStringSafe(); Query = std::make_unique( ReplayDetails["query_cluster"].GetStringSafe(), - ReplayDetails["query_database"].GetStringSafe(), + database, + database, queryText, settings, !queryParameterTypes.empty() @@ -623,7 +625,7 @@ class TReplayCompileActor: public TActorBootstrapped { counters->Counters = new TKqpCounters(c); counters->TxProxyMon = new NTxProxy::TTxProxyMon(c); - Gateway = CreateKikimrIcGateway(Query->Cluster, queryType, Query->Database, std::move(loader), + Gateway = CreateKikimrIcGateway(Query->Cluster, queryType, Query->Database, Query->DatabaseId, std::move(loader), TlsActivationContext->ExecutorThread.ActorSystem, SelfId().NodeId(), counters); auto federatedQuerySetup = std::make_optional({HttpGateway, nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr, {}}); KqpHost = CreateKqpHost(Gateway, Query->Cluster, Query->Database, Config, ModuleResolverState->ModuleResolver,