Skip to content

Commit

Permalink
YQ-3689 added database id for workload manager (#9768)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Oct 7, 2024
1 parent 737f797 commit 54d83fb
Show file tree
Hide file tree
Showing 48 changed files with 530 additions and 356 deletions.
28 changes: 15 additions & 13 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
namespace NKikimr::NKqp::NWorkload {

struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
: Database(database)
TEvSubscribeOnPoolChanges(const TString& databaseId, const TString& poolId)
: DatabaseId(databaseId)
, PoolId(poolId)
{}

const TString Database;
const TString DatabaseId;
const TString PoolId;
};

struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: Database(database)
TEvPlaceRequestIntoPool(const TString& databaseId, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: DatabaseId(databaseId)
, SessionId(sessionId)
, PoolId(poolId)
, UserToken(userToken)
{}

const TString Database;
const TString DatabaseId;
const TString SessionId;
TString PoolId; // Can be changed to default pool id
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
Expand All @@ -52,15 +52,15 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
};

struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
: Database(database)
TEvCleanupRequest(const TString& databaseId, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
: DatabaseId(databaseId)
, SessionId(sessionId)
, PoolId(poolId)
, Duration(duration)
, CpuConsumed(cpuConsumed)
{}

const TString Database;
const TString DatabaseId;
const TString SessionId;
const TString PoolId;
const TDuration Duration;
Expand All @@ -78,30 +78,32 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
};

struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
: Database(database)
TEvUpdatePoolInfo(const TString& databaseId, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
: DatabaseId(databaseId)
, PoolId(poolId)
, Config(config)
, SecurityObject(securityObject)
{}

const TString Database;
const TString DatabaseId;
const TString PoolId;
const std::optional<NResourcePool::TPoolSettings> Config;
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues)
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& databaseId, bool serverless, TPathId pathId, NYql::TIssues issues)
: Status(status)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, PathId(pathId)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const TString Database;
const TString DatabaseId;
const bool Serverless;
const TPathId PathId;
const NYql::TIssues Issues;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/kqp_user_request_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
namespace NKikimr::NKqp {

void TUserRequestContext::Out(IOutputStream& o) const {
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", DatabaseId: " << DatabaseId << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
}

void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) {
resultMap["TraceId"] = ctx.TraceId;
resultMap["Database"] = ctx.Database;
resultMap["DatabaseId"] = ctx.DatabaseId;
resultMap["SessionId"] = ctx.SessionId;
resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId;
resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_user_request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace NKikimr::NKqp {
struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
TString TraceId;
TString Database;
TString DatabaseId;
TString SessionId;
TString CurrentExecutionId;
TString CustomerSuppliedId;
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/simple/query_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

namespace NKikimr::NKqp {

TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
const TGUCSettings& gUCSettings)
: Cluster(cluster)
, Database(database)
, DatabaseId(databaseId)
, Text(text)
, Settings(settings)
, QueryParameterTypes(queryParameterTypes)
Expand Down Expand Up @@ -42,6 +43,7 @@ bool TKqpQueryId::IsSql() const {
bool TKqpQueryId::operator==(const TKqpQueryId& other) const {
if (!(Cluster == other.Cluster &&
Database == other.Database &&
DatabaseId == other.DatabaseId &&
UserSid == other.UserSid &&
Text == other.Text &&
Settings == other.Settings &&
Expand Down Expand Up @@ -79,6 +81,7 @@ TString TKqpQueryId::SerializeToString() const {
TStringBuilder result = TStringBuilder() << "{"
<< "Cluster: " << Cluster << ", "
<< "Database: " << Database << ", "
<< "DatabaseId: " << DatabaseId << ", "
<< "UserSid: " << UserSid << ", "
<< "Text: " << EscapeC(Text) << ", "
<< "Settings: " << Settings.SerializeToString() << ", ";
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/simple/query_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NKikimr::NKqp {
struct TKqpQueryId {
TString Cluster;
TString Database;
TString DatabaseId;
TString UserSid;
TString Text;
TKqpQuerySettings Settings;
Expand All @@ -21,7 +22,7 @@ struct TKqpQueryId {
TGUCSettings GUCSettings;

public:
TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
const TGUCSettings& gUCSettings);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, QueryId.DatabaseId, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class TKqpQueryCache {
}
}
}
return TKqpQueryId{query.Cluster, query.Database, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
}

TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque

request.SetSchedulerGroup(UserRequestContext->PoolId);
request.SetDatabase(Database);
request.SetDatabaseId(UserRequestContext->DatabaseId);
if (UserRequestContext->PoolConfig.has_value()) {
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
request.SetPoolMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
, RequestType(requestType)
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
{
YQL_ENSURE(RequestContext);
YQL_ENSURE(PhyTx);
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);

Expand Down Expand Up @@ -403,6 +404,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {

NMetadata::NModifications::IOperationsManager::TExternalModificationContext context;
context.SetDatabase(Database);
context.SetDatabaseId(RequestContext->DatabaseId);
context.SetActorSystem(actorSystem);
if (UserToken) {
context.SetUserToken(*UserToken);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
[[maybe_unused]]
TIntrusivePtr<IKqpGateway> MakeIcGateway(const TKikimrRunner& kikimr) {
auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem();
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", TKqpGatewaySettings(),
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", "/Root", TKqpGatewaySettings(),
actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0),
TAlignedPagePoolCounters(), kikimr.GetTestServer().GetSettings().AppConfig->GetQueryServiceConfig());
}
Expand Down
24 changes: 12 additions & 12 deletions ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,27 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
using TBase = NKikimr::TQueryBase;

public:
TRanksCheckerActor(const TString& database, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
TRanksCheckerActor(const TString& databaseId, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
: TBase(NKikimrServices::KQP_GATEWAY, sessionId)
, Database(database)
, DatabaseId(databaseId)
, RanksToCheck(ranksToCheck)
{
TxId = transactionId;
SetOperationInfo(__func__, Database);
SetOperationInfo(__func__, DatabaseId);
}

void OnRunQuery() override {
const auto& tablePath = TResourcePoolClassifierConfig::GetBehaviour()->GetStorageTablePath();

TStringBuilder sql = TStringBuilder() << R"(
-- TRanksCheckerActor::OnRunQuery
DECLARE $database AS Text;
DECLARE $database_id AS Text;
)";

NYdb::TParamsBuilder params;
params
.AddParam("$database")
.Utf8(CanonizePath(Database))
.AddParam("$database_id")
.Utf8(CanonizePath(DatabaseId))
.Build();

if (!RanksToCheck.empty()) {
Expand All @@ -79,7 +79,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
SELECT
rank, name
FROM `)" << tablePath << R"(`
WHERE database = $database
WHERE database = $database_id
AND rank IN $ranks;
)";

Expand All @@ -97,7 +97,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
MAX(rank) AS MaxRank,
COUNT(*) AS NumberClassifiers
FROM `)" << tablePath << R"(`
WHERE database = $database;
WHERE database = $database_id;
)";

RunDataQuery(sql, &params, TTxControl::ContinueTx());
Expand Down Expand Up @@ -149,7 +149,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
}

private:
const TString Database;
const TString DatabaseId;
const std::unordered_map<i64, TString> RanksToCheck;

ui64 ExpectedResultSets = 1;
Expand Down Expand Up @@ -235,8 +235,8 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
TResourcePoolClassifierConfig object;
TResourcePoolClassifierConfig::TDecoder::DeserializeFromRecord(object, objectRecord);

if (!snapshot->GetClassifierConfig(CanonizePath(object.GetDatabase()), object.GetName())) {
FailAndPassAway(TStringBuilder() << "Classifier with name " << object.GetName() << " not found in database " << object.GetDatabase());
if (!snapshot->GetClassifierConfig(object.GetDatabase(), object.GetName())) {
FailAndPassAway(TStringBuilder() << "Classifier with name " << object.GetName() << " not found in database with id " << object.GetDatabase());
return;
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
}

Register(new TQueryRetryActor<TRanksCheckerActor, TEvPrivate::TEvRanksCheckerResponse, TString, TString, TString, std::unordered_map<i64, TString>>(
SelfId(), Context.GetExternalData().GetDatabase(), AlterContext.GetSessionId(), AlterContext.GetTransactionId(), ranksToNames
SelfId(), Context.GetExternalData().GetDatabaseId(), AlterContext.GetSessionId(), AlterContext.GetTransactionId(), ranksToNames
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using namespace NResourcePool;

NMetadata::NInternal::TTableRecord GetResourcePoolClassifierRecord(const NYql::TObjectSettingsImpl& settings, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context) {
NMetadata::NInternal::TTableRecord result;
result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Database, NMetadata::NInternal::TYDBValue::Utf8(CanonizePath(context.GetExternalData().GetDatabase())));
result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Database, NMetadata::NInternal::TYDBValue::Utf8(context.GetExternalData().GetDatabaseId()));
result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Name, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId()));
return result;
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/gateway/kqp_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class IKqpGateway : public NYql::IKikimrGateway {

public:
virtual TString GetDatabase() = 0;
virtual TString GetDatabaseId() = 0;
virtual bool GetDomainLoginOnly() = 0;
virtual TMaybe<TString> GetDomainName() = 0;

Expand Down Expand Up @@ -232,7 +233,7 @@ class IKqpGateway : public NYql::IKikimrGateway {
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
};

TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, const TString& databaseId,
std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem,
ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig());

Expand Down
Loading

0 comments on commit 54d83fb

Please sign in to comment.