From 52b3bb29e180da6e4c350146492207b542f9ddce Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 20 Dec 2024 18:46:18 +0300 Subject: [PATCH] scanners unification plain/simple for reuse code --- .../reader/common_reader/iterator/context.cpp | 34 +- .../reader/common_reader/iterator/context.h | 27 ++ .../common_reader/iterator/fetch_steps.cpp | 106 +++++++ .../common_reader/iterator/fetch_steps.h | 146 +++++++++ .../common_reader/iterator/fetching.cpp | 166 ++++++++++ .../reader/common_reader/iterator/fetching.h | 205 ++++++++++++ .../reader/common_reader/iterator/source.cpp | 5 + .../reader/common_reader/iterator/source.h | 208 ++++++++++++ .../reader/common_reader/iterator/ya.make | 3 + .../plain_reader/iterator/constructor.cpp | 2 +- .../plain_reader/iterator/constructor.h | 9 +- .../reader/plain_reader/iterator/context.cpp | 86 +---- .../reader/plain_reader/iterator/context.h | 13 +- .../reader/plain_reader/iterator/fetching.cpp | 222 +------------ .../reader/plain_reader/iterator/fetching.h | 297 +----------------- .../plain_reader/iterator/plain_read_data.cpp | 5 +- .../plain_reader/iterator/plain_read_data.h | 7 +- .../reader/plain_reader/iterator/source.cpp | 41 ++- .../reader/plain_reader/iterator/source.h | 158 ++-------- .../simple_reader/iterator/constructor.cpp | 3 +- .../simple_reader/iterator/constructor.h | 9 +- .../reader/simple_reader/iterator/context.cpp | 77 +---- .../reader/simple_reader/iterator/context.h | 10 +- .../simple_reader/iterator/fetching.cpp | 224 +------------ .../reader/simple_reader/iterator/fetching.h | 280 +---------------- .../simple_reader/iterator/plain_read_data.h | 2 +- .../reader/simple_reader/iterator/source.cpp | 41 ++- .../reader/simple_reader/iterator/source.h | 158 ++-------- 28 files changed, 1104 insertions(+), 1440 deletions(-) create mode 100644 ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp create mode 100644 ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h create mode 100644 ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp create mode 100644 ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h create mode 100644 ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp create mode 100644 ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp index 5ea51192550b..a33d9b2d5701 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp @@ -9,15 +9,15 @@ namespace NKikimr::NOlap::NReader::NCommon { TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) : CommonContext(commonContext) { - auto readMetadata = CommonContext->GetReadMetadataPtrVerifiedAs(); - Y_ABORT_UNLESS(readMetadata->SelectInfo); + ReadMetadata = CommonContext->GetReadMetadataPtrVerifiedAs(); + Y_ABORT_UNLESS(ReadMetadata->SelectInfo); double kffAccessors = 0.01; double kffFilter = 0.45; double kffFetching = 0.45; double kffMerge = 0.10; TString stagePrefix; - if (readMetadata->GetEarlyFilterColumnIds().size()) { + if (ReadMetadata->GetEarlyFilterColumnIds().size()) { stagePrefix = "EF"; kffFilter = 0.7; kffFetching = 0.15; @@ -41,15 +41,15 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit) }; ProcessMemoryGuard = - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages); - ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard( - CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages); + ProcessScopeGuard = + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId()); - auto readSchema = readMetadata->GetResultSchema(); + auto readSchema = ReadMetadata->GetResultSchema(); SpecColumns = std::make_shared(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); - IndexChecker = readMetadata->GetProgram().GetIndexChecker(); + IndexChecker = ReadMetadata->GetProgram().GetIndexChecker(); { - auto predicateColumns = readMetadata->GetPKRangesFilter().GetColumnIds(readMetadata->GetIndexInfo()); + auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo()); if (predicateColumns.size()) { PredicateColumns = std::make_shared(predicateColumns, readSchema); } else { @@ -58,26 +58,26 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co } { std::set columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX }; - DeletionColumns = std::make_shared(columnIds, readMetadata->GetResultSchema()); + DeletionColumns = std::make_shared(columnIds, ReadMetadata->GetResultSchema()); } - if (!!readMetadata->GetRequestShardingInfo()) { + if (!!ReadMetadata->GetRequestShardingInfo()) { auto shardingColumnIds = - readMetadata->GetIndexInfo().GetColumnIdsVerified(readMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames()); - ShardingColumns = std::make_shared(shardingColumnIds, readMetadata->GetResultSchema()); + ReadMetadata->GetIndexInfo().GetColumnIdsVerified(ReadMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames()); + ShardingColumns = std::make_shared(shardingColumnIds, ReadMetadata->GetResultSchema()); } else { ShardingColumns = std::make_shared(); } { - auto efColumns = readMetadata->GetEarlyFilterColumnIds(); + auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); if (efColumns.size()) { EFColumns = std::make_shared(efColumns, readSchema); } else { EFColumns = std::make_shared(); } } - if (readMetadata->HasProcessingColumnIds()) { - FFColumns = std::make_shared(readMetadata->GetProcessingColumnIds(), readSchema); + if (ReadMetadata->HasProcessingColumnIds()) { + FFColumns = std::make_shared(ReadMetadata->GetProcessingColumnIds(), readSchema); if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) { FFColumns = std::make_shared(*EFColumns + *SpecColumns); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString()); @@ -95,7 +95,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co } AllUsageColumns = std::make_shared(*FFColumns + *PredicateColumns); - PKColumns = std::make_shared(readMetadata->GetPKColumnIds(), readSchema); + PKColumns = std::make_shared(ReadMetadata->GetPKColumnIds(), readSchema); MergeColumns = std::make_shared(*PKColumns + *SpecColumns); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h index fa24798d4e66..9f2ae6d4bcba 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h @@ -2,11 +2,15 @@ #include "columns_set.h" #include +#include #include #include namespace NKikimr::NOlap::NReader::NCommon { +class TFetchingScript; +class IDataSource; + class TSpecialReadContext { private: YDB_READONLY_DEF(std::shared_ptr, CommonContext); @@ -28,13 +32,36 @@ class TSpecialReadContext { YDB_READONLY_DEF(std::shared_ptr, FilterStageMemory); YDB_READONLY_DEF(std::shared_ptr, FetchingStageMemory); + TReadMetadata::TConstPtr ReadMetadata; TAtomic AbortFlag = 0; + virtual std::shared_ptr DoGetColumnsFetchingPlan(const std::shared_ptr& source) = 0; + protected: NIndexes::TIndexCheckerContainer IndexChecker; std::shared_ptr EmptyColumns = std::make_shared(); public: + template + std::shared_ptr GetColumnsFetchingPlan(const std::shared_ptr& source) { + return GetColumnsFetchingPlan(std::static_pointer_cast(source)); + } + + std::shared_ptr GetColumnsFetchingPlan(const std::shared_ptr& source) { + return DoGetColumnsFetchingPlan(source); + } + + const TReadMetadata::TConstPtr& GetReadMetadata() const { + return ReadMetadata; + } + + template + std::shared_ptr GetReadMetadataVerifiedAs() const { + auto result = std::dynamic_pointer_cast(ReadMetadata); + AFL_VERIFY(!!result); + return result; + } + ui64 GetProcessMemoryControlId() const { AFL_VERIFY(ProcessMemoryGuard); return ProcessMemoryGuard->GetProcessId(); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp new file mode 100644 index 000000000000..0f70d6ef0476 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp @@ -0,0 +1,106 @@ +#include "fetch_steps.h" +#include "source.h" + +#include +#include +#include +#include + +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +TConclusion TColumnBlobsFetchingStep::DoExecuteInplace( + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + return !source->StartFetchingColumns(source, step, Columns); +} + +ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptr& source) const { + return source->GetColumnBlobBytes(Columns.GetColumnIds()); +} + +TConclusion TAssemblerStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + source->AssembleColumns(Columns); + return true; +} + +ui64 TAssemblerStep::GetProcessingDataSize(const std::shared_ptr& source) const { + return source->GetColumnRawBytes(Columns->GetColumnIds()); +} + +TConclusion TOptionalAssemblerStep::DoExecuteInplace( + const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + source->AssembleColumns(Columns, !source->IsSourceInMemory()); + return true; +} + +ui64 TOptionalAssemblerStep::GetProcessingDataSize(const std::shared_ptr& source) const { + return source->GetColumnsVolume(Columns->GetColumnIds(), EMemType::RawSequential); +} + +bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr&& guard, + const std::shared_ptr& /*allocation*/) { + auto data = Source.lock(); + if (!data || data->GetContext()->IsAborted()) { + guard->Release(); + return false; + } + if (StageIndex == EStageFeaturesIndexes::Accessors) { + data->MutableStageData().SetAccessorsGuard(std::move(guard)); + } else { + data->RegisterAllocationGuard(std::move(guard)); + } + Step.Next(); + auto task = std::make_shared(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId()); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); + return true; +} + +TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation( + const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex) + : TBase(mem) + , Source(source) + , Step(step) + , TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) + , StageIndex(stageIndex) { +} + +void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) { + auto sourcePtr = Source.lock(); + if (sourcePtr) { + sourcePtr->GetContext()->GetCommonContext()->AbortWithError( + "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'"); + } +} + +TConclusion TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + ui64 size = PredefinedSize.value_or(0); + for (auto&& i : Packs) { + ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType()); + if (source->GetStageData().GetUseFilter() && i.GetMemType() != EMemType::Blob && source->GetContext()->GetReadMetadata()->HasLimit()) { + const ui32 filtered = + source->GetStageData().GetFilteredCount(source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->GetLimitRobust()); + if (filtered < source->GetRecordsCount()) { + sizeLocal = sizeLocal * 1.0 * filtered / source->GetRecordsCount(); + } + } + size += sizeLocal; + } + + auto allocation = std::make_shared(source, size, step, StageIndex); + NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(), + source->GetContext()->GetCommonContext()->GetScanId(), source->GetMemoryGroupId(), { allocation }, (ui32)StageIndex); + return false; +} + +ui64 TAllocateMemoryStep::GetProcessingDataSize(const std::shared_ptr& /*source*/) const { + return 0; +} + +NKikimr::TConclusion TBuildStageResultStep::DoExecuteInplace( + const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + source->BuildStageResult(source); + return true; +} + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h new file mode 100644 index 000000000000..fa6f44309f18 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h @@ -0,0 +1,146 @@ +#pragma once +#include "fetching.h" + +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +class TAllocateMemoryStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + class TColumnsPack { + private: + YDB_READONLY_DEF(TColumnsSetIds, Columns); + YDB_READONLY(EMemType, MemType, EMemType::Blob); + + public: + TColumnsPack(const TColumnsSetIds& columns, const EMemType memType) + : Columns(columns) + , MemType(memType) { + } + }; + std::vector Packs; + THashMap> Control; + const EStageFeaturesIndexes StageIndex; + const std::optional PredefinedSize; + +protected: + class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { + private: + using TBase = NGroupedMemoryManager::IAllocation; + std::weak_ptr Source; + TFetchingScriptCursor Step; + NColumnShard::TCounterGuard TasksGuard; + const EStageFeaturesIndexes StageIndex; + virtual bool DoOnAllocated(std::shared_ptr&& guard, + const std::shared_ptr& allocation) override; + virtual void DoOnAllocationImpossible(const TString& errorMessage) override; + + public: + TFetchingStepAllocation(const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step, + const EStageFeaturesIndexes stageIndex); + }; + virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; + virtual TString DoDebugString() const override { + return TStringBuilder() << "stage=" << StageIndex << ";"; + } + +public: + void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) { + if (!ids.GetColumnsCount()) { + return; + } + for (auto&& i : ids.GetColumnIds()) { + AFL_VERIFY(Control[i].emplace(memType).second); + } + Packs.emplace_back(ids, memType); + } + EStageFeaturesIndexes GetStage() const { + return StageIndex; + } + + TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex) + : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) + , StageIndex(stageIndex) { + AddAllocation(columns, memType); + } + + TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex) + : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) + , StageIndex(stageIndex) + , PredefinedSize(memSize) { + } +}; + +class TAssemblerStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + YDB_READONLY_DEF(std::shared_ptr, Columns); + virtual TString DoDebugString() const override { + return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; + } + +public: + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; + virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; + TAssemblerStep(const std::shared_ptr& columns, const TString& specName = Default()) + : TBase("ASSEMBLER" + (specName ? "::" + specName : "")) + , Columns(columns) { + AFL_VERIFY(Columns); + AFL_VERIFY(Columns->GetColumnsCount()); + } +}; + +class TBuildStageResultStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + +public: + virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const override; + TBuildStageResultStep() + : TBase("BUILD_STAGE_RESULT") { + } +}; + +class TOptionalAssemblerStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + YDB_READONLY_DEF(std::shared_ptr, Columns); + virtual TString DoDebugString() const override { + return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; + } + +public: + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; + + virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; + TOptionalAssemblerStep(const std::shared_ptr& columns, const TString& specName = Default()) + : TBase("OPTIONAL_ASSEMBLER" + (specName ? "::" + specName : "")) + , Columns(columns) { + AFL_VERIFY(Columns); + AFL_VERIFY(Columns->GetColumnsCount()); + } +}; + +class TColumnBlobsFetchingStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + TColumnsSetIds Columns; + +protected: + virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; + virtual TString DoDebugString() const override { + return TStringBuilder() << "columns=" << Columns.DebugString() << ";"; + } + +public: + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; + TColumnBlobsFetchingStep(const TColumnsSetIds& columns) + : TBase("FETCHING_COLUMNS") + , Columns(columns) { + AFL_VERIFY(Columns.GetColumnsCount()); + } +}; + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp new file mode 100644 index 000000000000..1d418211244d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -0,0 +1,166 @@ +#include "fetch_steps.h" +#include "fetching.h" +#include "source.h" + +#include +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +bool TStepAction::DoApply(IDataReader& owner) const { + if (FinishedFlag) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); + Source->OnSourceFetchingFinishedSafe(owner, Source); + } + return true; +} + +TConclusionStatus TStepAction::DoExecuteImpl() { + if (Source->GetContext()->IsAborted()) { + return TConclusionStatus::Success(); + } + auto executeResult = Cursor.Execute(Source); + if (!executeResult) { + return executeResult; + } + if (*executeResult) { + FinishedFlag = true; + } + return TConclusionStatus::Success(); +} + +TStepAction::TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId) + : TBase(ownerActorId) + , Source(source) + , Cursor(std::move(cursor)) + , CountersGuard(Source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) { +} + +TConclusion TFetchingScriptCursor::Execute(const std::shared_ptr& source) { + AFL_VERIFY(source); + NMiniKQL::TThrowingBindTerminator bind; + Script->OnExecute(); + AFL_VERIFY(!Script->IsFinished(CurrentStepIdx)); + while (!Script->IsFinished(CurrentStepIdx)) { + if (source->HasStageData() && source->GetStageData().IsEmpty()) { + source->OnEmptyStageData(source); + break; + } + auto step = Script->GetStep(CurrentStepIdx); + TMemoryProfileGuard mGuard("SCAN_PROFILE::FETCHING::" + step->GetName() + "::" + Script->GetBranchName(), + IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx); + AFL_VERIFY(!CurrentStartInstant); + CurrentStartInstant = TMonotonic::Now(); + AFL_VERIFY(!CurrentStartDataSize); + CurrentStartDataSize = step->GetProcessingDataSize(source); + const TConclusion resultStep = step->ExecuteInplace(source, *this); + if (!resultStep) { + return resultStep; + } + if (!*resultStep) { + return false; + } + FlushDuration(); + ++CurrentStepIdx; + } + return true; +} + +TString TFetchingScript::DebugString() const { + TStringBuilder sb; + TStringBuilder sbBranch; + for (auto&& i : Steps) { + if (i->GetSumDuration() > TDuration::MilliSeconds(10)) { + sbBranch << "{" << i->DebugString() << "};"; + } + } + if (!sbBranch) { + return ""; + } + sb << "{branch:" << BranchName << ";"; + if (FinishInstant && StartInstant) { + sb << "duration:" << *FinishInstant - *StartInstant << ";"; + } + + sb << "steps_10Ms:[" << sbBranch << "]}"; + return sb; +} + +TFetchingScript::TFetchingScript(const TSpecialReadContext& /*context*/) { +} + +void TFetchingScript::Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { + if (Steps.size() == 0) { + AddStep(entityIds, mType, stage); + } else { + std::optional addIndex; + for (i32 i = Steps.size() - 1; i >= 0; --i) { + if (auto allocation = std::dynamic_pointer_cast(Steps[i])) { + if (allocation->GetStage() == stage) { + allocation->AddAllocation(entityIds, mType); + return; + } else { + addIndex = i + 1; + } + break; + } else if (std::dynamic_pointer_cast(Steps[i])) { + continue; + } else if (std::dynamic_pointer_cast(Steps[i])) { + continue; + } else { + addIndex = i + 1; + break; + } + } + AFL_VERIFY(addIndex); + InsertStep(*addIndex, entityIds, mType, stage); + } +} + +TString IFetchingStep::DebugString() const { + TStringBuilder sb; + sb << "name=" << Name << ";duration=" << SumDuration << ";" + << "size=" << 1e-9 * SumSize << ";details={" << DoDebugString() << "};"; + return sb; +} + +bool TColumnsAccumulator::AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { + auto actualColumns = GetNotFetchedAlready(columns); + FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns; + if (!actualColumns.IsEmpty()) { + script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob); + script.AddStep(std::make_shared(actualColumns)); + return true; + } + return false; +} + +bool TColumnsAccumulator::AddAssembleStep( + TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) { + auto actualColumns = columns - AssemblerReadyColumns; + AssemblerReadyColumns = AssemblerReadyColumns + columns; + if (actualColumns.IsEmpty()) { + return false; + } + auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); + if (sequential) { + const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); + if (notSequentialColumnIds.size()) { + script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); + std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); + script.AddStep(cross, purposeId); + *actualSet = *actualSet - *cross; + } + if (!actualSet->IsEmpty()) { + script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); + script.AddStep(actualSet, purposeId); + } + } else { + script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); + script.AddStep(actualSet, purposeId); + } + return true; +} + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h new file mode 100644 index 000000000000..565c51d255b9 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -0,0 +1,205 @@ +#pragma once +#include "columns_set.h" + +#include +#include + +#include +#include +#include +#include + +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +class IDataSource; +class TSpecialReadContext; +class TFetchingScriptCursor; + +class IFetchingStep { +private: + YDB_READONLY_DEF(TString, Name); + YDB_READONLY(TDuration, SumDuration, TDuration::Zero()); + YDB_READONLY(ui64, SumSize, 0); + +protected: + virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const = 0; + virtual TString DoDebugString() const { + return ""; + } + +public: + void AddDuration(const TDuration d) { + SumDuration += d; + } + void AddDataSize(const ui64 size) { + SumSize += size; + } + + virtual ~IFetchingStep() = default; + + [[nodiscard]] TConclusion ExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + return DoExecuteInplace(source, step); + } + + virtual ui64 GetProcessingDataSize(const std::shared_ptr& /*source*/) const { + return 0; + } + + IFetchingStep(const TString& name) + : Name(name) { + } + + TString DebugString() const; +}; + +class TFetchingScript { +private: + YDB_ACCESSOR(TString, BranchName, "UNDEFINED"); + std::vector> Steps; + std::optional StartInstant; + std::optional FinishInstant; + +public: + TFetchingScript(const TSpecialReadContext& context); + + void Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); + + void AddStepDataSize(const ui32 index, const ui64 size) { + GetStep(index)->AddDataSize(size); + } + + void AddStepDuration(const ui32 index, const TDuration d) { + FinishInstant = TMonotonic::Now(); + GetStep(index)->AddDuration(d); + } + + void OnExecute() { + if (!StartInstant) { + StartInstant = TMonotonic::Now(); + } + } + + TString DebugString() const; + + const std::shared_ptr& GetStep(const ui32 index) const { + AFL_VERIFY(index < Steps.size()); + return Steps[index]; + } + + template + std::shared_ptr AddStep(Args... args) { + auto result = std::make_shared(args...); + Steps.emplace_back(result); + return result; + } + + template + std::shared_ptr InsertStep(const ui32 index, Args... args) { + AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size()); + auto result = std::make_shared(args...); + Steps.insert(Steps.begin() + index, result); + return result; + } + + void AddStep(const std::shared_ptr& step) { + AFL_VERIFY(step); + Steps.emplace_back(step); + } + + bool IsFinished(const ui32 currentStepIdx) const { + AFL_VERIFY(currentStepIdx <= Steps.size()); + return currentStepIdx == Steps.size(); + } + + ui32 Execute(const ui32 startStepIdx, const std::shared_ptr& source) const; +}; + +class TColumnsAccumulator { +private: + TColumnsSetIds FetchingReadyColumns; + TColumnsSetIds AssemblerReadyColumns; + ISnapshotSchema::TPtr FullSchema; + std::shared_ptr GuaranteeNotOptional; + +public: + TColumnsAccumulator(const std::shared_ptr& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema) + : FullSchema(fullSchema) + , GuaranteeNotOptional(guaranteeNotOptional) { + } + + TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const { + return columns - FetchingReadyColumns; + } + + bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage); + bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, + const bool sequential); +}; + +class TFetchingScriptCursor { +private: + std::optional CurrentStartInstant; + std::optional CurrentStartDataSize; + ui32 CurrentStepIdx = 0; + std::shared_ptr Script; + void FlushDuration() { + AFL_VERIFY(CurrentStartInstant); + AFL_VERIFY(CurrentStartDataSize); + Script->AddStepDuration(CurrentStepIdx, TMonotonic::Now() - *CurrentStartInstant); + Script->AddStepDataSize(CurrentStepIdx, *CurrentStartDataSize); + CurrentStartInstant.reset(); + CurrentStartDataSize.reset(); + } + +public: + TFetchingScriptCursor(const std::shared_ptr& script, const ui32 index) + : CurrentStepIdx(index) + , Script(script) { + AFL_VERIFY(!Script->IsFinished(CurrentStepIdx)); + } + + const TString& GetName() const { + return Script->GetStep(CurrentStepIdx)->GetName(); + } + + TString DebugString() const { + return Script->GetStep(CurrentStepIdx)->DebugString(); + } + + bool Next() { + FlushDuration(); + return !Script->IsFinished(++CurrentStepIdx); + } + + TConclusion Execute(const std::shared_ptr& source); +}; + +class TStepAction: public IDataTasksProcessor::ITask { +private: + using TBase = IDataTasksProcessor::ITask; + std::shared_ptr Source; + TFetchingScriptCursor Cursor; + bool FinishedFlag = false; + const NColumnShard::TCounterGuard CountersGuard; + +protected: + virtual bool DoApply(IDataReader& owner) const override; + virtual TConclusionStatus DoExecuteImpl() override; + +public: + virtual TString GetTaskClassIdentifier() const override { + return "STEP_ACTION"; + } + + template + TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId) + : TStepAction(std::static_pointer_cast(source), std::move(cursor), ownerActorId) + { + + } + TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId); +}; + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp new file mode 100644 index 000000000000..112b8a812d07 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp @@ -0,0 +1,5 @@ +#include "source.h" + +namespace NKikimr::NOlap::NReader::NCommon { + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h new file mode 100644 index 000000000000..473b1ecc5b51 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h @@ -0,0 +1,208 @@ +#pragma once +#include "context.h" +#include "fetched_data.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NOlap { +class IDataReader; +} + +namespace NKikimr::NOlap::NReader::NCommon { + +class TFetchingScriptCursor; + +class IDataSource: public ICursorEntity { +private: + YDB_READONLY(ui64, SourceId, 0); + YDB_READONLY(ui32, SourceIdx, 0); + YDB_READONLY(TSnapshot, RecordSnapshotMin, TSnapshot::Zero()); + YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero()); + YDB_READONLY_DEF(std::shared_ptr, Context); + YDB_READONLY(ui32, RecordsCount, 0); + YDB_READONLY_DEF(std::optional, ShardingVersionOptional); + YDB_READONLY(bool, HasDeletions, false); + std::optional MemoryGroupId; + + virtual bool DoAddTxConflict() = 0; + + virtual ui64 DoGetEntityId() const override { + return SourceId; + } + + virtual ui64 DoGetEntityRecordsCount() const override { + return RecordsCount; + } + + std::optional IsSourceInMemoryFlag; + TAtomic SourceFinishedSafeFlag = 0; + TAtomic StageResultBuiltFlag = 0; + virtual void DoOnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr& sourcePtr) = 0; + virtual void DoBuildStageResult(const std::shared_ptr& sourcePtr) = 0; + virtual void DoOnEmptyStageData(const std::shared_ptr& sourcePtr) = 0; + + virtual bool DoStartFetchingColumns( + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) = 0; + virtual void DoAssembleColumns(const std::shared_ptr& columns, const bool sequential) = 0; + +protected: + std::vector> ResourceGuards; + std::unique_ptr StageData; + std::unique_ptr StageResult; + +public: + IDataSource(const ui64 sourceId, const ui32 sourceIdx, const std::shared_ptr& context, + const TSnapshot& recordSnapshotMin, const TSnapshot& recordSnapshotMax, const ui32 recordsCount, + const std::optional shardingVersion, const bool hasDeletions) + : SourceId(sourceId) + , SourceIdx(sourceIdx) + , RecordSnapshotMin(recordSnapshotMin) + , RecordSnapshotMax(recordSnapshotMax) + , Context(context) + , RecordsCount(recordsCount) + , ShardingVersionOptional(shardingVersion) + , HasDeletions(hasDeletions) { + } + + virtual ~IDataSource() = default; + + const std::vector>& GetResourceGuards() const { + return ResourceGuards; + } + + virtual THashMap DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobsOriginal) const = 0; + + bool IsSourceInMemory() const { + AFL_VERIFY(IsSourceInMemoryFlag); + return *IsSourceInMemoryFlag; + } + void SetSourceInMemory(const bool value) { + AFL_VERIFY(!IsSourceInMemoryFlag); + IsSourceInMemoryFlag = value; + if (!value) { + AFL_VERIFY(StageData); + StageData->SetUseFilter(value); + } + } + + void SetMemoryGroupId(const ui64 groupId) { + AFL_VERIFY(!MemoryGroupId); + MemoryGroupId = groupId; + } + + ui64 GetMemoryGroupId() const { + AFL_VERIFY(!!MemoryGroupId); + return *MemoryGroupId; + } + + virtual ui64 GetColumnsVolume(const std::set& columnIds, const EMemType type) const = 0; + + ui64 GetResourceGuardsMemory() const { + ui64 result = 0; + for (auto&& i : ResourceGuards) { + result += i->GetMemory(); + } + return result; + } + void RegisterAllocationGuard(const std::shared_ptr& guard) { + ResourceGuards.emplace_back(guard); + } + virtual ui64 GetColumnRawBytes(const std::set& columnIds) const = 0; + virtual ui64 GetColumnBlobBytes(const std::set& columnsIds) const = 0; + + void AssembleColumns(const std::shared_ptr& columns, const bool sequential = false) { + if (columns->IsEmpty()) { + return; + } + DoAssembleColumns(columns, sequential); + } + + bool StartFetchingColumns(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) { + return DoStartFetchingColumns(sourcePtr, step, columns); + } + + void OnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr& sourcePtr) { + AFL_VERIFY(AtomicCas(&SourceFinishedSafeFlag, 1, 0)); + AFL_VERIFY(sourcePtr); + DoOnSourceFetchingFinishedSafe(owner, sourcePtr); + } + + void OnEmptyStageData(const std::shared_ptr& sourcePtr) { + AFL_VERIFY(AtomicCas(&StageResultBuiltFlag, 1, 0)); + AFL_VERIFY(sourcePtr); + AFL_VERIFY(!StageResult); + AFL_VERIFY(StageData); + DoOnEmptyStageData(sourcePtr); + AFL_VERIFY(StageResult); + AFL_VERIFY(!StageData); + } + + template + void BuildStageResult(const std::shared_ptr& sourcePtr) { + BuildStageResult(std::static_pointer_cast(sourcePtr)); + } + + void BuildStageResult(const std::shared_ptr& sourcePtr) { + TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); + AFL_VERIFY(AtomicCas(&StageResultBuiltFlag, 1, 0)); + AFL_VERIFY(sourcePtr); + AFL_VERIFY(!StageResult); + AFL_VERIFY(StageData); + DoBuildStageResult(sourcePtr); + AFL_VERIFY(StageResult); + AFL_VERIFY(!StageData); + } + + bool AddTxConflict() { + if (!Context->GetCommonContext()->HasLock()) { + return false; + } + if (DoAddTxConflict()) { + StageData->Clear(); + return true; + } + return false; + } + + bool HasStageData() const { + return !!StageData; + } + + const TFetchedData& GetStageData() const { + AFL_VERIFY(StageData); + return *StageData; + } + + TFetchedData& MutableStageData() { + AFL_VERIFY(StageData); + return *StageData; + } + + bool HasStageResult() const { + return !!StageResult; + } + + const TFetchedResult& GetStageResult() const { + AFL_VERIFY(!!StageResult); + return *StageResult; + } + + TFetchedResult& MutableStageResult() { + AFL_VERIFY(!!StageResult); + return *StageResult; + } +}; + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make index 0633fc216232..5ff7dcd23ab4 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make @@ -5,6 +5,9 @@ SRCS( columns_set.cpp iterator.cpp context.cpp + source.cpp + fetching.cpp + fetch_steps.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp index 654315a1ab0b..00bc62547e27 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NReader::NPlain { void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) { Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData())); AFL_VERIFY(Step.Next()); - auto task = std::make_shared(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.h index 79e3e26c4e3c..e8dfc3f15e4f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.h @@ -11,15 +11,16 @@ namespace NKikimr::NOlap::NReader::NPlain { class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter { private: using TBase = NBlobOperations::NRead::ITask; - const std::shared_ptr Source; + const std::shared_ptr Source; TFetchingScriptCursor Step; - const std::shared_ptr Context; + const std::shared_ptr Context; virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override; public: - TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, - const TFetchingScriptCursor& step, const std::shared_ptr& context, const TString& taskCustomer, const TString& externalTaskId) + TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, + const TFetchingScriptCursor& step, const std::shared_ptr& context, const TString& taskCustomer, + const TString& externalTaskId) : TBase(readActions, taskCustomer, externalTaskId) , Source(sourcePtr) , Step(step) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index 1ba2222004e3..bd7816f177f0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -1,13 +1,15 @@ #include "context.h" #include "source.h" +#include +#include #include namespace NKikimr::NOlap::NReader::NPlain { std::unique_ptr TSpecialReadContext::BuildMerger() const { - return std::make_unique( - ReadMetadata->GetReplaceKey(), GetProgramInputColumns()->GetSchema(), GetCommonContext()->IsReverse(), IIndexInfo::GetSnapshotColumnNames()); + return std::make_unique(GetReadMetadata()->GetReplaceKey(), GetProgramInputColumns()->GetSchema(), + GetCommonContext()->IsReverse(), IIndexInfo::GetSnapshotColumnNames()); } ui64 TSpecialReadContext::GetMemoryForSources(const THashMap>& sources) { @@ -22,12 +24,13 @@ ui64 TSpecialReadContext::GetMemoryForSources(const THashMap TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr& source) { +std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(const std::shared_ptr& sourceExt) { + auto source = std::static_pointer_cast(sourceExt); if (source->NeedAccessorsFetching()) { if (!AskAccumulatorsScript) { AskAccumulatorsScript = std::make_shared(*this); if (ui64 size = source->PredictAccessorsMemory()) { - AskAccumulatorsScript->AddStep(size, EStageFeaturesIndexes::Accessors); + AskAccumulatorsScript->AddStep(size, EStageFeaturesIndexes::Accessors); } AskAccumulatorsScript->AddStep(); AskAccumulatorsScript->AddStep(*GetFFColumns()); @@ -46,12 +49,12 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con }(); const bool useIndexes = (IndexChecker ? source->HasIndexes(IndexChecker->GetIndexIds()) : false); const bool isWholeExclusiveSource = source->GetExclusiveIntervalOnly() && source->IsSourceInMemory(); - const bool needSnapshots = ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax() || !isWholeExclusiveSource; + const bool needSnapshots = GetReadMetadata()->GetRequestSnapshot() < source->GetRecordSnapshotMax() || !isWholeExclusiveSource; const bool hasDeletions = source->GetHasDeletions(); bool needShardingFilter = false; - if (!!ReadMetadata->GetRequestShardingInfo()) { + if (!!GetReadMetadata()->GetRequestShardingInfo()) { auto ver = source->GetShardingVersionOptional(); - if (!ver || *ver < ReadMetadata->GetRequestShardingInfo()->GetSnapshotVersion()) { + if (!ver || *ver < GetReadMetadata()->GetRequestShardingInfo()->GetSnapshotVersion()) { needShardingFilter = true; } } @@ -75,73 +78,18 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con } else { std::shared_ptr result = std::make_shared(*this); result->SetBranchName("FAKE"); - result->AddStep(std::make_shared(source->GetRecordsCount())); + result->AddStep(); return result; } } } -class TColumnsAccumulator { -private: - TColumnsSetIds FetchingReadyColumns; - TColumnsSetIds AssemblerReadyColumns; - ISnapshotSchema::TPtr FullSchema; - std::shared_ptr GuaranteeNotOptional; - -public: - TColumnsAccumulator(const std::shared_ptr& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema) - : FullSchema(fullSchema) - , GuaranteeNotOptional(guaranteeNotOptional) { - } - - TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const { - return columns - FetchingReadyColumns; - } - - bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { - auto actualColumns = GetNotFetchedAlready(columns); - FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns; - if (!actualColumns.IsEmpty()) { - script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob); - script.AddStep(std::make_shared(actualColumns)); - return true; - } - return false; - } - bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, - const bool sequential) { - auto actualColumns = columns - AssemblerReadyColumns; - AssemblerReadyColumns = AssemblerReadyColumns + columns; - if (actualColumns.IsEmpty()) { - return false; - } - auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); - if (sequential) { - const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); - if (notSequentialColumnIds.size()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); - std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); - script.AddStep(cross, purposeId); - *actualSet = *actualSet - *cross; - } - if (!actualSet->IsEmpty()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); - script.AddStep(actualSet, purposeId); - } - } else { - script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); - script.AddStep(actualSet, purposeId); - } - return true; - } -}; - std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { std::shared_ptr result = std::make_shared(*this); const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - TColumnsAccumulator acc(GetMergeColumns(), ReadMetadata->GetResultSchema()); + NCommon::TColumnsAccumulator acc(GetMergeColumns(), GetReadMetadata()->GetResultSchema()); if (!!IndexChecker && useIndexes && exclusiveSource) { result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); result->AddStep(std::make_shared(IndexChecker)); @@ -220,11 +168,11 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } - for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { + for (auto&& i : GetReadMetadata()->GetProgram().GetSteps()) { if (i->GetFilterOriginalColumnIds().empty()) { break; } - TColumnsSet stepColumnIds(i->GetFilterOriginalColumnIds(), ReadMetadata->GetResultSchema()); + TColumnsSet stepColumnIds(i->GetFilterOriginalColumnIds(), GetReadMetadata()->GetResultSchema()); acc.AddAssembleStep(*result, stepColumnIds, "EF", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared(i)); if (!i->IsFilterOnly()) { @@ -257,11 +205,11 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c if (partialUsageByPredicate) { result->AddStep(std::make_shared()); } - for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { + for (auto&& i : GetReadMetadata()->GetProgram().GetSteps()) { if (i->GetFilterOriginalColumnIds().empty()) { break; } - TColumnsSet stepColumnIds(i->GetFilterOriginalColumnIds(), ReadMetadata->GetResultSchema()); + TColumnsSet stepColumnIds(i->GetFilterOriginalColumnIds(), GetReadMetadata()->GetResultSchema()); acc.AddAssembleStep(*result, stepColumnIds, "EF", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared(i)); if (!i->IsFilterOnly()) { @@ -271,12 +219,12 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } + result->AddStep(); return result; } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) : TBase(commonContext) { - ReadMetadata = GetCommonContext()->GetReadMetadataPtrVerifiedAs(); } TString TSpecialReadContext::ProfileDebugString() const { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h index 4c34ef572ef4..4b2f98497254 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -15,6 +16,7 @@ using TColumnsSet = NCommon::TColumnsSet; using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; +using TFetchingScript = NCommon::TFetchingScript; class TSpecialReadContext: public NCommon::TSpecialReadContext { private: @@ -23,7 +25,6 @@ class TSpecialReadContext: public NCommon::TSpecialReadContext { YDB_READONLY_DEF(std::shared_ptr, FilterStageMemory); YDB_READONLY_DEF(std::shared_ptr, FetchingStageMemory); - TReadMetadata::TConstPtr ReadMetadata; std::shared_ptr BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const; TMutex Mutex; @@ -31,6 +32,8 @@ class TSpecialReadContext: public NCommon::TSpecialReadContext { CacheFetchingScripts; std::shared_ptr AskAccumulatorsScript; + virtual std::shared_ptr DoGetColumnsFetchingPlan(const std::shared_ptr& source) override; + public: const ui64 ReduceMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetReduceMemoryIntervalLimit(); const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit(); @@ -41,17 +44,11 @@ class TSpecialReadContext: public NCommon::TSpecialReadContext { return MergeStageMemory->GetFullMemory() + FilterStageMemory->GetFullMemory() + FetchingStageMemory->GetFullMemory(); } - const TReadMetadata::TConstPtr& GetReadMetadata() const { - return ReadMetadata; - } - std::unique_ptr BuildMerger() const; - TString ProfileDebugString() const; + virtual TString ProfileDebugString() const override; TSpecialReadContext(const std::shared_ptr& commonContext); - - std::shared_ptr GetColumnsFetchingPlan(const std::shared_ptr& source); }; } // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index 0d1ceaf9c9bd..96f26409d9cc 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -10,69 +10,11 @@ namespace NKikimr::NOlap::NReader::NPlain { -bool TStepAction::DoApply(IDataReader& /*owner*/) const { - if (FinishedFlag) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); - Source->SetIsReady(); - } - return true; -} - -TConclusionStatus TStepAction::DoExecuteImpl() { - if (Source->GetContext()->IsAborted()) { - return TConclusionStatus::Success(); - } - auto executeResult = Cursor.Execute(Source); - if (!executeResult) { - return executeResult; - } - if (*executeResult) { - Source->Finalize(); - FinishedFlag = true; - } - return TConclusionStatus::Success(); -} - -TStepAction::TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId) - : TBase(ownerActorId) - , Source(source) - , Cursor(std::move(cursor)) - , CountersGuard(Source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) { -} - -TConclusion TColumnBlobsFetchingStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - return !source->StartFetchingColumns(source, step, Columns); -} - -ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptr& source) const { - return source->GetColumnBlobBytes(Columns.GetColumnIds()); -} - TConclusion TIndexBlobsFetchingStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { return !source->StartFetchingIndexes(source, step, Indexes); } -TConclusion TAssemblerStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - source->AssembleColumns(Columns); - return true; -} - -ui64 TAssemblerStep::GetProcessingDataSize(const std::shared_ptr& source) const { - return source->GetColumnRawBytes(Columns->GetColumnIds()); -} - -TConclusion TOptionalAssemblerStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - source->AssembleColumns(Columns, !source->GetExclusiveIntervalOnly() || !source->IsSourceInMemory()); - return true; -} - -ui64 TOptionalAssemblerStep::GetProcessingDataSize(const std::shared_ptr& source) const { - return source->GetColumnsVolume(Columns->GetColumnIds(), EMemType::RawSequential); -} - TConclusion TFilterProgramStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { AFL_VERIFY(source); AFL_VERIFY(Step); @@ -128,164 +70,11 @@ TConclusion TShardingFilter::DoExecuteInplace(const std::shared_ptr TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - std::vector> columns; - for (auto&& f : IIndexInfo::ArrowSchemaSnapshot()->fields()) { - columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), NArrow::DefaultScalar(f->type()), Count)); - } - source->MutableStageData().AddBatch( - std::make_shared(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns))); - return true; -} - TConclusion TApplyIndexStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { source->ApplyIndex(IndexChecker); return true; } -TConclusion TFetchingScriptCursor::Execute(const std::shared_ptr& source) { - AFL_VERIFY(source); - NMiniKQL::TThrowingBindTerminator bind; - Script->OnExecute(); - AFL_VERIFY(!Script->IsFinished(CurrentStepIdx)); - while (!Script->IsFinished(CurrentStepIdx)) { - if (source->GetStageData().IsEmpty()) { - source->OnEmptyStageData(); - break; - } - auto step = Script->GetStep(CurrentStepIdx); - TMemoryProfileGuard mGuard("SCAN_PROFILE::FETCHING::" + step->GetName() + "::" + Script->GetBranchName(), - IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx); - AFL_VERIFY(!CurrentStartInstant); - CurrentStartInstant = TMonotonic::Now(); - AFL_VERIFY(!CurrentStartDataSize); - CurrentStartDataSize = step->GetProcessingDataSize(source); - const TConclusion resultStep = step->ExecuteInplace(source, *this); - if (!resultStep) { - return resultStep; - } - if (!*resultStep) { - return false; - } - FlushDuration(); - ++CurrentStepIdx; - } - return true; -} - -bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr&& guard, - const std::shared_ptr& /*allocation*/) { - auto data = Source.lock(); - if (!data || data->GetContext()->IsAborted()) { - guard->Release(); - return false; - } - if (StageIndex == EStageFeaturesIndexes::Accessors) { - data->MutableStageData().SetAccessorsGuard(std::move(guard)); - } else { - data->RegisterAllocationGuard(std::move(guard)); - } - Step.Next(); - auto task = std::make_shared(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId()); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); - return true; -} - -TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation( - const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex) - : TBase(mem) - , Source(source) - , Step(step) - , TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) - , StageIndex(stageIndex) { -} - -void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) { - auto sourcePtr = Source.lock(); - if (sourcePtr) { - sourcePtr->GetContext()->GetCommonContext()->AbortWithError( - "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'"); - } -} - -TConclusion TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - ui64 size = PredefinedSize.value_or(0); - for (auto&& i : Packs) { - ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType()); - if (source->GetStageData().GetUseFilter() && i.GetMemType() != EMemType::Blob && source->GetContext()->GetReadMetadata()->HasLimit()) { - const ui32 filtered = source->GetStageData().GetFilteredCount( - source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->GetLimitRobust()); - if (filtered < source->GetRecordsCount()) { - sizeLocal = sizeLocal * 1.0 * filtered / source->GetRecordsCount(); - } - } - size += sizeLocal; - } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TAllocateMemoryStep::DoExecuteInplace")("source", source->GetSourceIdx())("memory", size); - - auto allocation = std::make_shared(source, size, step, StageIndex); - NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(), - source->GetContext()->GetCommonContext()->GetScanId(), source->GetFirstIntervalId(), { allocation }, (ui32)StageIndex); - return false; -} - -ui64 TAllocateMemoryStep::GetProcessingDataSize(const std::shared_ptr& /*source*/) const { - return 0; -} - -TString TFetchingScript::DebugString() const { - TStringBuilder sb; - TStringBuilder sbBranch; - for (auto&& i : Steps) { - if (i->GetSumDuration() > TDuration::MilliSeconds(10)) { - sbBranch << "{" << i->DebugString() << "};"; - } - } - if (!sbBranch) { - return ""; - } - sb << "{branch:" << BranchName << ";limit:" << Limit << ";"; - if (FinishInstant && StartInstant) { - sb << "duration:" << *FinishInstant - *StartInstant << ";"; - } - - sb << "steps_10Ms:[" << sbBranch << "]}"; - return sb; -} - -TFetchingScript::TFetchingScript(const TSpecialReadContext& context) - : Limit(context.GetReadMetadata()->GetLimitRobust()) { -} - -void TFetchingScript::Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { - if (Steps.size() == 0) { - AddStep(entityIds, mType, stage); - } else { - std::optional addIndex; - for (i32 i = Steps.size() - 1; i >= 0; --i) { - if (auto allocation = std::dynamic_pointer_cast(Steps[i])) { - if (allocation->GetStage() == stage) { - allocation->AddAllocation(entityIds, mType); - return; - } else { - addIndex = i + 1; - } - break; - } else if (std::dynamic_pointer_cast(Steps[i])) { - continue; - } else if (std::dynamic_pointer_cast(Steps[i])) { - continue; - } else { - addIndex = i + 1; - break; - } - } - AFL_VERIFY(addIndex); - InsertStep(*addIndex, entityIds, mType, stage); - } -} - NKikimr::TConclusion TFilterCutLimit::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { source->MutableStageData().CutFilter(source->GetRecordsCount(), Limit, Reverse); @@ -313,4 +102,15 @@ TConclusion TDetectInMem::DoExecuteInplace(const std::shared_ptr TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + std::vector> columns; + for (auto&& f : IIndexInfo::ArrowSchemaSnapshot()->fields()) { + columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), NArrow::DefaultScalar(f->type()), source->GetRecordsCount())); + } + source->MutableStageData().AddBatch(std::make_shared( + arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), source->GetRecordsCount(), columns))); + source->BuildStageResult(source); + return true; +} + } // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h index e957091587d6..0762c4e5a5e0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -16,186 +17,43 @@ using TIndexesSet = NCommon::TIndexesSet; using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; +using TFetchingScriptCursor = NCommon::TFetchingScriptCursor; +using TStepAction = NCommon::TStepAction; class IDataSource; -class TFetchingScriptCursor; class TSpecialReadContext; -class IFetchingStep { +class IFetchingStep: public NCommon::IFetchingStep { private: - YDB_READONLY_DEF(TString, Name); - YDB_READONLY(TDuration, SumDuration, TDuration::Zero()); - YDB_READONLY(ui64, SumSize, 0); - -protected: + using TBase = NCommon::IFetchingStep; virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const = 0; - virtual TString DoDebugString() const { - return ""; - } - -public: - void AddDuration(const TDuration d) { - SumDuration += d; - } - void AddDataSize(const ui64 size) { - SumSize += size; - } - - virtual ~IFetchingStep() = default; - - [[nodiscard]] TConclusion ExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - return DoExecuteInplace(source, step); - } - virtual ui64 GetProcessingDataSize(const std::shared_ptr& /*source*/) const { return 0; } - IFetchingStep(const TString& name) - : Name(name) { - } - - TString DebugString() const { - TStringBuilder sb; - sb << "name=" << Name << ";duration=" << SumDuration << ";" - << "size=" << 1e-9 * SumSize << ";details={" << DoDebugString() << "};"; - return sb; - } -}; - -class TFetchingScript { -private: - YDB_ACCESSOR(TString, BranchName, "UNDEFINED"); - std::vector> Steps; - std::optional StartInstant; - std::optional FinishInstant; - const ui32 Limit; - -public: - TFetchingScript(const TSpecialReadContext& context); - - void Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); - - void AddStepDataSize(const ui32 index, const ui64 size) { - GetStep(index)->AddDataSize(size); - } - - void AddStepDuration(const ui32 index, const TDuration d) { - FinishInstant = TMonotonic::Now(); - GetStep(index)->AddDuration(d); - } - - void OnExecute() { - if (!StartInstant) { - StartInstant = TMonotonic::Now(); - } - } - - TString DebugString() const; - - const std::shared_ptr& GetStep(const ui32 index) const { - AFL_VERIFY(index < Steps.size()); - return Steps[index]; - } - - template - std::shared_ptr AddStep(Args... args) { - auto result = std::make_shared(args...); - Steps.emplace_back(result); - return result; - } - - template - std::shared_ptr InsertStep(const ui32 index, Args... args) { - AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size()); - auto result = std::make_shared(args...); - Steps.insert(Steps.begin() + index, result); - return result; - } - - void AddStep(const std::shared_ptr& step) { - AFL_VERIFY(step); - Steps.emplace_back(step); - } - - bool IsFinished(const ui32 currentStepIdx) const { - AFL_VERIFY(currentStepIdx <= Steps.size()); - return currentStepIdx == Steps.size(); - } - - ui32 Execute(const ui32 startStepIdx, const std::shared_ptr& source) const; -}; - -class TFetchingScriptCursor { -private: - std::optional CurrentStartInstant; - std::optional CurrentStartDataSize; - ui32 CurrentStepIdx = 0; - std::shared_ptr Script; - void FlushDuration() { - AFL_VERIFY(CurrentStartInstant); - AFL_VERIFY(CurrentStartDataSize); - Script->AddStepDuration(CurrentStepIdx, TMonotonic::Now() - *CurrentStartInstant); - Script->AddStepDataSize(CurrentStepIdx, *CurrentStartDataSize); - CurrentStartInstant.reset(); - CurrentStartDataSize.reset(); - } - -public: - TFetchingScriptCursor(const std::shared_ptr& script, const ui32 index) - : CurrentStepIdx(index) - , Script(script) { - AFL_VERIFY(!Script->IsFinished(CurrentStepIdx)); - } - - const TString& GetName() const { - return Script->GetStep(CurrentStepIdx)->GetName(); - } - - TString DebugString() const { - return Script->GetStep(CurrentStepIdx)->DebugString(); + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override final { + return GetProcessingDataSize(std::static_pointer_cast(source)); } - bool Next() { - FlushDuration(); - return !Script->IsFinished(++CurrentStepIdx); + virtual TConclusion DoExecuteInplace( + const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const override final { + const auto source = std::static_pointer_cast(sourceExt); + return DoExecuteInplace(source, step); } - TConclusion Execute(const std::shared_ptr& source); -}; - -class TStepAction: public IDataTasksProcessor::ITask { -private: - using TBase = IDataTasksProcessor::ITask; - std::shared_ptr Source; - TFetchingScriptCursor Cursor; - bool FinishedFlag = false; - const NColumnShard::TCounterGuard CountersGuard; - -protected: - virtual bool DoApply(IDataReader& owner) const override; - virtual TConclusionStatus DoExecuteImpl() override; - public: - virtual TString GetTaskClassIdentifier() const override { - return "STEP_ACTION"; - } - - TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId); + using TBase::TBase; }; class TBuildFakeSpec: public IFetchingStep { private: using TBase = IFetchingStep; - const ui32 Count = 0; protected: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; public: - TBuildFakeSpec(const ui32 count) - : TBase("FAKE_SPEC") - , Count(count) { - AFL_VERIFY(Count); + TBuildFakeSpec() + : TBase("FAKE_SPEC") { } }; @@ -214,74 +72,6 @@ class TApplyIndexStep: public IFetchingStep { } }; -class TAllocateMemoryStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - class TColumnsPack { - private: - YDB_READONLY_DEF(TColumnsSetIds, Columns); - YDB_READONLY(EMemType, MemType, EMemType::Blob); - - public: - TColumnsPack(const TColumnsSetIds& columns, const EMemType memType) - : Columns(columns) - , MemType(memType) { - } - }; - std::vector Packs; - THashMap> Control; - const EStageFeaturesIndexes StageIndex; - const std::optional PredefinedSize; - -protected: - class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { - private: - using TBase = NGroupedMemoryManager::IAllocation; - std::weak_ptr Source; - TFetchingScriptCursor Step; - NColumnShard::TCounterGuard TasksGuard; - const EStageFeaturesIndexes StageIndex; - virtual bool DoOnAllocated(std::shared_ptr&& guard, - const std::shared_ptr& allocation) override; - virtual void DoOnAllocationImpossible(const TString& errorMessage) override; - - public: - TFetchingStepAllocation(const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step, - const EStageFeaturesIndexes stageIndex); - }; - virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; - virtual TString DoDebugString() const override { - return TStringBuilder() << "stage=" << StageIndex << ";"; - } - -public: - void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) { - if (!ids.GetColumnsCount()) { - return; - } - for (auto&& i : ids.GetColumnIds()) { - AFL_VERIFY(Control[i].emplace(memType).second); - } - Packs.emplace_back(ids, memType); - } - EStageFeaturesIndexes GetStage() const { - return StageIndex; - } - - TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex) - : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) - , StageIndex(stageIndex) { - AddAllocation(columns, memType); - } - - TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex) - : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) - , StageIndex(stageIndex) - , PredefinedSize(memSize) { - } -}; - class TDetectInMemStep: public IFetchingStep { private: using TBase = IFetchingStep; @@ -302,26 +92,6 @@ class TDetectInMemStep: public IFetchingStep { } }; -class TColumnBlobsFetchingStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - TColumnsSetIds Columns; - -protected: - virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - virtual TString DoDebugString() const override { - return TStringBuilder() << "columns=" << Columns.DebugString() << ";"; - } - -public: - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; - TColumnBlobsFetchingStep(const TColumnsSetIds& columns) - : TBase("FETCHING_COLUMNS") - , Columns(columns) { - AFL_VERIFY(Columns.GetColumnsCount()); - } -}; - class TPortionAccessorFetchingStep: public IFetchingStep { private: using TBase = IFetchingStep; @@ -358,45 +128,6 @@ class TIndexBlobsFetchingStep: public IFetchingStep { } }; -class TAssemblerStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - YDB_READONLY_DEF(std::shared_ptr, Columns); - virtual TString DoDebugString() const override { - return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; - } - -public: - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; - virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - TAssemblerStep(const std::shared_ptr& columns, const TString& specName = Default()) - : TBase("ASSEMBLER" + (specName ? "::" + specName : "")) - , Columns(columns) { - AFL_VERIFY(Columns); - AFL_VERIFY(Columns->GetColumnsCount()); - } -}; - -class TOptionalAssemblerStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - YDB_READONLY_DEF(std::shared_ptr, Columns); - virtual TString DoDebugString() const override { - return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; - } - -public: - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; - - virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - TOptionalAssemblerStep(const std::shared_ptr& columns, const TString& specName = Default()) - : TBase("OPTIONAL_ASSEMBLER" + (specName ? "::" + specName : "")) - , Columns(columns) { - AFL_VERIFY(Columns); - AFL_VERIFY(Columns->GetColumnsCount()); - } -}; - class TFilterProgramStep: public IFetchingStep { private: using TBase = IFetchingStep; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp index 8633f5d692a8..328d9a38c40e 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp @@ -9,8 +9,9 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) , SpecialReadContext(std::make_shared(context)) { ui32 sourceIdx = 0; std::deque> sources; + const auto readMetadata = GetReadMetadataVerifiedAs(); const auto& portions = GetReadMetadata()->SelectInfo->Portions; - const auto& committed = GetReadMetadata()->CommittedBlobs; + const auto& committed = readMetadata->CommittedBlobs; ui64 compactedPortionsBytes = 0; ui64 insertedPortionsBytes = 0; ui64 committedPortionsBytes = 0; @@ -51,7 +52,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) auto& stats = GetReadMetadata()->ReadStats; stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size(); stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs(); - stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size(); + stats->CommittedBatches = readMetadata->CommittedBlobs.size(); stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount(); stats->CommittedPortionsBytes = committedPortionsBytes; stats->InsertedPortionsBytes = insertedPortionsBytes; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h index d20c1cd49739..960f49541bc6 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h @@ -49,7 +49,12 @@ class TPlainReadData: public IDataReader, TNonCopyable, NColumnShard::TMonitorin Scanner->OnSentDataFromInterval(intervalIdx); } - const TReadMetadata::TConstPtr& GetReadMetadata() const { + template + std::shared_ptr GetReadMetadataVerifiedAs() const { + return SpecialReadContext->GetReadMetadataVerifiedAs(); + } + + const NCommon::TReadMetadata::TConstPtr& GetReadMetadata() const { return SpecialReadContext->GetReadMetadata(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 5f472ada96e9..9a9bd23c8571 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -23,36 +23,53 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr& fetch void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::shared_ptr& sourcePtr) { AFL_VERIFY(FetchingPlan); - AFL_VERIFY(!Context->IsAborted()); + AFL_VERIFY(!GetContext()->IsAborted()); if (!IsReadyFlag) { AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second); } if (AtomicCas(&SourceStartedFlag, 1, 0)) { - SetFirstIntervalId(interval.GetIntervalId()); + SetMemoryGroupId(interval.GetIntervalId()); AFL_VERIFY(FetchingPlan); StageData = std::make_unique(GetExclusiveIntervalOnly()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", SourceIdx); - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); - if (Context->IsAborted()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx()); + NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", GetSourceIdx())("method", "InitFetchingPlan")); + if (GetContext()->IsAborted()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "InitFetchingPlanAborted"); return; } TFetchingScriptCursor cursor(FetchingPlan, 0); - auto task = std::make_shared(sourcePtr, std::move(cursor), Context->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } } -void IDataSource::SetIsReady() { +void IDataSource::DoOnSourceFetchingFinishedSafe(IDataReader& /*owner*/, const std::shared_ptr& /*sourcePtr*/) { AFL_VERIFY(!IsReadyFlag); IsReadyFlag = true; for (auto&& i : Intervals) { - i.second->OnSourceFetchStageReady(SourceIdx); + i.second->OnSourceFetchStageReady(GetSourceIdx()); } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "source_ready")("intervals_count", Intervals.size())("source_idx", SourceIdx); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "source_ready")("intervals_count", Intervals.size())("source_idx", GetSourceIdx()); Intervals.clear(); } +void IDataSource::DoOnEmptyStageData(const std::shared_ptr& sourcePtr) { + if (ResourceGuards.size()) { + if (ExclusiveIntervalOnly) { + ResourceGuards.back()->Update(0); + } else { + ResourceGuards.back()->Update(GetColumnRawBytes(GetContext()->GetPKColumns()->GetColumnIds())); + } + } + DoBuildStageResult(sourcePtr); +} + +void IDataSource::DoBuildStageResult(const std::shared_ptr& /*sourcePtr*/) { + TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); + StageResult = std::make_unique(std::move(StageData)); + StageData.reset(); +} + void TPortionDataSource::NeedFetchColumns(const std::set& columnIds, TBlobsAction& blobsAction, THashMap& defaultBlocks, const std::shared_ptr& filter) { const NArrow::TColumnFilter& cFilter = filter ? *filter : NArrow::TColumnFilter::BuildAllowFilter(); @@ -86,7 +103,7 @@ void TPortionDataSource::NeedFetchColumns(const std::set& columnIds, TBlob } bool TPortionDataSource::DoStartFetchingColumns( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) { + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName()); AFL_VERIFY(columns.GetColumnsCount()); AFL_VERIFY(!StageData->GetAppliedFilter() || !StageData->GetAppliedFilter()->IsTotalDenyFilter()); @@ -140,7 +157,7 @@ bool TPortionDataSource::DoStartFetchingIndexes( return false; } - auto constructor = std::make_shared(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); + auto constructor = std::make_shared(readingActions, std::static_pointer_cast(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); return true; } @@ -245,7 +262,7 @@ bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& /*columns*/) { + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& /*columns*/) { if (ReadStarted) { return false; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 53bb37e6506f..460cb8e85f62 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -27,51 +28,39 @@ class TPlainReadData; class IFetchTaskConstructor; class IFetchingStep; -class IDataSource { +class IDataSource: public NCommon::IDataSource { private: + using TBase = NCommon::IDataSource; YDB_ACCESSOR(bool, ExclusiveIntervalOnly, true); - YDB_READONLY(ui32, SourceIdx, 0); YDB_READONLY_DEF(NArrow::NMerger::TSortableBatchPosition, Start); YDB_READONLY_DEF(NArrow::NMerger::TSortableBatchPosition, Finish); NArrow::TReplaceKey StartReplaceKey; NArrow::TReplaceKey FinishReplaceKey; - YDB_READONLY_DEF(std::shared_ptr, Context); - YDB_READONLY(TSnapshot, RecordSnapshotMin, TSnapshot::Zero()); - YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero()); - YDB_READONLY(ui32, RecordsCount, 0); - YDB_READONLY_DEF(std::optional, ShardingVersionOptional); - YDB_READONLY(bool, HasDeletions, false); YDB_READONLY(ui32, IntervalsCount, 0); virtual NJson::TJsonValue DoDebugJson() const = 0; bool MergingStartedFlag = false; TAtomic SourceStartedFlag = 0; std::shared_ptr FetchingPlan; - std::vector> ResourceGuards; - std::optional FirstIntervalId; ui32 CurrentPlanStepIndex = 0; YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage); + virtual void DoOnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr& /*sourcePtr*/) override; + virtual void DoBuildStageResult(const std::shared_ptr& sourcePtr) override; + virtual void DoOnEmptyStageData(const std::shared_ptr& sourcePtr) override; + protected: - std::optional IsSourceInMemoryFlag; THashMap Intervals; - std::unique_ptr StageData; - std::unique_ptr StageResult; - TAtomic FilterStageFlag = 0; bool IsReadyFlag = false; - virtual bool DoStartFetchingColumns( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) = 0; virtual bool DoStartFetchingIndexes( const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& indexes) = 0; - virtual void DoAssembleColumns(const std::shared_ptr& columns, const bool sequential) = 0; virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; virtual NJson::TJsonValue DoDebugJsonForMemory() const { return NJson::JSON_MAP; } - virtual bool DoAddTxConflict() = 0; virtual bool DoStartFetchingAccessor(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step) = 0; public: @@ -82,52 +71,6 @@ class IDataSource { return DoStartFetchingAccessor(sourcePtr, step); } - bool AddTxConflict() { - if (!Context->GetCommonContext()->HasLock()) { - return false; - } - if (DoAddTxConflict()) { - StageData->Clear(); - return true; - } - return false; - } - - ui64 GetResourceGuardsMemory() const { - ui64 result = 0; - for (auto&& i : ResourceGuards) { - result += i->GetMemory(); - } - return result; - } - - void RegisterAllocationGuard(const std::shared_ptr& guard) { - ResourceGuards.emplace_back(guard); - } - bool IsSourceInMemory() const { - AFL_VERIFY(IsSourceInMemoryFlag); - return *IsSourceInMemoryFlag; - } - void SetSourceInMemory(const bool value) { - AFL_VERIFY(!IsSourceInMemoryFlag); - IsSourceInMemoryFlag = value; - if (NeedAccessorsForRead()) { - AFL_VERIFY(StageData); - if (!value) { - StageData->SetUseFilter(value); - } - } - } - void SetFirstIntervalId(const ui64 value) { - AFL_VERIFY(!FirstIntervalId); - FirstIntervalId = value; - } - ui64 GetFirstIntervalId() const { - AFL_VERIFY(!!FirstIntervalId); - return *FirstIntervalId; - } - virtual THashMap DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobsOriginal) const = 0; - virtual ui64 GetPathId() const = 0; virtual bool HasIndexes(const std::set& indexIds) const = 0; @@ -138,33 +81,10 @@ class IDataSource { return FinishReplaceKey; } - const TFetchedResult& GetStageResult() const { - AFL_VERIFY(!!StageResult); - return *StageResult; - } - - void SetIsReady(); - - void Finalize() { - TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); - StageResult = std::make_unique(std::move(StageData)); - } - void ApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) { return DoApplyIndex(indexMeta); } - void AssembleColumns(const std::shared_ptr& columns, const bool sequential = false) { - if (columns->IsEmpty()) { - return; - } - DoAssembleColumns(columns, sequential); - } - - bool StartFetchingColumns(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) { - return DoStartFetchingColumns(sourcePtr, step, columns); - } - bool StartFetchingIndexes( const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& indexes) { AFL_VERIFY(indexes); @@ -179,11 +99,7 @@ class IDataSource { ++IntervalsCount; } - virtual ui64 GetColumnsVolume(const std::set& columnIds, const EMemType type) const = 0; - - virtual ui64 GetColumnRawBytes(const std::set& columnIds) const = 0; virtual ui64 GetIndexRawBytes(const std::set& indexIds) const = 0; - virtual ui64 GetColumnBlobBytes(const std::set& columnsIds) const = 0; bool IsMergingStarted() const { return MergingStartedFlag; @@ -202,13 +118,14 @@ class IDataSource { NJson::TJsonValue DebugJsonForMemory() const { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("details", DoDebugJsonForMemory()); - result.InsertValue("count", RecordsCount); + result.InsertValue("count", GetRecordsCount()); return result; } NJson::TJsonValue DebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue("source_idx", SourceIdx); + result.InsertValue("source_id", GetSourceId()); + result.InsertValue("source_idx", GetSourceIdx()); result.InsertValue("start", Start.DebugJson()); result.InsertValue("finish", Finish.DebugJson()); result.InsertValue("specific", DoDebugJson()); @@ -221,44 +138,17 @@ class IDataSource { return IsReadyFlag; } - void OnEmptyStageData() { - if (!ResourceGuards.size()) { - return; - } - if (ExclusiveIntervalOnly) { - ResourceGuards.back()->Update(0); - } else { - ResourceGuards.back()->Update(GetColumnRawBytes(Context->GetPKColumns()->GetColumnIds())); - } - } - - const TFetchedData& GetStageData() const { - AFL_VERIFY(StageData); - return *StageData; - } - - TFetchedData& MutableStageData() { - AFL_VERIFY(StageData); - return *StageData; - } - void RegisterInterval(TFetchingInterval& interval, const std::shared_ptr& sourcePtr); - IDataSource(const ui32 sourceIdx, const std::shared_ptr& context, const NArrow::TReplaceKey& start, + IDataSource(const ui64 sourceId, const ui32 sourceIdx, const std::shared_ptr& context, const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish, const TSnapshot& recordSnapshotMin, const TSnapshot& recordSnapshotMax, const ui32 recordsCount, const std::optional shardingVersion, const bool hasDeletions) - : SourceIdx(sourceIdx) + : TBase(sourceId, sourceIdx, context, recordSnapshotMin, recordSnapshotMax, recordsCount, shardingVersion, hasDeletions) , Start(context->GetReadMetadata()->BuildSortedPosition(start)) , Finish(context->GetReadMetadata()->BuildSortedPosition(finish)) , StartReplaceKey(start) - , FinishReplaceKey(finish) - , Context(context) - , RecordSnapshotMin(recordSnapshotMin) - , RecordSnapshotMax(recordSnapshotMax) - , RecordsCount(recordsCount) - , ShardingVersionOptional(shardingVersion) - , HasDeletions(hasDeletions) { - UsageClass = Context->GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(GetStartReplaceKey(), GetFinishReplaceKey()); + , FinishReplaceKey(finish) { + UsageClass = GetContext()->GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(GetStartReplaceKey(), GetFinishReplaceKey()); AFL_VERIFY(UsageClass != TPKRangeFilter::EUsageClass::DontUsage); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson()); if (Start.IsReverseSort()) { @@ -283,9 +173,9 @@ class TPortionDataSource: public IDataSource { virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) override; virtual bool DoStartFetchingColumns( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) override; - virtual bool DoStartFetchingIndexes( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& indexes) override; + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) override; + virtual bool DoStartFetchingIndexes(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, + const std::shared_ptr& indexes) override; virtual void DoAssembleColumns(const std::shared_ptr& columns, const bool sequential) override; virtual NJson::TJsonValue DoDebugJson() const override { NJson::TJsonValue result = NJson::JSON_MAP; @@ -372,7 +262,6 @@ class TPortionDataSource: public IDataSource { } virtual ui64 GetIndexRawBytes(const std::set& indexIds) const override { - return Portion->GetTotalRawBytes(); return GetStageData().GetPortionAccessor().GetIndexRawBytes(indexIds, false); } @@ -385,9 +274,9 @@ class TPortionDataSource: public IDataSource { } TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr& portion, const std::shared_ptr& context) - : TBase(sourceIdx, context, portion->IndexKeyStart(), portion->IndexKeyEnd(), portion->RecordSnapshotMin(TSnapshot::Zero()), - portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(), portion->GetShardingVersionOptional(), - portion->GetMeta().GetDeletionsCount()) + : TBase(portion->GetPortionId(), sourceIdx, context, portion->IndexKeyStart(), portion->IndexKeyEnd(), + portion->RecordSnapshotMin(TSnapshot::Zero()), portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(), + portion->GetShardingVersionOptional(), portion->GetMeta().GetDeletionsCount()) , Portion(portion) , Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) { } @@ -403,7 +292,7 @@ class TCommittedDataSource: public IDataSource { } virtual bool DoStartFetchingColumns( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) override; + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) override; virtual bool DoStartFetchingIndexes(const std::shared_ptr& /*sourcePtr*/, const TFetchingScriptCursor& /*step*/, const std::shared_ptr& /*indexes*/) override { return false; @@ -495,8 +384,9 @@ class TCommittedDataSource: public IDataSource { } TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr& context) - : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetCommittedSnapshotDef(TSnapshot::Zero()), - committed.GetCommittedSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete()) + : TBase((ui64)committed.GetInsertWriteId(), sourceIdx, context, committed.GetFirst(), committed.GetLast(), + committed.GetCommittedSnapshotDef(TSnapshot::Zero()), committed.GetCommittedSnapshotDef(TSnapshot::Zero()), + committed.GetRecordsCount(), {}, committed.GetIsDelete()) , CommittedBlob(committed) { } }; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp index cf4b7b6f0aaf..c39320e79981 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp @@ -20,7 +20,8 @@ bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& ra } TBlobsFetcherTask::TBlobsFetcherTask(const std::vector>& readActions, - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& context, + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, + const std::shared_ptr& context, const TString& taskCustomer, const TString& externalTaskId) : TBase(readActions, taskCustomer, externalTaskId) , Source(sourcePtr) diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h index 5290bd4d6982..ef8cd6b62a7b 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h @@ -11,16 +11,17 @@ namespace NKikimr::NOlap::NReader::NSimple { class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter { private: using TBase = NBlobOperations::NRead::ITask; - const std::shared_ptr Source; + const std::shared_ptr Source; TFetchingScriptCursor Step; - const std::shared_ptr Context; + const std::shared_ptr Context; const NColumnShard::TCounterGuard Guard; virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override; public: - TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, - const TFetchingScriptCursor& step, const std::shared_ptr& context, const TString& taskCustomer, const TString& externalTaskId); + TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, + const TFetchingScriptCursor& step, const std::shared_ptr& context, const TString& taskCustomer, + const TString& externalTaskId); }; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index 207efc4e6deb..a14f1c17c920 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -1,25 +1,27 @@ #include "context.h" #include "source.h" +#include #include namespace NKikimr::NOlap::NReader::NSimple { -std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr& source) { - const bool needSnapshots = ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax(); +std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(const std::shared_ptr& sourceExt) { + const auto source = std::static_pointer_cast(sourceExt); + const bool needSnapshots = GetReadMetadata()->GetRequestSnapshot() < source->GetRecordSnapshotMax(); if (!needSnapshots && GetFFColumns()->GetColumnIds().size() == 1 && GetFFColumns()->GetColumnIds().contains(NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX)) { std::shared_ptr result = std::make_shared(*this); source->SetSourceInMemory(true); result->SetBranchName("FAKE"); - result->AddStep(std::make_shared(source->GetRecordsCount())); + result->AddStep(); result->AddStep(0, source->GetRecordsCount()); return result; } if (!source->GetStageData().HasPortionAccessor()) { if (!AskAccumulatorsScript) { AskAccumulatorsScript = std::make_shared(*this); - AskAccumulatorsScript->AddStep( + AskAccumulatorsScript->AddStep( source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors); AskAccumulatorsScript->AddStep(); AskAccumulatorsScript->AddStep(*GetFFColumns()); @@ -39,9 +41,9 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con const bool useIndexes = (IndexChecker ? source->HasIndexes(IndexChecker->GetIndexIds()) : false); const bool hasDeletions = source->GetHasDeletions(); bool needShardingFilter = false; - if (!!ReadMetadata->GetRequestShardingInfo()) { + if (!!GetReadMetadata()->GetRequestShardingInfo()) { auto ver = source->GetShardingVersionOptional(); - if (!ver || *ver < ReadMetadata->GetRequestShardingInfo()->GetSnapshotVersion()) { + if (!ver || *ver < GetReadMetadata()->GetRequestShardingInfo()->GetSnapshotVersion()) { needShardingFilter = true; } } @@ -64,67 +66,12 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con } } -class TColumnsAccumulator { -private: - TColumnsSetIds FetchingReadyColumns; - TColumnsSetIds AssemblerReadyColumns; - ISnapshotSchema::TPtr FullSchema; - std::shared_ptr GuaranteeNotOptional; - -public: - TColumnsAccumulator(const std::shared_ptr& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema) - : FullSchema(fullSchema) - , GuaranteeNotOptional(guaranteeNotOptional) { - } - - TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const { - return columns - FetchingReadyColumns; - } - - bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { - auto actualColumns = GetNotFetchedAlready(columns); - FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns; - if (!actualColumns.IsEmpty()) { - script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob); - script.AddStep(std::make_shared(actualColumns)); - return true; - } - return false; - } - bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, - const bool sequential) { - auto actualColumns = columns - AssemblerReadyColumns; - AssemblerReadyColumns = AssemblerReadyColumns + columns; - if (actualColumns.IsEmpty()) { - return false; - } - auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); - if (sequential) { - const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); - if (notSequentialColumnIds.size()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); - std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); - script.AddStep(cross, purposeId); - *actualSet = *actualSet - *cross; - } - if (!actualSet->IsEmpty()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); - script.AddStep(actualSet, purposeId); - } - } else { - script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); - script.AddStep(actualSet, purposeId); - } - return true; - } -}; - std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { std::shared_ptr result = std::make_shared(*this); const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - TColumnsAccumulator acc(GetMergeColumns(), ReadMetadata->GetResultSchema()); + NCommon::TColumnsAccumulator acc(GetMergeColumns(), GetReadMetadata()->GetResultSchema()); if (!!IndexChecker && useIndexes) { result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); result->AddStep(std::make_shared(IndexChecker)); @@ -164,11 +111,11 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } - for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { + for (auto&& i : GetReadMetadata()->GetProgram().GetSteps()) { if (i->GetFilterOriginalColumnIds().empty()) { break; } - TColumnsSet stepColumnIds(i->GetFilterOriginalColumnIds(), ReadMetadata->GetResultSchema()); + TColumnsSet stepColumnIds(i->GetFilterOriginalColumnIds(), GetReadMetadata()->GetResultSchema()); acc.AddAssembleStep(*result, stepColumnIds, "EF", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared(i)); if (!i->IsFilterOnly()) { @@ -181,13 +128,13 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, false); } + result->AddStep(); result->AddStep(); return result; } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) : TBase(commonContext) { - ReadMetadata = GetCommonContext()->GetReadMetadataPtrVerifiedAs(); } TString TSpecialReadContext::ProfileDebugString() const { diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h index 5fdb245d9b77..d1dd942a611c 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h @@ -15,11 +15,11 @@ using TColumnsSet = NCommon::TColumnsSet; using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; +using TFetchingScript = NCommon::TFetchingScript; class TSpecialReadContext: public NCommon::TSpecialReadContext { private: using TBase = NCommon::TSpecialReadContext; - TReadMetadata::TConstPtr ReadMetadata; std::shared_ptr BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const; TMutex Mutex; @@ -27,16 +27,12 @@ class TSpecialReadContext: public NCommon::TSpecialReadContext { CacheFetchingScripts; std::shared_ptr AskAccumulatorsScript; -public: - const TReadMetadata::TConstPtr& GetReadMetadata() const { - return ReadMetadata; - } + virtual std::shared_ptr DoGetColumnsFetchingPlan(const std::shared_ptr& source) override; +public: virtual TString ProfileDebugString() const override; TSpecialReadContext(const std::shared_ptr& commonContext); - - std::shared_ptr GetColumnsFetchingPlan(const std::shared_ptr& source); }; } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp index 6a1cd4587b96..482843d08130 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp @@ -12,69 +12,11 @@ namespace NKikimr::NOlap::NReader::NSimple { -bool TStepAction::DoApply(IDataReader& owner) const { - if (FinishedFlag) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); - auto* plainReader = static_cast(&owner); - plainReader->MutableScanner().OnSourceReady(Source, nullptr, 0, Source->GetRecordsCount(), *plainReader); - } - return true; -} - -TConclusionStatus TStepAction::DoExecuteImpl() { - if (Source->GetContext()->IsAborted()) { - return TConclusionStatus::Success(); - } - auto executeResult = Cursor.Execute(Source); - if (!executeResult) { - return executeResult; - } - if (*executeResult) { - FinishedFlag = true; - } - return TConclusionStatus::Success(); -} - -TStepAction::TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId) - : TBase(ownerActorId) - , Source(source) - , Cursor(std::move(cursor)) - , CountersGuard(Source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) { -} - -TConclusion TColumnBlobsFetchingStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - return !source->StartFetchingColumns(source, step, Columns); -} - -ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptr& source) const { - return source->GetColumnBlobBytes(Columns.GetColumnIds()); -} - TConclusion TIndexBlobsFetchingStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { return !source->StartFetchingIndexes(source, step, Indexes); } -TConclusion TAssemblerStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - source->AssembleColumns(Columns); - return true; -} - -ui64 TAssemblerStep::GetProcessingDataSize(const std::shared_ptr& source) const { - return source->GetColumnRawBytes(Columns->GetColumnIds()); -} - -TConclusion TOptionalAssemblerStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - source->AssembleColumns(Columns, !source->IsSourceInMemory()); - return true; -} - -ui64 TOptionalAssemblerStep::GetProcessingDataSize(const std::shared_ptr& source) const { - return source->GetColumnsVolume(Columns->GetColumnIds(), EMemType::RawSequential); -} - TConclusion TFilterProgramStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { AFL_VERIFY(source); AFL_VERIFY(Step); @@ -130,164 +72,11 @@ TConclusion TShardingFilter::DoExecuteInplace(const std::shared_ptr TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - std::vector> columns; - for (auto&& f : IIndexInfo::ArrowSchemaSnapshot()->fields()) { - columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), NArrow::DefaultScalar(f->type()), Count)); - } - source->MutableStageData().AddBatch( - std::make_shared(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns))); - source->SetUsedRawBytes(0); - source->Finalize({}); - return true; -} - TConclusion TApplyIndexStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { source->ApplyIndex(IndexChecker); return true; } -TConclusion TFetchingScriptCursor::Execute(const std::shared_ptr& source) { - AFL_VERIFY(source); - NMiniKQL::TThrowingBindTerminator bind; - Script->OnExecute(); - while (!Script->IsFinished(CurrentStepIdx)) { - if (source->HasStageData() && source->GetStageData().IsEmpty()) { - source->OnEmptyStageData(); - break; - } - auto step = Script->GetStep(CurrentStepIdx); - TMemoryProfileGuard mGuard("SCAN_PROFILE::FETCHING::" + step->GetName() + "::" + Script->GetBranchName(), - IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx); - AFL_VERIFY(!CurrentStartInstant); - CurrentStartInstant = TMonotonic::Now(); - AFL_VERIFY(!CurrentStartDataSize); - CurrentStartDataSize = step->GetProcessingDataSize(source); - const TConclusion resultStep = step->ExecuteInplace(source, *this); - if (!resultStep) { - return resultStep; - } - if (!*resultStep) { - return false; - } - FlushDuration(); - ++CurrentStepIdx; - } - return true; -} - -bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr&& guard, - const std::shared_ptr& /*allocation*/) { - auto data = Source.lock(); - if (!data || data->GetContext()->IsAborted()) { - guard->Release(); - return false; - } - if (StageIndex == EStageFeaturesIndexes::Accessors) { - data->MutableStageData().SetAccessorsGuard(std::move(guard)); - } else { - data->RegisterAllocationGuard(std::move(guard)); - } - Step.Next(); - auto task = std::make_shared(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId()); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); - return true; -} - -TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation( - const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex) - : TBase(mem) - , Source(source) - , Step(step) - , TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) - , StageIndex(stageIndex) { -} - -void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) { - auto sourcePtr = Source.lock(); - if (sourcePtr) { - sourcePtr->GetContext()->GetCommonContext()->AbortWithError( - "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'"); - } -} - -TConclusion TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - ui64 size = PredefinedSize.value_or(0); - for (auto&& i : Packs) { - ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType()); - if (source->GetStageData().GetUseFilter() && i.GetMemType() != EMemType::Blob && source->GetContext()->GetReadMetadata()->HasLimit()) { - const ui32 filtered = - source->GetStageData().GetFilteredCount(source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->GetLimitRobust()); - if (filtered < source->GetRecordsCount()) { - sizeLocal = sizeLocal * 1.0 * filtered / source->GetRecordsCount(); - } - } - size += sizeLocal; - } - - auto allocation = std::make_shared(source, size, step, StageIndex); - NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(), - source->GetContext()->GetCommonContext()->GetScanId(), source->GetMemoryGroupId(), { allocation }, (ui32)StageIndex); - return false; -} - -ui64 TAllocateMemoryStep::GetProcessingDataSize(const std::shared_ptr& /*source*/) const { - return 0; -} - -TString TFetchingScript::DebugString() const { - TStringBuilder sb; - TStringBuilder sbBranch; - for (auto&& i : Steps) { - if (i->GetSumDuration() > TDuration::MilliSeconds(10)) { - sbBranch << "{" << i->DebugString() << "};"; - } - } - if (!sbBranch) { - return ""; - } - sb << "{branch:" << BranchName << ";limit:" << Limit << ";"; - if (FinishInstant && StartInstant) { - sb << "duration:" << *FinishInstant - *StartInstant << ";"; - } - - sb << "steps_10Ms:[" << sbBranch << "]}"; - return sb; -} - -TFetchingScript::TFetchingScript(const TSpecialReadContext& context) - : Limit(context.GetReadMetadata()->GetLimitRobust()) { -} - -void TFetchingScript::Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { - if (Steps.size() == 0) { - AddStep(entityIds, mType, stage); - } else { - std::optional addIndex; - for (i32 i = Steps.size() - 1; i >= 0; --i) { - if (auto allocation = std::dynamic_pointer_cast(Steps[i])) { - if (allocation->GetStage() == stage) { - allocation->AddAllocation(entityIds, mType); - return; - } else { - addIndex = i + 1; - } - break; - } else if (std::dynamic_pointer_cast(Steps[i])) { - continue; - } else if (std::dynamic_pointer_cast(Steps[i])) { - continue; - } else { - addIndex = i + 1; - break; - } - } - AFL_VERIFY(addIndex); - InsertStep(*addIndex, entityIds, mType, stage); - } -} - NKikimr::TConclusion TFilterCutLimit::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { source->MutableStageData().CutFilter(source->GetRecordsCount(), Limit, Reverse); @@ -385,7 +174,6 @@ TConclusion TBuildResultStep::DoExecuteInplace(const std::shared_ptr TPrepareResultStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - source->Finalize(NYDBTest::TControllers::GetColumnShardController()->GetMemoryLimitScanPortion()); std::shared_ptr plan = std::make_shared(*source->GetContext()); if (source->IsSourceInMemory()) { AFL_VERIFY(source->GetStageResult().GetPagesToResultVerified().size() == 1); @@ -406,4 +194,16 @@ TConclusion TPrepareResultStep::DoExecuteInplace(const std::shared_ptr TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + std::vector> columns; + for (auto&& f : IIndexInfo::ArrowSchemaSnapshot()->fields()) { + columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), NArrow::DefaultScalar(f->type()), source->GetRecordsCount())); + } + source->MutableStageData().AddBatch(std::make_shared( + arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), source->GetRecordsCount(), columns))); + source->SetUsedRawBytes(0); + source->Finalize({}); + return true; +} + } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h index dd2c01051624..995597a168d6 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -11,191 +12,51 @@ namespace NKikimr::NOlap::NReader::NSimple { +class IDataSource; using TColumnsSet = NCommon::TColumnsSet; using TIndexesSet = NCommon::TIndexesSet; using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; +using TFetchingScriptCursor = NCommon::TFetchingScriptCursor; +using TStepAction = NCommon::TStepAction; -class IDataSource; -class TFetchingScriptCursor; class TSpecialReadContext; -class IFetchingStep { -private: - YDB_READONLY_DEF(TString, Name); - YDB_READONLY(TDuration, SumDuration, TDuration::Zero()); - YDB_READONLY(ui64, SumSize, 0); -protected: +class IFetchingStep: public NCommon::IFetchingStep { +private: + using TBase = NCommon::IFetchingStep; virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const = 0; - virtual TString DoDebugString() const { - return ""; - } - -public: - void AddDuration(const TDuration d) { - SumDuration += d; - } - void AddDataSize(const ui64 size) { - SumSize += size; - } - - virtual ~IFetchingStep() = default; - - [[nodiscard]] TConclusion ExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - return DoExecuteInplace(source, step); - } virtual ui64 GetProcessingDataSize(const std::shared_ptr& /*source*/) const { return 0; } - IFetchingStep(const TString& name) - : Name(name) { - } - - TString DebugString() const { - TStringBuilder sb; - sb << "name=" << Name << ";duration=" << SumDuration << ";" - << "size=" << 1e-9 * SumSize << ";details={" << DoDebugString() << "};"; - return sb; - } -}; - -class TFetchingScript { -private: - YDB_ACCESSOR(TString, BranchName, "UNDEFINED"); - std::vector> Steps; - std::optional StartInstant; - std::optional FinishInstant; - const ui32 Limit; - -public: - TFetchingScript(const TSpecialReadContext& context); - - void Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); - - void AddStepDataSize(const ui32 index, const ui64 size) { - GetStep(index)->AddDataSize(size); - } - - void AddStepDuration(const ui32 index, const TDuration d) { - FinishInstant = TMonotonic::Now(); - GetStep(index)->AddDuration(d); - } - - void OnExecute() { - if (!StartInstant) { - StartInstant = TMonotonic::Now(); - } - } - - TString DebugString() const; - - const std::shared_ptr& GetStep(const ui32 index) const { - AFL_VERIFY(index < Steps.size()); - return Steps[index]; - } - - template - std::shared_ptr AddStep(Args... args) { - auto result = std::make_shared(args...); - Steps.emplace_back(result); - return result; - } - - template - std::shared_ptr InsertStep(const ui32 index, Args... args) { - AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size()); - auto result = std::make_shared(args...); - Steps.insert(Steps.begin() + index, result); - return result; - } - - void AddStep(const std::shared_ptr& step) { - AFL_VERIFY(step); - Steps.emplace_back(step); - } - - bool IsFinished(const ui32 currentStepIdx) const { - AFL_VERIFY(currentStepIdx <= Steps.size()); - return currentStepIdx == Steps.size(); + virtual TConclusion DoExecuteInplace( + const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const override final { + const auto source = std::static_pointer_cast(sourceExt); + return DoExecuteInplace(source, step); } - ui32 Execute(const ui32 startStepIdx, const std::shared_ptr& source) const; -}; - -class TFetchingScriptCursor { -private: - std::optional CurrentStartInstant; - std::optional CurrentStartDataSize; - ui32 CurrentStepIdx = 0; - std::shared_ptr Script; - void FlushDuration() { - AFL_VERIFY(CurrentStartInstant); - AFL_VERIFY(CurrentStartDataSize); - Script->AddStepDuration(CurrentStepIdx, TMonotonic::Now() - *CurrentStartInstant); - Script->AddStepDataSize(CurrentStepIdx, *CurrentStartDataSize); - CurrentStartInstant.reset(); - CurrentStartDataSize.reset(); + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override final { + return GetProcessingDataSize(std::static_pointer_cast(source)); } public: - TFetchingScriptCursor(const std::shared_ptr& script, const ui32 index) - : CurrentStepIdx(index) - , Script(script) { - AFL_VERIFY(!Script->IsFinished(CurrentStepIdx)); - } - - const TString& GetName() const { - return Script->GetStep(CurrentStepIdx)->GetName(); - } + using TBase::TBase; - TString DebugString() const { - return Script->GetStep(CurrentStepIdx)->DebugString(); - } - - bool Next() { - FlushDuration(); - return !Script->IsFinished(++CurrentStepIdx); - } - - TConclusion Execute(const std::shared_ptr& source); }; -class TStepAction: public IDataTasksProcessor::ITask { -private: - using TBase = IDataTasksProcessor::ITask; - std::shared_ptr Source; - TFetchingScriptCursor Cursor; - bool FinishedFlag = false; - const NColumnShard::TCounterGuard CountersGuard; - -protected: - virtual bool DoApply(IDataReader& owner) const override; - virtual TConclusionStatus DoExecuteImpl() override; - -public: - virtual TString GetTaskClassIdentifier() const override { - return "STEP_ACTION"; - } - - TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId); -}; +class IDataSource; class TBuildFakeSpec: public IFetchingStep { private: using TBase = IFetchingStep; - const ui32 Count = 0; - -protected: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; public: - TBuildFakeSpec(const ui32 count) - : TBase("FAKE_SPEC") - , Count(count) { - AFL_VERIFY(Count); + TBuildFakeSpec() + : TBase("FAKE_SPEC") { } }; @@ -214,74 +75,6 @@ class TApplyIndexStep: public IFetchingStep { } }; -class TAllocateMemoryStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - class TColumnsPack { - private: - YDB_READONLY_DEF(TColumnsSetIds, Columns); - YDB_READONLY(EMemType, MemType, EMemType::Blob); - - public: - TColumnsPack(const TColumnsSetIds& columns, const EMemType memType) - : Columns(columns) - , MemType(memType) { - } - }; - std::vector Packs; - THashMap> Control; - const EStageFeaturesIndexes StageIndex; - const std::optional PredefinedSize; - -protected: - class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { - private: - using TBase = NGroupedMemoryManager::IAllocation; - std::weak_ptr Source; - TFetchingScriptCursor Step; - NColumnShard::TCounterGuard TasksGuard; - const EStageFeaturesIndexes StageIndex; - virtual bool DoOnAllocated(std::shared_ptr&& guard, - const std::shared_ptr& allocation) override; - virtual void DoOnAllocationImpossible(const TString& errorMessage) override; - - public: - TFetchingStepAllocation(const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step, - const EStageFeaturesIndexes stageIndex); - }; - virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; - virtual TString DoDebugString() const override { - return TStringBuilder() << "stage=" << StageIndex << ";"; - } - -public: - void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) { - if (!ids.GetColumnsCount()) { - return; - } - for (auto&& i : ids.GetColumnIds()) { - AFL_VERIFY(Control[i].emplace(memType).second); - } - Packs.emplace_back(ids, memType); - } - EStageFeaturesIndexes GetStage() const { - return StageIndex; - } - - TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex) - : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) - , StageIndex(stageIndex) { - AddAllocation(columns, memType); - } - - TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex) - : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) - , StageIndex(stageIndex) - , PredefinedSize(memSize) { - } -}; - class TDetectInMemStep: public IFetchingStep { private: using TBase = IFetchingStep; @@ -400,45 +193,6 @@ class TIndexBlobsFetchingStep: public IFetchingStep { } }; -class TAssemblerStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - YDB_READONLY_DEF(std::shared_ptr, Columns); - virtual TString DoDebugString() const override { - return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; - } - -public: - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; - virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - TAssemblerStep(const std::shared_ptr& columns, const TString& specName = Default()) - : TBase("ASSEMBLER" + (specName ? "::" + specName : "")) - , Columns(columns) { - AFL_VERIFY(Columns); - AFL_VERIFY(Columns->GetColumnsCount()); - } -}; - -class TOptionalAssemblerStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - YDB_READONLY_DEF(std::shared_ptr, Columns); - virtual TString DoDebugString() const override { - return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; - } - -public: - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; - - virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - TOptionalAssemblerStep(const std::shared_ptr& columns, const TString& specName = Default()) - : TBase("OPTIONAL_ASSEMBLER" + (specName ? "::" + specName : "")) - , Columns(columns) { - AFL_VERIFY(Columns); - AFL_VERIFY(Columns->GetColumnsCount()); - } -}; - class TFilterProgramStep: public IFetchingStep { private: using TBase = IFetchingStep; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h index 1be59d1bc6da..08ec74361360 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h @@ -45,7 +45,7 @@ class TPlainReadData: public IDataReader, TNonCopyable, NColumnShard::TMonitorin } public: - const TReadMetadata::TConstPtr& GetReadMetadata() const { + const NCommon::TReadMetadata::TConstPtr& GetReadMetadata() const { return SpecialReadContext->GetReadMetadata(); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index 9ce0b5bd0ae0..af65ff1a8fc6 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -23,26 +23,54 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr& fetch void IDataSource::StartProcessing(const std::shared_ptr& sourcePtr) { AFL_VERIFY(!ProcessingStarted); AFL_VERIFY(FetchingPlan); - AFL_VERIFY(!Context->IsAborted()); + AFL_VERIFY(!GetContext()->IsAborted()); ProcessingStarted = true; SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", SourceIdx); + SetMemoryGroupId(SourceGroupGuard->GetGroupId()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx()); // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); TFetchingScriptCursor cursor(FetchingPlan, 0); - auto task = std::make_shared(sourcePtr, std::move(cursor), Context->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } void IDataSource::ContinueCursor(const std::shared_ptr& sourcePtr) { AFL_VERIFY(!!ScriptCursor); if (ScriptCursor->Next()) { - auto task = std::make_shared(sourcePtr, std::move(*ScriptCursor), Context->GetCommonContext()->GetScanActorId()); + auto task = std::make_shared(sourcePtr, std::move(*ScriptCursor), GetContext()->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); ScriptCursor.reset(); } } +void IDataSource::DoOnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr& sourcePtr) { + auto* plainReader = static_cast(&owner); + plainReader->MutableScanner().OnSourceReady(std::static_pointer_cast(sourcePtr), nullptr, 0, GetRecordsCount(), *plainReader); +} + +void IDataSource::DoOnEmptyStageData(const std::shared_ptr& /*sourcePtr*/) { + ResourceGuards.clear(); + Finalize({}); +} + +void IDataSource::DoBuildStageResult(const std::shared_ptr& /*sourcePtr*/) { + Finalize(NYDBTest::TControllers::GetColumnShardController()->GetMemoryLimitScanPortion()); +} + +void IDataSource::Finalize(const std::optional memoryLimit) { + TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); + if (memoryLimit) { + const auto accessor = StageData->GetPortionAccessor(); + StageResult = std::make_unique(std::move(StageData)); + StageResult->SetPages(accessor.BuildReadPages(*memoryLimit, GetContext()->GetProgramInputColumns()->GetColumnIds())); + } else { + StageResult = std::make_unique(std::move(StageData)); + StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) }); + } + StageData.reset(); +} + void TPortionDataSource::NeedFetchColumns(const std::set& columnIds, TBlobsAction& blobsAction, THashMap& defaultBlocks, const std::shared_ptr& filter) { const NArrow::TColumnFilter& cFilter = filter ? *filter : NArrow::TColumnFilter::BuildAllowFilter(); @@ -76,7 +104,7 @@ void TPortionDataSource::NeedFetchColumns(const std::set& columnIds, TBlob } bool TPortionDataSource::DoStartFetchingColumns( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) { + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName()); AFL_VERIFY(columns.GetColumnsCount()); AFL_VERIFY(!StageData->GetAppliedFilter() || !StageData->GetAppliedFilter()->IsTotalDenyFilter()); @@ -130,7 +158,8 @@ bool TPortionDataSource::DoStartFetchingIndexes( return false; } - auto constructor = std::make_shared(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); + auto constructor = std::make_shared( + readingActions, std::static_pointer_cast(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h index 06df08c8bfb2..1227b7ddfd0f 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,7 @@ class TFetchingInterval; class TPlainReadData; class IFetchTaskConstructor; class IFetchingStep; +class TBuildFakeSpec; class TPortionPage { private: @@ -88,55 +90,38 @@ class TReplaceKeyAdapter { } }; -class IDataSource: public ICursorEntity { +class IDataSource: public NCommon::IDataSource { private: - YDB_READONLY(ui32, SourceId, 0); - YDB_READONLY(ui32, SourceIdx, 0); + using TBase = NCommon::IDataSource; const TReplaceKeyAdapter Start; const TReplaceKeyAdapter Finish; - YDB_READONLY_DEF(std::shared_ptr, Context); - YDB_READONLY(TSnapshot, RecordSnapshotMin, TSnapshot::Zero()); - YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero()); - YDB_READONLY(ui32, RecordsCount, 0); - YDB_READONLY_DEF(std::optional, ShardingVersionOptional); - YDB_READONLY(bool, HasDeletions, false); virtual NJson::TJsonValue DoDebugJson() const = 0; std::shared_ptr FetchingPlan; - YDB_READONLY_DEF(std::vector>, ResourceGuards); YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage); YDB_ACCESSOR(ui32, ResultRecordsCount, 0); bool ProcessingStarted = false; bool IsStartedByCursor = false; - - virtual ui64 DoGetEntityId() const override { - return SourceId; - } - - virtual ui64 DoGetEntityRecordsCount() const override { - return RecordsCount; - } + friend class TBuildFakeSpec; std::optional ScriptCursor; std::shared_ptr SourceGroupGuard; + virtual void DoOnSourceFetchingFinishedSafe(IDataReader& owner, const std::shared_ptr& sourcePtr) override; + virtual void DoBuildStageResult(const std::shared_ptr& /*sourcePtr*/) override; + virtual void DoOnEmptyStageData(const std::shared_ptr& /*sourcePtr*/) override; + + void Finalize(const std::optional memoryLimit); + protected: std::optional UsedRawBytes; - std::optional IsSourceInMemoryFlag; - std::unique_ptr StageData; - std::unique_ptr StageResult; - - virtual bool DoStartFetchingColumns( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) = 0; virtual bool DoStartFetchingIndexes( const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& indexes) = 0; - virtual void DoAssembleColumns(const std::shared_ptr& columns, const bool sequential) = 0; virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; virtual NJson::TJsonValue DoDebugJsonForMemory() const { return NJson::JSON_MAP; } - virtual bool DoAddTxConflict() = 0; virtual bool DoStartFetchingAccessor(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step) = 0; public: @@ -224,88 +209,13 @@ class IDataSource: public ICursorEntity { return DoStartFetchingAccessor(sourcePtr, step); } - bool AddTxConflict() { - if (!Context->GetCommonContext()->HasLock()) { - return false; - } - if (DoAddTxConflict()) { - StageData->Clear(); - return true; - } - return false; - } - - ui64 GetResourceGuardsMemory() const { - ui64 result = 0; - for (auto&& i : ResourceGuards) { - result += i->GetMemory(); - } - return result; - } - void RegisterAllocationGuard(const std::shared_ptr& guard) { - ResourceGuards.emplace_back(guard); - } - bool IsSourceInMemory() const { - AFL_VERIFY(IsSourceInMemoryFlag); - return *IsSourceInMemoryFlag; - } - void SetSourceInMemory(const bool value) { - AFL_VERIFY(!IsSourceInMemoryFlag); - IsSourceInMemoryFlag = value; - AFL_VERIFY(StageData); - if (!value) { - StageData->SetUseFilter(value); - } - } - virtual THashMap DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobsOriginal) const = 0; - virtual ui64 GetPathId() const = 0; virtual bool HasIndexes(const std::set& indexIds) const = 0; - bool HasStageResult() const { - return !!StageResult; - } - - const TFetchedResult& GetStageResult() const { - AFL_VERIFY(!!StageResult); - return *StageResult; - } - - TFetchedResult& MutableStageResult() { - AFL_VERIFY(!!StageResult); - return *StageResult; - } - - void Finalize(const std::optional memoryLimit) { - AFL_VERIFY(!StageResult); - AFL_VERIFY(StageData); - TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); - - if (memoryLimit) { - const auto accessor = StageData->GetPortionAccessor(); - StageResult = std::make_unique(std::move(StageData)); - StageResult->SetPages(accessor.BuildReadPages(*memoryLimit, GetContext()->GetProgramInputColumns()->GetColumnIds())); - } else { - StageResult = std::make_unique(std::move(StageData)); - StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) }); - } - } - void ApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) { return DoApplyIndex(indexMeta); } - void AssembleColumns(const std::shared_ptr& columns, const bool sequential = false) { - if (columns->IsEmpty()) { - return; - } - DoAssembleColumns(columns, sequential); - } - - bool StartFetchingColumns(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) { - return DoStartFetchingColumns(sourcePtr, step, columns); - } - bool StartFetchingIndexes( const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& indexes) { AFL_VERIFY(indexes); @@ -313,11 +223,7 @@ class IDataSource: public ICursorEntity { } void InitFetchingPlan(const std::shared_ptr& fetching); - virtual ui64 GetColumnsVolume(const std::set& columnIds, const EMemType type) const = 0; - - virtual ui64 GetColumnRawBytes(const std::set& columnIds) const = 0; virtual ui64 GetIndexRawBytes(const std::set& indexIds) const = 0; - virtual ui64 GetColumnBlobBytes(const std::set& columnsIds) const = 0; void Abort() { DoAbort(); @@ -326,13 +232,14 @@ class IDataSource: public ICursorEntity { NJson::TJsonValue DebugJsonForMemory() const { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("details", DoDebugJsonForMemory()); - result.InsertValue("count", RecordsCount); + result.InsertValue("count", GetRecordsCount()); return result; } NJson::TJsonValue DebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue("source_idx", SourceIdx); + result.InsertValue("source_id", GetSourceId()); + result.InsertValue("source_idx", GetSourceIdx()); result.InsertValue("start", Start.DebugString()); result.InsertValue("finish", Finish.DebugString()); result.InsertValue("specific", DoDebugJson()); @@ -341,40 +248,14 @@ class IDataSource: public ICursorEntity { bool OnIntervalFinished(const ui32 intervalIdx); - void OnEmptyStageData() { - ResourceGuards.clear(); - Finalize(std::nullopt); - } - - bool HasStageData() const { - return !!StageData; - } - - const TFetchedData& GetStageData() const { - AFL_VERIFY(StageData); - return *StageData; - } - - TFetchedData& MutableStageData() { - AFL_VERIFY(StageData); - return *StageData; - } - - IDataSource(const ui32 sourceId, const ui32 sourceIdx, const std::shared_ptr& context, const NArrow::TReplaceKey& start, + IDataSource(const ui64 sourceId, const ui32 sourceIdx, const std::shared_ptr& context, const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish, const TSnapshot& recordSnapshotMin, const TSnapshot& recordSnapshotMax, const ui32 recordsCount, const std::optional shardingVersion, const bool hasDeletions) - : SourceId(sourceId) - , SourceIdx(sourceIdx) + : TBase(sourceId, sourceIdx, context, recordSnapshotMin, recordSnapshotMax, recordsCount, shardingVersion, hasDeletions) , Start(context->GetReadMetadata()->IsDescSorted() ? finish : start, context->GetReadMetadata()->IsDescSorted()) - , Finish(context->GetReadMetadata()->IsDescSorted() ? start : finish, context->GetReadMetadata()->IsDescSorted()) - , Context(context) - , RecordSnapshotMin(recordSnapshotMin) - , RecordSnapshotMax(recordSnapshotMax) - , RecordsCount(recordsCount) - , ShardingVersionOptional(shardingVersion) - , HasDeletions(hasDeletions) { + , Finish(context->GetReadMetadata()->IsDescSorted() ? start : finish, context->GetReadMetadata()->IsDescSorted()) { StageData = std::make_unique(true); - UsageClass = Context->GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(start, finish); + UsageClass = GetContext()->GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(start, finish); AFL_VERIFY(UsageClass != TPKRangeFilter::EUsageClass::DontUsage); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugString())( "finish", Finish.DebugString()); @@ -400,7 +281,7 @@ class TPortionDataSource: public IDataSource { virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) override; virtual bool DoStartFetchingColumns( - const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) override; + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) override; virtual bool DoStartFetchingIndexes( const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& indexes) override; virtual void DoAssembleColumns(const std::shared_ptr& columns, const bool sequential) override; @@ -489,7 +370,6 @@ class TPortionDataSource: public IDataSource { } virtual ui64 GetIndexRawBytes(const std::set& indexIds) const override { - return Portion->GetTotalRawBytes(); return GetStageData().GetPortionAccessor().GetIndexRawBytes(indexIds, false); }