diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index e954c6eeb9fd..59781a82bf5a 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -77,7 +77,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self break; } } - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier()); self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size()); for (auto& [_, portionInfo] : PortionsToRemove) { diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h index 6173d3147e87..dce9aca55685 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/result.h +++ b/ydb/core/tx/columnshard/engines/reader/common/result.h @@ -12,7 +12,7 @@ namespace NKikimr::NOlap::NReader { // Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation class TPartialReadResult: public TNonCopyable { private: - YDB_READONLY_DEF(std::shared_ptr, ResourcesGuard); + YDB_READONLY_DEF(std::vector>, ResourceGuards); YDB_READONLY_DEF(std::shared_ptr, GroupGuard); NArrow::TShardedRecordBatch ResultBatch; @@ -54,11 +54,11 @@ class TPartialReadResult: public TNonCopyable { return ScanCursor; } - explicit TPartialReadResult(std::shared_ptr&& resourcesGuard, - std::shared_ptr&& gGuard, const NArrow::TShardedRecordBatch& batch, + explicit TPartialReadResult(const std::vector>& resourceGuards, + const std::shared_ptr& gGuard, const NArrow::TShardedRecordBatch& batch, const std::shared_ptr& scanCursor, const std::optional notFinishedIntervalIdx) - : ResourcesGuard(std::move(resourcesGuard)) - , GroupGuard(std::move(gGuard)) + : ResourceGuards(resourceGuards) + , GroupGuard(gGuard) , ResultBatch(batch) , ScanCursor(scanCursor) , NotFinishedIntervalIdx(notFinishedIntervalIdx) { @@ -68,7 +68,7 @@ class TPartialReadResult: public TNonCopyable { explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr& scanCursor, const std::optional notFinishedIntervalIdx) - : TPartialReadResult(nullptr, nullptr, batch, scanCursor, notFinishedIntervalIdx) { + : TPartialReadResult({}, nullptr, batch, scanCursor, notFinishedIntervalIdx) { } }; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index 6298efcdd13d..2ff33e594783 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -27,7 +27,8 @@ void TScanHead::OnIntervalResult(std::shared_ptrsecond->GetGroupGuard(); } - AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared(std::move(allocationGuard), std::move(gGuard), *newBatch, + std::vector> guards = { std::move(allocationGuard) }; + AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared(guards, std::move(gGuard), *newBatch, std::make_shared(lastPK), callbackIdxSubscriver)).second); } else { AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index caa7f0330506..1ca0b8cae26a 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -96,27 +96,27 @@ class TColumnsAccumulator { const bool sequential) { auto actualColumns = columns - AssemblerReadyColumns; AssemblerReadyColumns = AssemblerReadyColumns + columns; - if (!actualColumns.IsEmpty()) { - auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); - if (sequential) { - const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); - if (notSequentialColumnIds.size()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); - std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); - script.AddStep(cross, purposeId); - *actualSet = *actualSet - *cross; - } - if (!actualSet->IsEmpty()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); - script.AddStep(actualSet, purposeId); - } - } else { - script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); - script.AddStep(actualSet, purposeId); + if (actualColumns.IsEmpty()) { + return false; + } + auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); + if (sequential) { + const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); + if (notSequentialColumnIds.size()) { + script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); + std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); + script.AddStep(cross, purposeId); + *actualSet = *actualSet - *cross; } - return true; + if (!actualSet->IsEmpty()) { + script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); + script.AddStep(actualSet, purposeId); + } + } else { + script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); + script.AddStep(actualSet, purposeId); } - return false; + return true; } }; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index 00a0ae70a15e..8a36491860b2 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -16,9 +16,13 @@ void TScanHead::OnSourceReady(const std::shared_ptr& source, std::s while (FetchingSources.size()) { auto frontSource = *FetchingSources.begin(); if (!frontSource->HasStageResult()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result")("source_id", frontSource->GetSourceId())( + "source_idx", frontSource->GetSourceIdx()); break; } if (!frontSource->GetStageResult().HasResultChunk()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result_chunk")("source_id", frontSource->GetSourceId())( + "source_idx", frontSource->GetSourceIdx()); break; } auto table = frontSource->MutableStageResult().ExtractResultChunk(); @@ -28,24 +32,32 @@ void TScanHead::OnSourceReady(const std::shared_ptr& source, std::s sourceIdxToContinue = frontSource->GetSourceIdx(); } if (table && table->num_rows()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "has_result")("source_id", frontSource->GetSourceId())( + "source_idx", frontSource->GetSourceIdx())("table", table->num_rows()); auto cursor = std::make_shared(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount); - reader.OnIntervalResult(std::make_shared(nullptr, nullptr, table, cursor, sourceIdxToContinue)); + reader.OnIntervalResult( + std::make_shared(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table, cursor, sourceIdxToContinue)); } else if (sourceIdxToContinue) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())( + "source_idx", frontSource->GetSourceIdx()); ContinueSource(*sourceIdxToContinue); break; } if (!isFinished) { break; } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", frontSource->GetSourceId())( + "source_idx", frontSource->GetSourceIdx()); AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx())); if (Context->GetCommonContext()->GetReadMetadata()->Limit) { - FinishedSources.emplace(*FetchingSources.begin()); + frontSource->ClearResult(); + FinishedSources.emplace(frontSource); } FetchingSources.erase(FetchingSources.begin()); while (FetchingSources.size() && FinishedSources.size()) { - auto finishedSource = *FinishedSources.begin(); auto fetchingSource = *FetchingSources.begin(); + auto finishedSource = *FinishedSources.begin(); if (finishedSource->GetFinish() < fetchingSource->GetStart()) { FetchedCount += finishedSource->GetRecordsCount(); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index 9c578661f064..9cd55c438ed7 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -25,6 +25,8 @@ void IDataSource::StartProcessing(const std::shared_ptr& sourcePtr) AFL_VERIFY(FetchingPlan); AFL_VERIFY(!Context->IsAborted()); ProcessingStarted = true; + SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( + GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", SourceIdx); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); TFetchingScriptCursor cursor(FetchingPlan, 0); @@ -237,9 +239,7 @@ TPortionDataSource::TPortionDataSource( portion->RecordSnapshotMin(TSnapshot::Zero()), portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(), portion->GetShardingVersionOptional(), portion->GetMeta().GetDeletionsCount()) , Portion(portion) - , Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) - , SourceGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( - GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId())) { + , Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) { } } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h index 91c4533cb773..ea6d499321ae 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h @@ -60,7 +60,7 @@ class IDataSource: public ICursorEntity { YDB_READONLY(bool, HasDeletions, false); virtual NJson::TJsonValue DoDebugJson() const = 0; std::shared_ptr FetchingPlan; - std::vector> ResourceGuards; + YDB_READONLY_DEF(std::vector>, ResourceGuards); YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage); bool ProcessingStarted = false; bool IsStartedByCursor = false; @@ -74,7 +74,7 @@ class IDataSource: public ICursorEntity { } std::optional ScriptCursor; - + std::shared_ptr SourceGroupGuard; protected: std::optional IsSourceInMemoryFlag; @@ -95,11 +95,27 @@ class IDataSource: public ICursorEntity { virtual bool DoStartFetchingAccessor(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step) = 0; public: - virtual ui64 GetMemoryGroupId() const = 0; bool GetIsStartedByCursor() const { return IsStartedByCursor; } + const std::shared_ptr& GetGroupGuard() const { + AFL_VERIFY(SourceGroupGuard); + return SourceGroupGuard; + } + + ui64 GetMemoryGroupId() const { + AFL_VERIFY(SourceGroupGuard); + return SourceGroupGuard->GetGroupId(); + } + + virtual void ClearResult() { + StageData.reset(); + StageResult.reset(); + ResourceGuards.clear(); + SourceGroupGuard = nullptr; + } + void SetIsStartedByCursor() { IsStartedByCursor = true; } @@ -323,7 +339,6 @@ class TPortionDataSource: public IDataSource { using TBase = IDataSource; const TPortionInfo::TConstPtr Portion; std::shared_ptr Schema; - const std::shared_ptr SourceGroupGuard; void NeedFetchColumns(const std::set& columnIds, TBlobsAction& blobsAction, THashMap& nullBlocks, const std::shared_ptr& filter); @@ -365,10 +380,6 @@ class TPortionDataSource: public IDataSource { virtual bool DoStartFetchingAccessor(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step) override; public: - virtual ui64 GetMemoryGroupId() const override { - return SourceGroupGuard->GetGroupId(); - } - virtual ui64 PredictAccessorsSize() const override { return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule/storage.cpp b/ydb/core/tx/columnshard/engines/storage/granule/storage.cpp index a8c9a092bc32..b8953946ba83 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/storage.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/storage.cpp @@ -63,7 +63,7 @@ std::optional TGranulesStorage::GetCom maxPriorityGranule = granulesSorted.front().GetGranule(); break; } - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId()); std::pop_heap(granulesSorted.begin(), granulesSorted.end()); granulesSorted.pop_back(); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp index a5ffa1c95df7..6ed2c11da611 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp @@ -47,7 +47,7 @@ std::shared_ptr TOptimizerPlanner::DoGetOptimizationTask( result->SetPortionExpectedSize(levelPortions->GetExpectedPortionSize()); } auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1); - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())( + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())( "level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString()); result->SetCheckPoints(std::move(positions)); for (auto&& i : result->GetSwitchedPortions()) {