diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index aa93ee7cf36e..fab575f53b3d 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -28,240 +28,6 @@ using namespace NKikimrConfig; using namespace NYql; -class TKqpQueryCache { -public: - TKqpQueryCache(size_t size, TDuration ttl) - : List(size) - , Ttl(ttl) {} - - void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) { - Y_ENSURE(compileResult->Query); - auto& query = *compileResult->Query; - - YQL_ENSURE(compileResult->PreparedQuery); - - auto queryIt = QueryIndex.emplace(query, compileResult->Uid); - if (!queryIt.second) { - EraseByUid(compileResult->Uid); - QueryIndex.erase(query); - } - Y_ENSURE(queryIt.second); - } - - void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) { - Y_ENSURE(compileResult->Query); - Y_ENSURE(compileResult->GetAst()); - - AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid); - } - - bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution) { - if (!isPerStatementExecution) { - InsertQuery(compileResult); - } - if (isEnableAstCache && compileResult->GetAst()) { - InsertAst(compileResult); - } - - auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl}); - Y_ABORT_UNLESS(it.second); - - TItem* item = &const_cast(*it.first); - auto removedItem = List.Insert(item); - - IncBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); - - if (removedItem) { - DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize()); - - auto queryId = *removedItem->Value.CompileResult->Query; - QueryIndex.erase(queryId); - if (removedItem->Value.CompileResult->GetAst()) { - AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst())); - } - auto indexIt = Index.find(*removedItem); - if (indexIt != Index.end()) { - Index.erase(indexIt); - } - } - - Y_ABORT_UNLESS(List.GetSize() == Index.size()); - - return removedItem != nullptr; - } - - void AttachReplayMessage(const TString uid, TString replayMessage) { - auto it = Index.find(TItem(uid)); - if (it != Index.end()) { - TItem* item = &const_cast(*it); - DecBytes(item->Value.ReplayMessage.size()); - item->Value.ReplayMessage = replayMessage; - item->Value.LastReplayTime = TInstant::Now(); - IncBytes(replayMessage.size()); - } - } - - TString ReplayMessageByUid(const TString uid, TDuration timeout) { - auto it = Index.find(TItem(uid)); - if (it != Index.end()) { - TInstant& lastReplayTime = const_cast(*it).Value.LastReplayTime; - TInstant now = TInstant::Now(); - if (lastReplayTime + timeout < now) { - lastReplayTime = now; - return it->Value.ReplayMessage; - } - } - return ""; - } - - TKqpCompileResult::TConstPtr FindByUid(const TString& uid, bool promote) { - auto it = Index.find(TItem(uid)); - if (it != Index.end()) { - TItem* item = &const_cast(*it); - if (promote) { - item->Value.ExpiredAt = TAppData::TimeProvider->Now() + Ttl; - List.Promote(item); - } - - return item->Value.CompileResult; - } - - return nullptr; - } - - void Replace(const TKqpCompileResult::TConstPtr& compileResult) { - auto it = Index.find(TItem(compileResult->Uid)); - if (it != Index.end()) { - TItem& item = const_cast(*it); - item.Value.CompileResult = compileResult; - } - } - - TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) { - Y_ABORT_UNLESS(ast.Root); - std::shared_ptr> astPgParams; - if (query.QueryParameterTypes || ast.PgAutoParamValues) { - astPgParams = std::make_shared>(); - if (query.QueryParameterTypes) { - for (const auto& [name, param] : *query.QueryParameterTypes) { - astPgParams->insert({name, param}); - } - } - if (ast.PgAutoParamValues) { - const auto& params = dynamic_cast(ast.PgAutoParamValues.Get())->Values; - for (const auto& [name, param] : params) { - astPgParams->insert({name, param.Gettype()}); - } - } - } - return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings}; - } - - TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) { - auto uid = QueryIndex.FindPtr(query); - if (!uid) { - return nullptr; - } - - return FindByUid(*uid, promote); - } - - TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote) { - auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast)); - if (!uid) { - return nullptr; - } - - return FindByUid(*uid, promote); - } - - bool EraseByUid(const TString& uid) { - auto it = Index.find(TItem(uid)); - if (it == Index.end()) { - return false; - } - - TItem* item = &const_cast(*it); - List.Erase(item); - - DecBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); - DecBytes(item->Value.ReplayMessage.size()); - - Y_ABORT_UNLESS(item->Value.CompileResult); - Y_ABORT_UNLESS(item->Value.CompileResult->Query); - auto queryId = *item->Value.CompileResult->Query; - QueryIndex.erase(queryId); - if (item->Value.CompileResult->GetAst()) { - AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst())); - } - - Index.erase(it); - - Y_ABORT_UNLESS(List.GetSize() == Index.size()); - return true; - } - - size_t Size() const { - return Index.size(); - } - - ui64 Bytes() const { - return ByteSize; - } - - size_t EraseExpiredQueries() { - auto prevSize = Size(); - - auto now = TAppData::TimeProvider->Now(); - while (List.GetSize() && List.GetOldest()->Value.ExpiredAt <= now) { - EraseByUid(List.GetOldest()->Key); - } - - Y_ABORT_UNLESS(List.GetSize() == Index.size()); - return prevSize - Size(); - } - - void Clear() { - List = TList(List.GetMaxSize()); - Index.clear(); - QueryIndex.clear(); - AstIndex.clear(); - ByteSize = 0; - } - -private: - void DecBytes(ui64 bytes) { - if (bytes > ByteSize) { - ByteSize = 0; - } else { - ByteSize -= bytes; - } - } - - void IncBytes(ui64 bytes) { - ByteSize += bytes; - } - -private: - struct TCacheEntry { - TKqpCompileResult::TConstPtr CompileResult; - TInstant ExpiredAt; - TString ReplayMessage = ""; - TInstant LastReplayTime = TInstant::Zero(); - }; - - using TList = TLRUList; - using TItem = TList::TItem; - -private: - TList List; - THashSet Index; - THashMap> QueryIndex; - THashMap> AstIndex; - ui64 ByteSize = 0; - TDuration Ttl; -}; - struct TKqpCompileSettings { TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE) @@ -449,18 +215,20 @@ class TKqpCompileService : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_COMPILE_SERVICE; } - TKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, + TKqpCompileService( + TKqpQueryCachePtr queryCache, + const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr moduleResolverState, TIntrusivePtr counters, std::shared_ptr queryReplayFactory, std::optional federatedQuerySetup ) - : TableServiceConfig(tableServiceConfig) + : QueryCache(std::move(queryCache)) + , TableServiceConfig(tableServiceConfig) , QueryServiceConfig(queryServiceConfig) , KqpSettings(kqpSettings) , ModuleResolverState(moduleResolverState) , Counters(counters) - , QueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec())) , RequestsQueue(TableServiceConfig.GetCompileRequestQueueSize()) , QueryReplayFactory(std::move(queryReplayFactory)) , FederatedQuerySetup(federatedQuerySetup) @@ -578,7 +346,7 @@ class TKqpCompileService : public TActorBootstrapped { TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams || TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution) { - QueryCache.Clear(); + QueryCache->Clear(); LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Query cache was invalidated due to config change"); @@ -632,24 +400,24 @@ class TKqpCompileService : public TActorBootstrapped { << ", split: " << request.Split << *request.UserRequestContext); - *Counters->CompileQueryCacheSize = QueryCache.Size(); - *Counters->CompileQueryCacheBytes = QueryCache.Bytes(); - auto userSid = request.UserToken->GetUserSID(); auto dbCounters = request.DbCounters; - if (request.Uid) { - Counters->ReportCompileRequestGet(dbCounters); + auto compileResult = QueryCache->Find( + request.Uid, + request.Query, + request.TempTablesState, + request.KeepInCache, + request.UserToken->GetUserSID(), + Counters, + dbCounters, + ev->Sender, + ctx); - auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); - if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { - compileResult = nullptr; - } + if (request.Uid) { if (compileResult) { Y_ENSURE(compileResult->Query); if (compileResult->Query->UserSid == userSid) { - Counters->ReportQueryCacheHit(dbCounters, true); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" << ", sender: " << ev->Sender << ", queryUid: " << *request.Uid); @@ -663,38 +431,18 @@ class TKqpCompileService : public TActorBootstrapped { << ", expected sid: " << compileResult->Query->UserSid << ", actual sid: " << userSid); } - } - - Counters->ReportQueryCacheHit(dbCounters, false); - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Query not found" - << ", sender: " << ev->Sender - << ", queryUid: " << *request.Uid); - - NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid); - ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); - return; - } - - Counters->ReportCompileRequestCompile(dbCounters); - - Y_ENSURE(request.Query); - auto& query = *request.Query; - - if (query.UserSid.empty()) { - query.UserSid = userSid; - } else { - Y_ENSURE(query.UserSid == userSid); - } - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: " << query.SerializeToString()); - auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); - if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { - compileResult = nullptr; - } + } else { + Counters->ReportQueryCacheHit(dbCounters, false); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Query not found" + << ", sender: " << ev->Sender + << ", queryUid: " << *request.Uid); - if (compileResult) { - Counters->ReportQueryCacheHit(dbCounters, true); + NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Query not found: " << *request.Uid); + ReplyError(ev->Sender, *request.Uid, Ydb::StatusIds::NOT_FOUND, {issue}, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); + return; + } + } else if (compileResult) { + Y_ENSURE(request.Query); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" << ", sender: " << ev->Sender @@ -704,6 +452,8 @@ class TKqpCompileService : public TActorBootstrapped { return; } + Counters->ReportCompileRequestCompile(dbCounters); + CollectDiagnostics = request.CollectDiagnostics; LWTRACK(KqpCompileServiceEnqueued, @@ -768,7 +518,7 @@ class TKqpCompileService : public TActorBootstrapped { auto dbCounters = request.DbCounters; Counters->ReportRecompileRequestGet(dbCounters); - TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false); + TKqpCompileResult::TConstPtr compileResult = QueryCache->FindByUid(request.Uid, false); if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { compileResult = nullptr; } @@ -879,7 +629,7 @@ class TKqpCompileService : public TActorBootstrapped { if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) { QueryReplayBackend->Collect(*ev->Get()->ReplayMessage); - QueryCache.AttachReplayMessage(compileRequest.Uid, *ev->Get()->ReplayMessage); + QueryCache->AttachReplayMessage(compileRequest.Uid, *ev->Get()->ReplayMessage); } auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query); @@ -890,8 +640,8 @@ class TKqpCompileService : public TActorBootstrapped { } } else { if (!hasTempTablesNameClashes) { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.EraseByUid(compileResult->Uid); + if (QueryCache->FindByUid(compileResult->Uid, false)) { + QueryCache->EraseByUid(compileResult->Uid); } } } @@ -929,13 +679,13 @@ class TKqpCompileService : public TActorBootstrapped { auto dbCounters = request.DbCounters; Counters->ReportCompileRequestInvalidate(dbCounters); - QueryCache.EraseByUid(request.Uid); + QueryCache->EraseByUid(request.Uid); } void HandleTtlTimer(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Received check queries TTL timeout"); - auto evicted = QueryCache.EraseExpiredQueries(); + auto evicted = QueryCache->EraseExpiredQueries(); if (evicted != 0) { Counters->CompileQueryCacheEvicted->Add(evicted); } @@ -943,30 +693,17 @@ class TKqpCompileService : public TActorBootstrapped { StartCheckQueriesTtlTimer(); } - bool HasTempTablesNameClashes( - TKqpCompileResult::TConstPtr compileResult, - TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId = false) { - if (!compileResult) { - return false; - } - if (!compileResult->PreparedQuery) { - return false; - } - - return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId); - } - void UpdateQueryCache(const TActorContext& ctx, TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.Replace(compileResult); + if (QueryCache->FindByUid(compileResult->Uid, false)) { + QueryCache->Replace(compileResult); } else if (keepInCache) { if (compileResult->Query) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert query into compile cache, queryId: " << compileResult->Query->SerializeToString()); - if (QueryCache.FindByQuery(*compileResult->Query, keepInCache)) { + if (QueryCache->FindByQuery(*compileResult->Query, keepInCache)) { LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Trying to insert query into compile cache when it is already there"); } } - if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) { + if (QueryCache->Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) { Counters->CompileQueryCacheEvicted->Inc(); } if (compileResult->Query && isQueryActionPrepare) { @@ -983,7 +720,7 @@ class TKqpCompileService : public TActorBootstrapped { YQL_ENSURE(queryAst.Ast->Root); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by ast, queryId: " << compileRequest.Query.SerializeToString() << ", ast: " << queryAst.Ast->Root->ToString()); - auto compileResult = QueryCache.FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache); + auto compileResult = QueryCache->FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache); if (!compileRequest.FindInCache || HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) { compileResult = nullptr; @@ -1067,17 +804,17 @@ class TKqpCompileService : public TActorBootstrapped { queryParameterTypes->insert({param.GetName(), paramType}); } query.QueryParameterTypes = queryParameterTypes; - if (QueryCache.FindByQuery(query, keepInCache)) { + if (QueryCache->FindByQuery(query, keepInCache)) { return false; } - if (compileResult->GetAst() && QueryCache.FindByAst(query, *compileResult->GetAst(), keepInCache)) { + if (compileResult->GetAst() && QueryCache->FindByAst(query, *compileResult->GetAst(), keepInCache)) { return false; } auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst); newCompileResult->AllowCache = compileResult->AllowCache; newCompileResult->PreparedQuery = compileResult->PreparedQuery; LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString()); - return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution); + return QueryCache->Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution); } void ProcessQueue(const TActorContext& ctx) { @@ -1158,7 +895,7 @@ class TKqpCompileService : public TActorBootstrapped { TKqpStatsCompile stats; stats.FromCache = true; - if (auto replayMessage = QueryCache.ReplayMessageByUid(compileResult->Uid, TDuration::Seconds(TableServiceConfig.GetQueryReplayCacheUploadTTLSec()))) { + if (auto replayMessage = QueryCache->ReplayMessageByUid(compileResult->Uid, TDuration::Seconds(TableServiceConfig.GetQueryReplayCacheUploadTTLSec()))) { QueryReplayBackend->Collect(replayMessage); } @@ -1219,6 +956,8 @@ class TKqpCompileService : public TActorBootstrapped { } private: + TKqpQueryCachePtr QueryCache; + TTableServiceConfig TableServiceConfig; TQueryServiceConfig QueryServiceConfig; TKqpSettings::TConstPtr KqpSettings; @@ -1226,7 +965,6 @@ class TKqpCompileService : public TActorBootstrapped { TIntrusivePtr Counters; THolder QueryReplayBackend; - TKqpQueryCache QueryCache; TKqpRequestsQueue RequestsQueue; std::shared_ptr QueryReplayFactory; std::optional FederatedQuerySetup; @@ -1234,13 +972,315 @@ class TKqpCompileService : public TActorBootstrapped { bool CollectDiagnostics = false; }; -IActor* CreateKqpCompileService(const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, +// QueryCache + +bool TKqpQueryCache::Insert( + const TKqpCompileResult::TConstPtr& compileResult, + bool isEnableAstCache, + bool isPerStatementExecution) +{ + TGuard guard(Lock); + + if (!isPerStatementExecution) { + InsertQuery(compileResult); + } + if (isEnableAstCache && compileResult->GetAst()) { + InsertAst(compileResult); + } + + auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl}); + Y_ABORT_UNLESS(it.second); + + TItem* item = &const_cast(*it.first); + auto removedItem = List.Insert(item); + + IncBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); + + if (removedItem) { + DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize()); + + auto queryId = *removedItem->Value.CompileResult->Query; + QueryIndex.erase(queryId); + if (removedItem->Value.CompileResult->GetAst()) { + AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst())); + } + auto indexIt = Index.find(*removedItem); + if (indexIt != Index.end()) { + Index.erase(indexIt); + } + } + + Y_ABORT_UNLESS(List.GetSize() == Index.size()); + + return removedItem != nullptr; +} + +void TKqpQueryCache::AttachReplayMessage(const TString uid, TString replayMessage) { + TGuard guard(Lock); + + auto it = Index.find(TItem(uid)); + if (it != Index.end()) { + TItem* item = &const_cast(*it); + DecBytes(item->Value.ReplayMessage.size()); + item->Value.ReplayMessage = replayMessage; + item->Value.LastReplayTime = TInstant::Now(); + IncBytes(replayMessage.size()); + } +} + +TString TKqpQueryCache::ReplayMessageByUid(const TString uid, TDuration timeout) { + TGuard guard(Lock); + + auto it = Index.find(TItem(uid)); + if (it != Index.end()) { + TInstant& lastReplayTime = const_cast(*it).Value.LastReplayTime; + TInstant now = TInstant::Now(); + if (lastReplayTime + timeout < now) { + lastReplayTime = now; + return it->Value.ReplayMessage; + } + } + return ""; +} + +TKqpCompileResult::TConstPtr TKqpQueryCache::FindByUidImpl(const TString& uid, bool promote) { + auto it = Index.find(TItem(uid)); + if (it != Index.end()) { + TItem* item = &const_cast(*it); + if (promote) { + item->Value.ExpiredAt = TAppData::TimeProvider->Now() + Ttl; + List.Promote(item); + } + + return item->Value.CompileResult; + } + + return nullptr; +} + +TKqpCompileResult::TConstPtr TKqpQueryCache::FindByQueryImpl(const TKqpQueryId& query, bool promote) { + auto uid = QueryIndex.FindPtr(query); + if (!uid) { + return nullptr; + } + + // we're holding read and assume it's recursive + return FindByUidImpl(*uid, promote); +} + +bool TKqpQueryCache::EraseByUidImpl(const TString& uid) { + auto it = Index.find(TItem(uid)); + if (it == Index.end()) { + return false; + } + + TItem* item = &const_cast(*it); + List.Erase(item); + + DecBytes(item->Value.CompileResult->PreparedQuery->ByteSize()); + DecBytes(item->Value.ReplayMessage.size()); + + Y_ABORT_UNLESS(item->Value.CompileResult); + Y_ABORT_UNLESS(item->Value.CompileResult->Query); + auto queryId = *item->Value.CompileResult->Query; + QueryIndex.erase(queryId); + if (item->Value.CompileResult->GetAst()) { + AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst())); + } + + Index.erase(it); + + Y_ABORT_UNLESS(List.GetSize() == Index.size()); + return true; +} + +void TKqpQueryCache::Replace(const TKqpCompileResult::TConstPtr& compileResult) { + TGuard guard(Lock); + + auto it = Index.find(TItem(compileResult->Uid)); + if (it != Index.end()) { + TItem& item = const_cast(*it); + item.Value.CompileResult = compileResult; + } +} + +// find by either uid or query +TKqpCompileResult::TConstPtr TKqpQueryCache::Find( + const TMaybe& uid, + TMaybe& query, + TKqpTempTablesState::TConstPtr tempTablesState, + bool promote, + const NACLib::TSID& userSid, + TIntrusivePtr counters, + TKqpDbCountersPtr& dbCounters, + const TActorId& sender, + const TActorContext& ctx) +{ + TGuard guard(Lock); + + *counters->CompileQueryCacheSize = SizeImpl(); + *counters->CompileQueryCacheBytes = BytesImpl(); + + if (uid) { + counters->ReportCompileRequestGet(dbCounters); + + auto compileResult = FindByUidImpl(*uid, promote); + if (HasTempTablesNameClashes(compileResult, tempTablesState)) { + compileResult = nullptr; + } + + if (compileResult) { + Y_ENSURE(compileResult->Query); + if (compileResult->Query->UserSid == userSid) { + counters->ReportQueryCacheHit(dbCounters, true); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" + << ", sender: " << sender + << ", queryUid: " << *uid); + + return compileResult; + } else { + LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" + << ", sender: " << sender + << ", queryUid: " << *uid + << ", expected sid: " << compileResult->Query->UserSid + << ", actual sid: " << userSid); + } + } + + return nullptr; + } + + Y_ENSURE(query); + + if (query->UserSid.empty()) { + query->UserSid = userSid; + } else { + Y_ENSURE(query->UserSid == userSid); + } + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: " + << query->SerializeToString()); + auto compileResult = FindByQueryImpl(*query, promote); + if (HasTempTablesNameClashes(compileResult, tempTablesState)) { + return nullptr; + } + + if (compileResult) { + counters->ReportQueryCacheHit(dbCounters, true); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" + << ", sender: " << sender + << ", queryUid: " << compileResult->Uid); + + return compileResult; + } + + // note, we don't report cache miss, because it's up to caller to decide what to do: + // in particular, session actor will go to the compile service, which will actually report the miss. + + return nullptr; +} + +TKqpCompileResult::TConstPtr TKqpQueryCache::FindByAst( + const TKqpQueryId& query, + const NYql::TAstParseResult& ast, + bool promote) +{ + TGuard guard(Lock); + + auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast)); + if (!uid) { + return nullptr; + } + + return FindByUidImpl(*uid, promote); +} + +size_t TKqpQueryCache::EraseExpiredQueries() { + TGuard guard(Lock); + + auto prevSize = SizeImpl(); + + auto now = TAppData::TimeProvider->Now(); + while (List.GetSize() && List.GetOldest()->Value.ExpiredAt <= now) { + EraseByUidImpl(List.GetOldest()->Key); + } + + Y_ABORT_UNLESS(List.GetSize() == Index.size()); + return prevSize - SizeImpl(); +} + +void TKqpQueryCache::Clear() { + TGuard guard(Lock); + + List = TList(List.GetMaxSize()); + Index.clear(); + QueryIndex.clear(); + AstIndex.clear(); + ByteSize = 0; +} + +void TKqpQueryCache::InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) { + Y_ENSURE(compileResult->Query); + auto& query = *compileResult->Query; + + YQL_ENSURE(compileResult->PreparedQuery); + + auto queryIt = QueryIndex.emplace(query, compileResult->Uid); + if (!queryIt.second) { + EraseByUid(compileResult->Uid); + QueryIndex.erase(query); + } + Y_ENSURE(queryIt.second); +} + +TKqpQueryId TKqpQueryCache::GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) { + Y_ABORT_UNLESS(ast.Root); + std::shared_ptr> astPgParams; + if (query.QueryParameterTypes || ast.PgAutoParamValues) { + astPgParams = std::make_shared>(); + if (query.QueryParameterTypes) { + for (const auto& [name, param] : *query.QueryParameterTypes) { + astPgParams->insert({name, param}); + } + } + if (ast.PgAutoParamValues) { + const auto& params = dynamic_cast(ast.PgAutoParamValues.Get())->Values; + for (const auto& [name, param] : params) { + astPgParams->insert({name, param.Gettype()}); + } + } + } + return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings}; +} + +// + +bool HasTempTablesNameClashes( + TKqpCompileResult::TConstPtr compileResult, + TKqpTempTablesState::TConstPtr tempTablesState, + bool withSessionId) +{ + if (!compileResult) { + return false; + } + if (!compileResult->PreparedQuery) { + return false; + } + + return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId); +} + +IActor* CreateKqpCompileService( + TKqpQueryCachePtr queryCache, + const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr moduleResolverState, TIntrusivePtr counters, std::shared_ptr queryReplayFactory, - std::optional federatedQuerySetup - ) + std::optional federatedQuerySetup) { - return new TKqpCompileService(tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters, + return new TKqpCompileService( + std::move(queryCache), + tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters, std::move(queryReplayFactory), federatedQuerySetup); } diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index 8da1cdead76f..853ba5a35910 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -4,17 +4,149 @@ #include #include #include +#include + +#include namespace NKikimr { namespace NKqp { +class TKqpDbCounters; + enum class ECompileActorAction { COMPILE, PARSE, SPLIT, }; -IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, +// Cache is shared between query sessions, compile service and kqp proxy. +// Currently we don't use RW lock, because of cache promotions during lookup: +// in benchmark both versions (RW spinlock and adaptive W spinlock) show same +// performance. Might consider RW lock again in future. +class TKqpQueryCache: public TThrRefBase { +public: + TKqpQueryCache(size_t size, TDuration ttl) + : List(size) + , Ttl(ttl) {} + + bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution); + void AttachReplayMessage(const TString uid, TString replayMessage); + + TString ReplayMessageByUid(const TString uid, TDuration timeout); + + TKqpCompileResult::TConstPtr FindByUid(const TString& uid, bool promote) { + TGuard guard(Lock); + return FindByUidImpl(uid, promote); + } + + void Replace(const TKqpCompileResult::TConstPtr& compileResult); + + TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) { + TGuard guard(Lock); + return FindByQueryImpl(query, promote); + } + + // find by either uid or query + TKqpCompileResult::TConstPtr Find( + const TMaybe& uid, + TMaybe& query, + TKqpTempTablesState::TConstPtr tempTablesState, + bool promote, + const NACLib::TSID& userSid, + TIntrusivePtr counters, + TIntrusivePtr& dbCounters, + const TActorId& sender, + const TActorContext& ctx); + + TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote); + + bool EraseByUid(const TString& uid) { + TGuard guard(Lock); + return EraseByUidImpl(uid); + } + + size_t Size() const { + TGuard guard(Lock); + return SizeImpl(); + } + + ui64 Bytes() const { + TGuard guard(Lock); + return BytesImpl(); + } + + ui64 BytesImpl() const { + return ByteSize; + } + + size_t EraseExpiredQueries(); + + void Clear(); + +private: + TKqpCompileResult::TConstPtr FindByUidImpl(const TString& uid, bool promote); + TKqpCompileResult::TConstPtr FindByQueryImpl(const TKqpQueryId& query, bool promote); + bool EraseByUidImpl(const TString& uid); + + size_t SizeImpl() const { + return Index.size(); + } + + void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult); + + void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) { + Y_ENSURE(compileResult->Query); + Y_ENSURE(compileResult->GetAst()); + + AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid); + } + + TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast); + + void DecBytes(ui64 bytes) { + if (bytes > ByteSize) { + ByteSize = 0; + } else { + ByteSize -= bytes; + } + } + + void IncBytes(ui64 bytes) { + ByteSize += bytes; + } + +private: + struct TCacheEntry { + TKqpCompileResult::TConstPtr CompileResult; + TInstant ExpiredAt; + TString ReplayMessage = ""; + TInstant LastReplayTime = TInstant::Zero(); + }; + + using TList = TLRUList; + using TItem = TList::TItem; + +private: + TList List; + THashSet Index; + THashMap> QueryIndex; + THashMap> AstIndex; + ui64 ByteSize = 0; + TDuration Ttl; + + TAdaptiveLock Lock; +}; + +using TKqpQueryCachePtr = TIntrusivePtr; + +bool HasTempTablesNameClashes( + TKqpCompileResult::TConstPtr compileResult, + TKqpTempTablesState::TConstPtr tempTablesState, + bool withSessionId = false); + +IActor* CreateKqpCompileService( + TKqpQueryCachePtr queryCache, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr moduleResolverState, TIntrusivePtr counters, std::shared_ptr queryReplayFactory, diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index c46e8bed14f7..532a6bfb5e06 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -209,6 +209,7 @@ class TKqpProxyService : public TActorBootstrapped { , ModuleResolverState() , KqpProxySharedResources(std::move(kqpProxySharedResources)) , S3ActorsFactory(std::move(s3ActorsFactory)) + , QueryCache(new TKqpQueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec()))) {} void Bootstrap(const TActorContext &ctx) { @@ -276,7 +277,9 @@ class TKqpProxyService : public TActorBootstrapped { } // Create compile service - CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, QueryServiceConfig, + CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService( + QueryCache, + TableServiceConfig, QueryServiceConfig, KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), FederatedQuerySetup)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpCompileServiceID(SelfId().NodeId()), CompileService); @@ -1496,7 +1499,7 @@ class TKqpProxyService : public TActorBootstrapped { auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings, + IActor* sessionActor = CreateKqpSessionActor(SelfId(), QueryCache, ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, QueryServiceConfig, KqpTempTablesAgentActor); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); @@ -1902,6 +1905,8 @@ class TKqpProxyService : public TActorBootstrapped { std::shared_ptr KqpProxySharedResources; std::shared_ptr S3ActorsFactory; + TKqpQueryCachePtr QueryCache; + bool ServerWorkerBalancerComplete = false; std::optional SelfDataCenterId; TIntrusivePtr RandomProvider; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index f1ff6799afe0..4c02f46b151a 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -1,5 +1,6 @@ #include "kqp_query_state.h" +#include #include namespace NKikimr::NKqp { @@ -136,27 +137,35 @@ std::unique_ptr TKqpQueryState::BuildN return std::make_unique(navigate.Release()); } - bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) { + CompileStats = ev->Stats; + if (!SaveAndCheckCompileResult(ev->CompileResult)) { + return false; + } + Orbit = std::move(ev->Orbit); + if (ev->ReplayMessage) { + ReplayMessage = *ev->ReplayMessage; + } + + return true; +} + +bool TKqpQueryState::SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr compileResult) { CompilationRunning = false; - CompileResult = ev->CompileResult; + CompileResult = compileResult; YQL_ENSURE(CompileResult); MaxReadType = CompileResult->MaxReadType; - Orbit = std::move(ev->Orbit); - if (CompileResult->Status != Ydb::StatusIds::SUCCESS) + if (CompileResult->Status != Ydb::StatusIds::SUCCESS) { return false; + } YQL_ENSURE(CompileResult->PreparedQuery); const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion(); YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1, "Unexpected prepared query version: " << compiledVersion); - CompileStats = ev->Stats; PreparedQuery = CompileResult->PreparedQuery; - if (ev->ReplayMessage) { - ReplayMessage = *ev->ReplayMessage; - } if (!CommandTagName) { CommandTagName = CompileResult->CommandTagName; } @@ -185,6 +194,70 @@ bool TKqpQueryState::SaveAndCheckSplitResult(TEvKqp::TEvSplitResponse* ev) { return ev->Status == Ydb::StatusIds::SUCCESS; } +bool TKqpQueryState::TryGetFromCache( + TKqpQueryCache& cache, + const TGUCSettings::TPtr& gUCSettingsPtr, + TIntrusivePtr& counters, + const TActorId& sender) +{ + TMaybe query; + TMaybe uid; + + TKqpQuerySettings settings(GetType()); + settings.DocumentApiRestricted = IsDocumentApiRestricted_; + settings.IsInternalCall = IsInternalCall(); + settings.Syntax = GetSyntax(); + + TGUCSettings gUCSettings = gUCSettingsPtr ? *gUCSettingsPtr : TGUCSettings(); + bool keepInCache = false; + switch (GetAction()) { + case NKikimrKqp::QUERY_ACTION_EXECUTE: + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + keepInCache = GetQueryKeepInCache() && query->IsSql(); + break; + + case NKikimrKqp::QUERY_ACTION_PREPARE: + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + keepInCache = query->IsSql(); + break; + + case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: + uid = GetPreparedQuery(); + keepInCache = GetQueryKeepInCache(); + break; + + case NKikimrKqp::QUERY_ACTION_EXPLAIN: + query = TKqpQueryId(Cluster, Database, UserRequestContext->DatabaseId, GetQuery(), settings, GetQueryParameterTypes(), gUCSettings); + keepInCache = false; + break; + + default: + YQL_ENSURE(false); + } + + CompileStats = {}; + + auto compileResult = cache.Find( + uid, + query, + TempTablesState, + keepInCache, + UserToken->GetUserSID(), + counters, + DbCounters, + sender, + TlsActivationContext->AsActorContext()); + + if (compileResult) { + if (SaveAndCheckCompileResult(compileResult)) { + CompileStats.FromCache = true; + return true; + } + } + + return false; +} + std::unique_ptr TKqpQueryState::BuildCompileRequest(std::shared_ptr> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) { TMaybe query; TMaybe uid; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index da4cc35472b3..467605df5ccb 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -26,6 +26,8 @@ namespace NKikimr::NKqp { +class TKqpQueryCache; + // basically it's a state that holds all the context // about the specific query execution. // it holds the unique pointer to the query request, which may include @@ -412,7 +414,7 @@ class TKqpQueryState : public TNonCopyable { const auto& phyQuery = PreparedQuery->GetPhysicalQuery(); auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx); - if (TxCtx->CanDeferEffects()) { + if (TxCtx->CanDeferEffects()) { // At current time sinks require separate tnx with commit. while (tx && tx->GetHasEffects() && !TxCtx->HasOlapTable) { QueryData->CreateKqpValueMap(tx); @@ -470,9 +472,17 @@ class TKqpQueryState : public TNonCopyable { // execution. std::unique_ptr BuildNavigateKeySet(); // same the context of the compiled query to the query state. + bool SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr compileResult); bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev); bool SaveAndCheckParseResult(TEvKqp::TEvParseResponse&& ev); bool SaveAndCheckSplitResult(TEvKqp::TEvSplitResponse* ev); + + bool TryGetFromCache( + TKqpQueryCache& cache, + const TGUCSettings::TPtr& gUCSettingsPtr, + TIntrusivePtr& counters, + const TActorId& sender); + // build the compilation request. std::unique_ptr BuildCompileRequest(std::shared_ptr> cookie, const TGUCSettings::TPtr& gUCSettingsPtr); // TODO(gvit): get rid of code duplication in these requests, diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 718e9ffbbb05..dd5b505fba06 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -149,6 +149,7 @@ class TKqpSessionActor : public TActorBootstrapped { } TKqpSessionActor(const TActorId& owner, + TKqpQueryCachePtr queryCache, std::shared_ptr resourceManager, std::shared_ptr caFactory, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, @@ -159,6 +160,7 @@ class TKqpSessionActor : public TActorBootstrapped { const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TActorId& kqpTempTablesAgentActor) : Owner(owner) + , QueryCache(std::move(queryCache)) , SessionId(sessionId) , ResourceManager_(std::move(resourceManager)) , CaFactory_(std::move(caFactory)) @@ -515,12 +517,34 @@ class TKqpSessionActor : public TActorBootstrapped { void CompileQuery() { YQL_ENSURE(QueryState); QueryState->CompilationRunning = true; + + Become(&TKqpSessionActor::ExecuteState); + + // quick path + if (QueryState->TryGetFromCache(*QueryCache, GUCSettings, Counters, SelfId()) && !QueryState->CompileResult->NeedToSplit) { + LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status); + + // even if we have successfully compilation result, it doesn't mean anything + // in terms of current schema version of the table if response of compilation is from the cache. + // because of that, we are forcing to run schema version check + if (QueryState->NeedCheckTableVersions()) { + auto ev = QueryState->BuildNavigateKeySet(); + Send(MakeSchemeCacheID(), ev.release()); + return; + } + + OnSuccessCompileRequest(); + return; + } + + // TODO: in some cases we could reply right here (e.g. there is uid and query is missing), but + // for extra sanity we make extra hop to the compile service, which might handle the issue better + auto ev = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings); LOG_D("Sending CompileQuery request"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId, QueryState->KqpSessionSpan.GetTraceId()); - Become(&TKqpSessionActor::ExecuteState); } void CompileSplittedQuery() { @@ -611,8 +635,28 @@ class TKqpSessionActor : public TActorBootstrapped { } void CompileStatement() { + // quick path + if (QueryState->TryGetFromCache(*QueryCache, GUCSettings, Counters, SelfId()) && !QueryState->CompileResult->NeedToSplit) { + LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status); + + // even if we have successfully compilation result, it doesn't mean anything + // in terms of current schema version of the table if response of compilation is from the cache. + // because of that, we are forcing to run schema version check + if (QueryState->NeedCheckTableVersions()) { + auto ev = QueryState->BuildNavigateKeySet(); + Send(MakeSchemeCacheID(), ev.release()); + return; + } + + OnSuccessCompileRequest(); + return; + } + + // TODO: in some cases we could reply right here (e.g. there is uid and query is missing), but + // for extra sanity we make extra hop to the compile service, which might handle the issue better + auto request = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings); - LOG_D("Sending CompileQuery request"); + LOG_D("Sending CompileQuery request (statement)"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), request.release(), 0, QueryState->QueryId, QueryState->KqpSessionSpan.GetTraceId()); @@ -2633,6 +2677,7 @@ class TKqpSessionActor : public TActorBootstrapped { private: TActorId Owner; + TKqpQueryCachePtr QueryCache; TString SessionId; std::shared_ptr ResourceManager_; @@ -2673,7 +2718,9 @@ class TKqpSessionActor : public TActorBootstrapped { } // namespace -IActor* CreateKqpSessionActor(const TActorId& owner, std::shared_ptr resourceManager, +IActor* CreateKqpSessionActor(const TActorId& owner, + TKqpQueryCachePtr queryCache, + std::shared_ptr resourceManager, std::shared_ptr caFactory, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, std::optional federatedQuerySetup, @@ -2682,7 +2729,9 @@ IActor* CreateKqpSessionActor(const TActorId& owner, std::shared_ptr queryCache, std::shared_ptr resourceManager_, std::shared_ptr caFactory_, const TString& sessionId,