Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3689 added database id for workload manager #9768

Merged
28 changes: 15 additions & 13 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
namespace NKikimr::NKqp::NWorkload {

struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
: Database(database)
TEvSubscribeOnPoolChanges(const TString& databaseId, const TString& poolId)
: DatabaseId(databaseId)
, PoolId(poolId)
{}

const TString Database;
const TString DatabaseId;
const TString PoolId;
};

struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: Database(database)
TEvPlaceRequestIntoPool(const TString& databaseId, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: DatabaseId(databaseId)
, SessionId(sessionId)
, PoolId(poolId)
, UserToken(userToken)
{}

const TString Database;
const TString DatabaseId;
const TString SessionId;
TString PoolId; // Can be changed to default pool id
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
Expand All @@ -52,15 +52,15 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
};

struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
: Database(database)
TEvCleanupRequest(const TString& databaseId, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
: DatabaseId(databaseId)
, SessionId(sessionId)
, PoolId(poolId)
, Duration(duration)
, CpuConsumed(cpuConsumed)
{}

const TString Database;
const TString DatabaseId;
const TString SessionId;
const TString PoolId;
const TDuration Duration;
Expand All @@ -78,30 +78,32 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
};

struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
: Database(database)
TEvUpdatePoolInfo(const TString& databaseId, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
: DatabaseId(databaseId)
, PoolId(poolId)
, Config(config)
, SecurityObject(securityObject)
{}

const TString Database;
const TString DatabaseId;
const TString PoolId;
const std::optional<NResourcePool::TPoolSettings> Config;
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues)
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& databaseId, bool serverless, TPathId pathId, NYql::TIssues issues)
: Status(status)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, PathId(pathId)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const TString Database;
const TString DatabaseId;
const bool Serverless;
const TPathId PathId;
const NYql::TIssues Issues;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/kqp_user_request_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
namespace NKikimr::NKqp {

void TUserRequestContext::Out(IOutputStream& o) const {
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", DatabaseId: " << DatabaseId << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
}

void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) {
resultMap["TraceId"] = ctx.TraceId;
resultMap["Database"] = ctx.Database;
resultMap["DatabaseId"] = ctx.DatabaseId;
resultMap["SessionId"] = ctx.SessionId;
resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId;
resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_user_request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace NKikimr::NKqp {
struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
TString TraceId;
TString Database;
TString DatabaseId;
TString SessionId;
TString CurrentExecutionId;
TString CustomerSuppliedId;
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/simple/query_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

namespace NKikimr::NKqp {

TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
const TGUCSettings& gUCSettings)
: Cluster(cluster)
, Database(database)
, DatabaseId(databaseId)
, Text(text)
, Settings(settings)
, QueryParameterTypes(queryParameterTypes)
Expand Down Expand Up @@ -42,6 +43,7 @@ bool TKqpQueryId::IsSql() const {
bool TKqpQueryId::operator==(const TKqpQueryId& other) const {
if (!(Cluster == other.Cluster &&
Database == other.Database &&
DatabaseId == other.DatabaseId &&
UserSid == other.UserSid &&
Text == other.Text &&
Settings == other.Settings &&
Expand Down Expand Up @@ -79,6 +81,7 @@ TString TKqpQueryId::SerializeToString() const {
TStringBuilder result = TStringBuilder() << "{"
<< "Cluster: " << Cluster << ", "
<< "Database: " << Database << ", "
<< "DatabaseId: " << DatabaseId << ", "
<< "UserSid: " << UserSid << ", "
<< "Text: " << EscapeC(Text) << ", "
<< "Settings: " << Settings.SerializeToString() << ", ";
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/simple/query_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NKikimr::NKqp {
struct TKqpQueryId {
TString Cluster;
TString Database;
TString DatabaseId;
TString UserSid;
TString Text;
TKqpQuerySettings Settings;
Expand All @@ -21,7 +22,7 @@ struct TKqpQueryId {
TGUCSettings GUCSettings;

public:
TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
const TGUCSettings& gUCSettings);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, QueryId.DatabaseId, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class TKqpQueryCache {
}
}
}
return TKqpQueryId{query.Cluster, query.Database, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
}

TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque

request.SetSchedulerGroup(UserRequestContext->PoolId);
request.SetDatabase(Database);
request.SetDatabaseId(UserRequestContext->DatabaseId);
if (UserRequestContext->PoolConfig.has_value()) {
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
request.SetPoolMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
, RequestType(requestType)
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
{
YQL_ENSURE(RequestContext);
YQL_ENSURE(PhyTx);
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);

Expand Down Expand Up @@ -403,6 +404,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {

NMetadata::NModifications::IOperationsManager::TExternalModificationContext context;
context.SetDatabase(Database);
context.SetDatabaseId(RequestContext->DatabaseId);
context.SetActorSystem(actorSystem);
if (UserToken) {
context.SetUserToken(*UserToken);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
[[maybe_unused]]
TIntrusivePtr<IKqpGateway> MakeIcGateway(const TKikimrRunner& kikimr) {
auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem();
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", TKqpGatewaySettings(),
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", "/Root", TKqpGatewaySettings(),
actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0),
TAlignedPagePoolCounters(), kikimr.GetTestServer().GetSettings().AppConfig->GetQueryServiceConfig());
}
Expand Down
24 changes: 12 additions & 12 deletions ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,27 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
using TBase = NKikimr::TQueryBase;

public:
TRanksCheckerActor(const TString& database, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
TRanksCheckerActor(const TString& databaseId, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
: TBase(NKikimrServices::KQP_GATEWAY, sessionId)
, Database(database)
, DatabaseId(databaseId)
, RanksToCheck(ranksToCheck)
{
TxId = transactionId;
SetOperationInfo(__func__, Database);
SetOperationInfo(__func__, DatabaseId);
}

void OnRunQuery() override {
const auto& tablePath = TResourcePoolClassifierConfig::GetBehaviour()->GetStorageTablePath();

TStringBuilder sql = TStringBuilder() << R"(
-- TRanksCheckerActor::OnRunQuery
DECLARE $database AS Text;
DECLARE $database_id AS Text;
)";

NYdb::TParamsBuilder params;
params
.AddParam("$database")
.Utf8(CanonizePath(Database))
.AddParam("$database_id")
.Utf8(CanonizePath(DatabaseId))
.Build();

if (!RanksToCheck.empty()) {
Expand All @@ -79,7 +79,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
SELECT
rank, name
FROM `)" << tablePath << R"(`
WHERE database = $database
WHERE database = $database_id
AND rank IN $ranks;
)";

Expand All @@ -97,7 +97,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
MAX(rank) AS MaxRank,
COUNT(*) AS NumberClassifiers
FROM `)" << tablePath << R"(`
WHERE database = $database;
WHERE database = $database_id;
)";

RunDataQuery(sql, &params, TTxControl::ContinueTx());
Expand Down Expand Up @@ -149,7 +149,7 @@ class TRanksCheckerActor : public NKikimr::TQueryBase {
}

private:
const TString Database;
const TString DatabaseId;
const std::unordered_map<i64, TString> RanksToCheck;

ui64 ExpectedResultSets = 1;
Expand Down Expand Up @@ -235,8 +235,8 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
TResourcePoolClassifierConfig object;
TResourcePoolClassifierConfig::TDecoder::DeserializeFromRecord(object, objectRecord);

if (!snapshot->GetClassifierConfig(CanonizePath(object.GetDatabase()), object.GetName())) {
FailAndPassAway(TStringBuilder() << "Classifier with name " << object.GetName() << " not found in database " << object.GetDatabase());
if (!snapshot->GetClassifierConfig(object.GetDatabase(), object.GetName())) {
FailAndPassAway(TStringBuilder() << "Classifier with name " << object.GetName() << " not found in database with id " << object.GetDatabase());
return;
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
}

Register(new TQueryRetryActor<TRanksCheckerActor, TEvPrivate::TEvRanksCheckerResponse, TString, TString, TString, std::unordered_map<i64, TString>>(
SelfId(), Context.GetExternalData().GetDatabase(), AlterContext.GetSessionId(), AlterContext.GetTransactionId(), ranksToNames
SelfId(), Context.GetExternalData().GetDatabaseId(), AlterContext.GetSessionId(), AlterContext.GetTransactionId(), ranksToNames
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using namespace NResourcePool;

NMetadata::NInternal::TTableRecord GetResourcePoolClassifierRecord(const NYql::TObjectSettingsImpl& settings, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context) {
NMetadata::NInternal::TTableRecord result;
result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Database, NMetadata::NInternal::TYDBValue::Utf8(CanonizePath(context.GetExternalData().GetDatabase())));
result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Database, NMetadata::NInternal::TYDBValue::Utf8(context.GetExternalData().GetDatabaseId()));
result.SetColumn(TResourcePoolClassifierConfig::TDecoder::Name, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId()));
return result;
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/gateway/kqp_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class IKqpGateway : public NYql::IKikimrGateway {

public:
virtual TString GetDatabase() = 0;
virtual TString GetDatabaseId() = 0;
virtual bool GetDomainLoginOnly() = 0;
virtual TMaybe<TString> GetDomainName() = 0;

Expand Down Expand Up @@ -232,7 +233,7 @@ class IKqpGateway : public NYql::IKikimrGateway {
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
};

TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, const TString& databaseId,
std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem,
ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig());

Expand Down
Loading
Loading