diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index d21354297ee1..e350fbb05719 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -51,13 +51,16 @@ TSelectInfo::TStats TSelectInfo::Stats() const { return out; } -void TSelectInfo::DebugStream(IOutputStream& out) { +TString TSelectInfo::DebugString() const { + TStringBuilder result; + result << "count:" << PortionsOrderedPK.size() << ";"; if (PortionsOrderedPK.size()) { - out << "portions:"; + result << "portions:"; for (auto& portionInfo : PortionsOrderedPK) { - out << portionInfo->DebugString(); + result << portionInfo->DebugString(); } } + return result; } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index d628f1dd2373..2d68345b45e3 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -49,7 +49,7 @@ struct TSelectInfo { TStats Stats() const; - void DebugStream(IOutputStream& out); + TString DebugString() const; }; class TColumnEngineStats { diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h index 5d1a684e0217..49a50c9b74f1 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h @@ -30,7 +30,7 @@ class TDataStorageAccessor { }; // Holds all metadata that is needed to perform read/scan -struct TReadMetadataBase { +class TReadMetadataBase { public: enum class ESorting { NONE = 0 /* "not_sorted" */, @@ -153,8 +153,8 @@ struct TReadMetadataBase { ui64 Limit = 0; - virtual void Dump(IOutputStream& out) const { - out << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}" + virtual TString DebugString() const { + return TStringBuilder() << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}" << " " << Sorting << " sorted"; } @@ -179,12 +179,6 @@ struct TReadMetadataBase { virtual std::unique_ptr StartScan(const std::shared_ptr& readContext) const = 0; virtual std::vector GetKeyYqlSchema() const = 0; - // TODO: can this only be done for base class? - friend IOutputStream& operator<<(IOutputStream& out, const TReadMetadataBase& meta) { - meta.Dump(out); - return out; - } - const TProgramContainer& GetProgram() const { return Program; } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp new file mode 100644 index 000000000000..ec712ef066c0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp @@ -0,0 +1,121 @@ +#include "read_metadata.h" + +#include +#include +#include +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +TConclusionStatus TReadMetadata::Init( + const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) { + SetPKRangesFilter(readDescription.PKRangesFilter); + InitShardingInfo(readDescription.PathId); + TxId = readDescription.TxId; + LockId = readDescription.LockId; + if (LockId) { + owner->GetOperationsManager().RegisterLock(*LockId, owner->Generation()); + LockSharingInfo = owner->GetOperationsManager().GetLockVerified(*LockId).GetSharingInfo(); + } + + SelectInfo = dataAccessor.Select(readDescription, !!LockId); + if (LockId) { + for (auto&& i : SelectInfo->PortionsOrderedPK) { + if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) { + if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) { + } else { + auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i->GetInsertWriteIdVerified()); + AddWriteIdToCheck(i->GetInsertWriteIdVerified(), op->GetLockId()); + } + } + } + } + + { + auto customConclusion = DoInitCustom(owner, readDescription, dataAccessor); + if (customConclusion.IsFail()) { + return customConclusion; + } + } + + StatsMode = readDescription.StatsMode; + return TConclusionStatus::Success(); +} + +std::set TReadMetadata::GetEarlyFilterColumnIds() const { + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + std::set result; + for (auto&& i : GetProgram().GetEarlyFilterColumns()) { + auto id = indexInfo.GetColumnIdOptional(i); + if (id) { + result.emplace(*id); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); + } + } + return result; +} + +std::set TReadMetadata::GetPKColumnIds() const { + std::set result; + auto& indexInfo = ResultIndexSchema->GetIndexInfo(); + for (auto&& i : indexInfo.GetPrimaryKeyColumns()) { + Y_ABORT_UNLESS(result.emplace(indexInfo.GetColumnIdVerified(i.first)).second); + } + return result; +} + +NArrow::NMerger::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const { + return NArrow::NMerger::TSortableBatchPosition(key.ToBatch(GetReplaceKey()), 0, GetReplaceKey()->field_names(), {}, IsDescSorted()); +} + +void TReadMetadata::DoOnReadFinished(NColumnShard::TColumnShard& owner) const { + if (!GetLockId()) { + return; + } + const ui64 lock = *GetLockId(); + if (GetBrokenWithCommitted()) { + owner.GetOperationsManager().GetLockVerified(lock).SetBroken(); + } else { + NOlap::NTxInteractions::TTxConflicts conflicts; + for (auto&& i : GetConflictableLockIds()) { + conflicts.Add(i, lock); + } + auto writer = std::make_shared(PathId, conflicts); + owner.GetOperationsManager().AddEventForLock(owner, lock, writer); + } +} + +void TReadMetadata::DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const { + if (!LockId) { + return; + } + auto evWriter = std::make_shared( + PathId, GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetConflictableLockIds()); + owner.GetOperationsManager().AddEventForLock(owner, *LockId, evWriter); +} + +void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const { + if (LockSharingInfo) { + NKikimrDataEvents::TLock lockInfo; + lockInfo.SetLockId(LockSharingInfo->GetLockId()); + lockInfo.SetGeneration(LockSharingInfo->GetGeneration()); + lockInfo.SetDataShard(tabletId); + lockInfo.SetCounter(LockSharingInfo->GetCounter()); + lockInfo.SetPathId(PathId); + lockInfo.SetHasWrites(LockSharingInfo->HasWrites()); + if (LockSharingInfo->IsBroken()) { + scanData.LocksInfo.BrokenLocks.emplace_back(std::move(lockInfo)); + } else { + scanData.LocksInfo.Locks.emplace_back(std::move(lockInfo)); + } + } +} + +bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const { + AFL_VERIFY(LockSharingInfo); + auto it = ConflictedWriteIds.find(writeId); + AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size()); + return it->second.GetLockId() == LockSharingInfo->GetLockId(); +} + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h new file mode 100644 index 000000000000..df07febacea0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h @@ -0,0 +1,166 @@ +#pragma once +#include +#include +#include +#include + +#include + +namespace NKikimr::NColumnShard { +class TLockSharingInfo; +} + +namespace NKikimr::NOlap::NReader::NCommon { + +class TReadMetadata: public TReadMetadataBase { + using TBase = TReadMetadataBase; + +private: + const ui64 PathId; + std::shared_ptr BrokenWithCommitted = std::make_shared(); + std::shared_ptr LockSharingInfo; + + class TWriteIdInfo { + private: + const ui64 LockId; + std::shared_ptr Conflicts; + + public: + TWriteIdInfo(const ui64 lockId, const std::shared_ptr& counter) + : LockId(lockId) + , Conflicts(counter) { + } + + ui64 GetLockId() const { + return LockId; + } + + void MarkAsConflictable() const { + Conflicts->Inc(); + } + + bool IsConflictable() const { + return Conflicts->Val(); + } + }; + + THashMap> LockConflictCounters; + THashMap ConflictedWriteIds; + + virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override; + virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override; + virtual void DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const override; + + virtual TConclusionStatus DoInitCustom( + const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) = 0; + +public: + using TConstPtr = std::shared_ptr; + + bool GetBrokenWithCommitted() const { + return BrokenWithCommitted->Val(); + } + THashSet GetConflictableLockIds() const { + THashSet result; + for (auto&& i : ConflictedWriteIds) { + if (i.second.IsConflictable()) { + result.emplace(i.second.GetLockId()); + } + } + return result; + } + + bool IsLockConflictable(const ui64 lockId) const { + auto it = LockConflictCounters.find(lockId); + AFL_VERIFY(it != LockConflictCounters.end()); + return it->second->Val(); + } + + bool IsWriteConflictable(const TInsertWriteId writeId) const { + auto it = ConflictedWriteIds.find(writeId); + AFL_VERIFY(it != ConflictedWriteIds.end()); + return it->second.IsConflictable(); + } + + void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) { + auto it = LockConflictCounters.find(lockId); + if (it == LockConflictCounters.end()) { + it = LockConflictCounters.emplace(lockId, std::make_shared()).first; + } + AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second); + } + + [[nodiscard]] bool IsMyUncommitted(const TInsertWriteId writeId) const; + + void SetConflictedWriteId(const TInsertWriteId writeId) const { + auto it = ConflictedWriteIds.find(writeId); + AFL_VERIFY(it != ConflictedWriteIds.end()); + it->second.MarkAsConflictable(); + } + + void SetBrokenWithCommitted() const { + BrokenWithCommitted->Inc(); + } + + NArrow::NMerger::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const; + virtual std::shared_ptr BuildReader(const std::shared_ptr& context) const = 0; + + bool HasProcessingColumnIds() const { + return GetProgram().HasProcessingColumnIds(); + } + + ui64 GetPathId() const { + return PathId; + } + + std::shared_ptr SelectInfo; + NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; + std::shared_ptr ReadStats; + + TReadMetadata(const ui64 pathId, const std::shared_ptr info, const TSnapshot& snapshot, const ESorting sorting, + const TProgramContainer& ssaProgram, const std::shared_ptr& scanCursor) + : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor) + , PathId(pathId) + , ReadStats(std::make_shared()) { + } + + virtual std::vector GetKeyYqlSchema() const override { + return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns(); + } + + TConclusionStatus Init( + const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor); + + std::vector GetColumnsOrder() const { + auto schema = GetResultSchema(); + std::vector result; + for (auto&& i : schema->GetSchema()->fields()) { + result.emplace_back(i->name()); + } + return result; + } + + std::set GetEarlyFilterColumnIds() const; + std::set GetPKColumnIds() const; + + virtual bool Empty() const = 0; + + size_t NumIndexedBlobs() const { + Y_ABORT_UNLESS(SelectInfo); + return SelectInfo->Stats().Blobs; + } + + virtual TString DebugString() const override { + TStringBuilder result; + + result << TBase::DebugString() << ";" << " index blobs: " << NumIndexedBlobs() << " committed blobs: " + << " at snapshot: " << GetRequestSnapshot().DebugString(); + + if (SelectInfo) { + result << ", " << SelectInfo->DebugString(); + } + return result; + } +}; + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/ya.make new file mode 100644 index 000000000000..180dc0be1044 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + read_metadata.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/reader/abstract + ydb/core/kqp/compute_actor +) + +END() diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp new file mode 100644 index 000000000000..5ea51192550b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp @@ -0,0 +1,114 @@ +#include "context.h" + +#include +#include +#include +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) + : CommonContext(commonContext) { + auto readMetadata = CommonContext->GetReadMetadataPtrVerifiedAs(); + Y_ABORT_UNLESS(readMetadata->SelectInfo); + + double kffAccessors = 0.01; + double kffFilter = 0.45; + double kffFetching = 0.45; + double kffMerge = 0.10; + TString stagePrefix; + if (readMetadata->GetEarlyFilterColumnIds().size()) { + stagePrefix = "EF"; + kffFilter = 0.7; + kffFetching = 0.15; + kffMerge = 0.14; + kffAccessors = 0.01; + } else { + stagePrefix = "FO"; + kffFilter = 0.1; + kffFetching = 0.75; + kffMerge = 0.14; + kffAccessors = 0.01; + } + + std::vector> stages = { + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( + stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit), + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( + stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit), + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( + stagePrefix + "::FETCHING", kffFetching * TGlobalLimits::ScanMemoryLimit), + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit) + }; + ProcessMemoryGuard = + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages); + ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard( + CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); + + auto readSchema = readMetadata->GetResultSchema(); + SpecColumns = std::make_shared(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); + IndexChecker = readMetadata->GetProgram().GetIndexChecker(); + { + auto predicateColumns = readMetadata->GetPKRangesFilter().GetColumnIds(readMetadata->GetIndexInfo()); + if (predicateColumns.size()) { + PredicateColumns = std::make_shared(predicateColumns, readSchema); + } else { + PredicateColumns = std::make_shared(); + } + } + { + std::set columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX }; + DeletionColumns = std::make_shared(columnIds, readMetadata->GetResultSchema()); + } + + if (!!readMetadata->GetRequestShardingInfo()) { + auto shardingColumnIds = + readMetadata->GetIndexInfo().GetColumnIdsVerified(readMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames()); + ShardingColumns = std::make_shared(shardingColumnIds, readMetadata->GetResultSchema()); + } else { + ShardingColumns = std::make_shared(); + } + { + auto efColumns = readMetadata->GetEarlyFilterColumnIds(); + if (efColumns.size()) { + EFColumns = std::make_shared(efColumns, readSchema); + } else { + EFColumns = std::make_shared(); + } + } + if (readMetadata->HasProcessingColumnIds()) { + FFColumns = std::make_shared(readMetadata->GetProcessingColumnIds(), readSchema); + if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) { + FFColumns = std::make_shared(*EFColumns + *SpecColumns); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString()); + } else { + AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_first", FFColumns->DebugString()); + } + } else { + FFColumns = EFColumns; + } + if (FFColumns->IsEmpty()) { + ProgramInputColumns = SpecColumns; + } else { + ProgramInputColumns = FFColumns; + } + AllUsageColumns = std::make_shared(*FFColumns + *PredicateColumns); + + PKColumns = std::make_shared(readMetadata->GetPKColumnIds(), readSchema); + MergeColumns = std::make_shared(*PKColumns + *SpecColumns); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); +} + +TString TSpecialReadContext::DebugString() const { + TStringBuilder sb; + sb << "ef=" << EFColumns->DebugString() << ";" + << "sharding=" << ShardingColumns->DebugString() << ";" + << "pk=" << PKColumns->DebugString() << ";" + << "ff=" << FFColumns->DebugString() << ";" + << "program_input=" << ProgramInputColumns->DebugString() << ";"; + return sb; +} + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h new file mode 100644 index 000000000000..fa24798d4e66 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h @@ -0,0 +1,61 @@ +#pragma once +#include "columns_set.h" + +#include +#include +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +class TSpecialReadContext { +private: + YDB_READONLY_DEF(std::shared_ptr, CommonContext); + YDB_READONLY_DEF(std::shared_ptr, ProcessMemoryGuard); + YDB_READONLY_DEF(std::shared_ptr, ProcessScopeGuard); + + YDB_READONLY_DEF(std::shared_ptr, SpecColumns); + YDB_READONLY_DEF(std::shared_ptr, MergeColumns); + YDB_READONLY_DEF(std::shared_ptr, ShardingColumns); + YDB_READONLY_DEF(std::shared_ptr, DeletionColumns); + YDB_READONLY_DEF(std::shared_ptr, EFColumns); + YDB_READONLY_DEF(std::shared_ptr, PredicateColumns); + YDB_READONLY_DEF(std::shared_ptr, PKColumns); + YDB_READONLY_DEF(std::shared_ptr, AllUsageColumns); + YDB_READONLY_DEF(std::shared_ptr, FFColumns); + YDB_READONLY_DEF(std::shared_ptr, ProgramInputColumns); + + YDB_READONLY_DEF(std::shared_ptr, MergeStageMemory); + YDB_READONLY_DEF(std::shared_ptr, FilterStageMemory); + YDB_READONLY_DEF(std::shared_ptr, FetchingStageMemory); + + TAtomic AbortFlag = 0; + +protected: + NIndexes::TIndexCheckerContainer IndexChecker; + std::shared_ptr EmptyColumns = std::make_shared(); + +public: + ui64 GetProcessMemoryControlId() const { + AFL_VERIFY(ProcessMemoryGuard); + return ProcessMemoryGuard->GetProcessId(); + } + + bool IsAborted() const { + return AtomicGet(AbortFlag); + } + + void Abort() { + AtomicSet(AbortFlag, 1); + } + + virtual ~TSpecialReadContext() { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("fetching", DebugString()); + } + + TString DebugString() const; + virtual TString ProfileDebugString() const = 0; + + TSpecialReadContext(const std::shared_ptr& commonContext); +}; + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp new file mode 100644 index 000000000000..de8bf3830758 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.cpp @@ -0,0 +1,49 @@ +#include "iterator.h" + +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr& context, const TReadMetadata::TConstPtr& readMetadata) + : Context(context) + , ReadMetadata(readMetadata) + , ReadyResults(context->GetCounters()) { + IndexedData = readMetadata->BuildReader(Context); + Y_ABORT_UNLESS(Context->GetReadMetadata()->IsSorted()); +} + +TConclusion> TColumnShardScanIterator::GetBatch() { + FillReadyResults(); + return ReadyResults.pop_front(); +} + +void TColumnShardScanIterator::PrepareResults() { + FillReadyResults(); +} + +TConclusion TColumnShardScanIterator::ReadNextInterval() { + return IndexedData->ReadNextInterval(); +} + +void TColumnShardScanIterator::DoOnSentDataFromInterval(const ui32 intervalIdx) const { + return IndexedData->OnSentDataFromInterval(intervalIdx); +} + +TColumnShardScanIterator::~TColumnShardScanIterator() { + if (!IndexedData->IsFinished()) { + IndexedData->Abort("iterator destructor"); + } + ReadMetadata->ReadStats->PrintToLog(); +} + +void TColumnShardScanIterator::Apply(const std::shared_ptr& task) { + if (!IndexedData->IsFinished()) { + Y_ABORT_UNLESS(task->Apply(*IndexedData)); + } +} + +const TReadStats& TColumnShardScanIterator::GetStats() const { + return *ReadMetadata->ReadStats; +} + +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h new file mode 100644 index 000000000000..5de306cab085 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/iterator.h @@ -0,0 +1,105 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NOlap::NReader::NCommon { + +class TReadMetadata; + +class TReadyResults { +private: + const NColumnShard::TConcreteScanCounters Counters; + std::deque> Data; + i64 RecordsCount = 0; +public: + TString DebugString() const { + TStringBuilder sb; + sb + << "count:" << Data.size() << ";" + << "records_count:" << RecordsCount << ";" + ; + if (Data.size()) { + sb << "schema=" << Data.front()->GetResultBatch().schema()->ToString() << ";"; + } + return sb; + } + TReadyResults(const NColumnShard::TConcreteScanCounters& counters) + : Counters(counters) + { + + } + const std::shared_ptr& emplace_back(std::shared_ptr&& v) { + AFL_VERIFY(!!v); + RecordsCount += v->GetResultBatch().num_rows(); + Data.emplace_back(std::move(v)); + return Data.back(); + } + std::shared_ptr pop_front() { + if (Data.empty()) { + return {}; + } + auto result = std::move(Data.front()); + AFL_VERIFY(RecordsCount >= result->GetResultBatch().num_rows()); + RecordsCount -= result->GetResultBatch().num_rows(); + Data.pop_front(); + return result; + } + bool empty() const { + return Data.empty(); + } + size_t size() const { + return Data.size(); + } +}; + +class TColumnShardScanIterator: public TScanIteratorBase { +private: + virtual void DoOnSentDataFromInterval(const ui32 intervalIdx) const override; + +protected: + ui64 ItemsRead = 0; + const i64 MaxRowsInBatch = 5000; + std::shared_ptr Context; + std::shared_ptr ReadMetadata; + TReadyResults ReadyResults; + std::shared_ptr IndexedData; + +public: + TColumnShardScanIterator(const std::shared_ptr& context, const std::shared_ptr& readMetadata); + ~TColumnShardScanIterator(); + + virtual TConclusionStatus Start() override { + AFL_VERIFY(IndexedData); + return IndexedData->Start(); + } + + virtual std::optional GetAvailableResultsCount() const override { + return ReadyResults.size(); + } + + virtual const TReadStats& GetStats() const override; + + virtual TString DebugString(const bool verbose) const override { + return TStringBuilder() + << "ready_results:(" << ReadyResults.DebugString() << ");" + << "indexed_data:(" << IndexedData->DebugString(verbose) << ")" + ; + } + + virtual void Apply(const std::shared_ptr& task) override; + + bool Finished() const override { + return IndexedData->IsFinished() && ReadyResults.empty(); + } + + virtual TConclusion> GetBatch() override; + virtual void PrepareResults() override; + + virtual TConclusion ReadNextInterval() override; + +private: + virtual void FillReadyResults() = 0; +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make index 2e977214ce03..0633fc216232 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make @@ -3,6 +3,8 @@ LIBRARY() SRCS( fetched_data.cpp columns_set.cpp + iterator.cpp + context.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/ya.make index b5d696e401b3..d974b7efac13 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/ya.make @@ -5,6 +5,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/reader/common_reader/iterator + ydb/core/tx/columnshard/engines/reader/common_reader/constructor ) END() diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp index 5623d1e4dc3a..8ac0322909b8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp @@ -1,11 +1,8 @@ #include "read_metadata.h" -#include #include #include #include -#include -#include namespace NKikimr::NOlap::NReader::NPlain { @@ -13,22 +10,8 @@ std::unique_ptr TReadMetadata::StartScan(const std::shared_pt return std::make_unique(readContext, readContext->GetReadMetadataPtrVerifiedAs()); } -TConclusionStatus TReadMetadata::Init( +TConclusionStatus TReadMetadata::DoInitCustom( const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) { - SetPKRangesFilter(readDescription.PKRangesFilter); - InitShardingInfo(readDescription.PathId); - TxId = readDescription.TxId; - LockId = readDescription.LockId; - if (LockId) { - owner->GetOperationsManager().RegisterLock(*LockId, owner->Generation()); - LockSharingInfo = owner->GetOperationsManager().GetLockVerified(*LockId).GetSharingInfo(); - } - - /// @note We could have column name changes between schema versions: - /// Add '1:foo', Drop '1:foo', Add '2:foo'. Drop should hide '1:foo' from reads. - /// It's expected that we have only one version on 'foo' in blob and could split them by schema {planStep:txId}. - /// So '1:foo' would be omitted in blob records for the column in new snapshots. And '2:foo' - in old ones. - /// It's not possible for blobs with several columns. There should be a special logic for them. CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription, ResultIndexSchema->GetIndexInfo().GetReplaceKey(), LockId, GetRequestSnapshot()); @@ -44,101 +27,11 @@ TConclusionStatus TReadMetadata::Init( } } - SelectInfo = dataAccessor.Select(readDescription, !!LockId); - if (LockId) { - for (auto&& i : SelectInfo->PortionsOrderedPK) { - if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) { - if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) { - } else { - auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i->GetInsertWriteIdVerified()); - AddWriteIdToCheck(i->GetInsertWriteIdVerified(), op->GetLockId()); - } - } - } - } - - StatsMode = readDescription.StatsMode; return TConclusionStatus::Success(); } -std::set TReadMetadata::GetEarlyFilterColumnIds() const { - auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - std::set result; - for (auto&& i : GetProgram().GetEarlyFilterColumns()) { - auto id = indexInfo.GetColumnIdOptional(i); - if (id) { - result.emplace(*id); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); - } - } - return result; -} - -std::set TReadMetadata::GetPKColumnIds() const { - std::set result; - auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - for (auto&& i : indexInfo.GetPrimaryKeyColumns()) { - Y_ABORT_UNLESS(result.emplace(indexInfo.GetColumnIdVerified(i.first)).second); - } - return result; -} - std::shared_ptr TReadMetadata::BuildReader(const std::shared_ptr& context) const { return std::make_shared(context); } -NArrow::NMerger::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const { - return NArrow::NMerger::TSortableBatchPosition(key.ToBatch(GetReplaceKey()), 0, GetReplaceKey()->field_names(), {}, IsDescSorted()); -} - -void TReadMetadata::DoOnReadFinished(NColumnShard::TColumnShard& owner) const { - if (!GetLockId()) { - return; - } - const ui64 lock = *GetLockId(); - if (GetBrokenWithCommitted()) { - owner.GetOperationsManager().GetLockVerified(lock).SetBroken(); - } else { - NOlap::NTxInteractions::TTxConflicts conflicts; - for (auto&& i : GetConflictableLockIds()) { - conflicts.Add(i, lock); - } - auto writer = std::make_shared(PathId, conflicts); - owner.GetOperationsManager().AddEventForLock(owner, lock, writer); - } -} - -void TReadMetadata::DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const { - if (!LockId) { - return; - } - auto evWriter = std::make_shared( - PathId, GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetConflictableLockIds()); - owner.GetOperationsManager().AddEventForLock(owner, *LockId, evWriter); -} - -void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const { - if (LockSharingInfo) { - NKikimrDataEvents::TLock lockInfo; - lockInfo.SetLockId(LockSharingInfo->GetLockId()); - lockInfo.SetGeneration(LockSharingInfo->GetGeneration()); - lockInfo.SetDataShard(tabletId); - lockInfo.SetCounter(LockSharingInfo->GetCounter()); - lockInfo.SetPathId(PathId); - lockInfo.SetHasWrites(LockSharingInfo->HasWrites()); - if (LockSharingInfo->IsBroken()) { - scanData.LocksInfo.BrokenLocks.emplace_back(std::move(lockInfo)); - } else { - scanData.LocksInfo.Locks.emplace_back(std::move(lockInfo)); - } - } -} - -bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const { - AFL_VERIFY(LockSharingInfo); - auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size()); - return it->second.GetLockId() == LockSharingInfo->GetLockId(); -} - } // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h index 34ef6496fd69..17f56ef0ff33 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h @@ -1,9 +1,5 @@ #pragma once -#include -#include -#include -#include -#include +#include namespace NKikimr::NColumnShard { class TLockSharingInfo; @@ -12,162 +8,28 @@ class TLockSharingInfo; namespace NKikimr::NOlap::NReader::NPlain { // Holds all metadata that is needed to perform read/scan -struct TReadMetadata : public TReadMetadataBase { - using TBase = TReadMetadataBase; - +class TReadMetadata: public NCommon::TReadMetadata { private: - const ui64 PathId; - std::shared_ptr BrokenWithCommitted = std::make_shared(); - std::shared_ptr LockSharingInfo; - - class TWriteIdInfo { - private: - const ui64 LockId; - std::shared_ptr Conflicts; - - public: - TWriteIdInfo(const ui64 lockId, const std::shared_ptr& counter) - : LockId(lockId) - , Conflicts(counter) { - } - - ui64 GetLockId() const { - return LockId; - } - - void MarkAsConflictable() const { - Conflicts->Inc(); - } - - bool IsConflictable() const { - return Conflicts->Val(); - } - }; - - THashMap> LockConflictCounters; - THashMap ConflictedWriteIds; - - virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override; - virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override; - virtual void DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const override; + using TBase = NCommon::TReadMetadata; + virtual TConclusionStatus DoInitCustom( + const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) override; public: using TConstPtr = std::shared_ptr; + using TBase::TBase; - bool GetBrokenWithCommitted() const { - return BrokenWithCommitted->Val(); - } - THashSet GetConflictableLockIds() const { - THashSet result; - for (auto&& i : ConflictedWriteIds) { - if (i.second.IsConflictable()) { - result.emplace(i.second.GetLockId()); - } - } - return result; - } - - bool IsLockConflictable(const ui64 lockId) const { - auto it = LockConflictCounters.find(lockId); - AFL_VERIFY(it != LockConflictCounters.end()); - return it->second->Val(); - } - - bool IsWriteConflictable(const TInsertWriteId writeId) const { - auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end()); - return it->second.IsConflictable(); - } - - void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) { - auto it = LockConflictCounters.find(lockId); - if (it == LockConflictCounters.end()) { - it = LockConflictCounters.emplace(lockId, std::make_shared()).first; - } - AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second); - } - - [[nodiscard]] bool IsMyUncommitted(const TInsertWriteId writeId) const; - - void SetConflictedWriteId(const TInsertWriteId writeId) const { - auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end()); - it->second.MarkAsConflictable(); - } - - void SetBrokenWithCommitted() const { - BrokenWithCommitted->Inc(); - } - - NArrow::NMerger::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const; - std::shared_ptr BuildReader(const std::shared_ptr& context) const; - - bool HasProcessingColumnIds() const { - return GetProgram().HasProcessingColumnIds(); - } - - ui64 GetPathId() const { - return PathId; - } - - std::shared_ptr SelectInfo; - NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; std::vector CommittedBlobs; - std::shared_ptr ReadStats; - - TReadMetadata(const ui64 pathId, const std::shared_ptr info, const TSnapshot& snapshot, const ESorting sorting, - const TProgramContainer& ssaProgram, const std::shared_ptr& scanCursor) - : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor) - , PathId(pathId) - , ReadStats(std::make_shared()) { - } - - virtual std::vector GetKeyYqlSchema() const override { - return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns(); - } - - TConclusionStatus Init(const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor); - - std::vector GetColumnsOrder() const { - auto schema = GetResultSchema(); - std::vector result; - for (auto&& i : schema->GetSchema()->fields()) { - result.emplace_back(i->name()); - } - return result; - } - - std::set GetEarlyFilterColumnIds() const; - std::set GetPKColumnIds() const; - - bool Empty() const { + virtual bool Empty() const override { Y_ABORT_UNLESS(SelectInfo); return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty(); } - size_t NumIndexedBlobs() const { - Y_ABORT_UNLESS(SelectInfo); - return SelectInfo->Stats().Blobs; - } - - std::unique_ptr StartScan(const std::shared_ptr& readContext) const override; + virtual std::shared_ptr BuildReader(const std::shared_ptr& context) const override; + virtual std::unique_ptr StartScan(const std::shared_ptr& readContext) const override; - void Dump(IOutputStream& out) const override { - out << " index blobs: " << NumIndexedBlobs() - << " committed blobs: " << CommittedBlobs.size() - // << " with program steps: " << (Program ? Program->Steps.size() : 0) - << " at snapshot: " << GetRequestSnapshot().DebugString(); - TBase::Dump(out); - if (SelectInfo) { - out << ", "; - SelectInfo->DebugStream(out); - } - } - - friend IOutputStream& operator << (IOutputStream& out, const TReadMetadata& meta) { - meta.Dump(out); - return out; + virtual TString DebugString() const override { + return TBase::DebugString() + ";committed=" + ::ToString(CommittedBlobs.size()); } }; -} +} // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make index 883f2b6b8e33..165408de6d67 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make @@ -8,6 +8,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/reader/abstract + ydb/core/tx/columnshard/engines/reader/common_reader/constructor ydb/core/kqp/compute_actor ) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index ff9d97ad7303..9c90c8da1689 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NReader::NPlain { std::unique_ptr TSpecialReadContext::BuildMerger() const { return std::make_unique( - ReadMetadata->GetReplaceKey(), ProgramInputColumns->GetSchema(), CommonContext->IsReverse(), IIndexInfo::GetSnapshotColumnNames()); + ReadMetadata->GetReplaceKey(), GetProgramInputColumns()->GetSchema(), GetCommonContext()->IsReverse(), IIndexInfo::GetSnapshotColumnNames()); } ui64 TSpecialReadContext::GetMemoryForSources(const THashMap>& sources) { @@ -31,7 +31,7 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con AskAccumulatorsScript->AddStep(size, EStageFeaturesIndexes::Accessors); } AskAccumulatorsScript->AddStep(); - AskAccumulatorsScript->AddStep(*FFColumns); + AskAccumulatorsScript->AddStep(*GetFFColumns()); } return AskAccumulatorsScript; } @@ -113,80 +113,80 @@ class TColumnsAccumulator { const bool sequential) { auto actualColumns = columns - AssemblerReadyColumns; AssemblerReadyColumns = AssemblerReadyColumns + columns; - if (!actualColumns.IsEmpty()) { - auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); - if (sequential) { - const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); - if (notSequentialColumnIds.size()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); - std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); - script.AddStep(cross, purposeId); - *actualSet = *actualSet - *cross; - } - if (!actualSet->IsEmpty()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); - script.AddStep(actualSet, purposeId); - } - } else { - script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); - script.AddStep(actualSet, purposeId); + if (actualColumns.IsEmpty()) { + return false; + } + auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); + if (sequential) { + const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); + if (notSequentialColumnIds.size()) { + script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); + std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); + script.AddStep(cross, purposeId); + *actualSet = *actualSet - *cross; } - return true; + if (!actualSet->IsEmpty()) { + script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); + script.AddStep(actualSet, purposeId); + } + } else { + script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); + script.AddStep(actualSet, purposeId); } - return false; + return true; } }; std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { std::shared_ptr result = std::make_shared(*this); - const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount(); + const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - TColumnsAccumulator acc(MergeColumns, ReadMetadata->GetResultSchema()); + TColumnsAccumulator acc(GetMergeColumns(), ReadMetadata->GetResultSchema()); if (!!IndexChecker && useIndexes && exclusiveSource) { result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); result->AddStep(std::make_shared(IndexChecker)); } bool hasFilterSharding = false; - if (needFilterSharding && !ShardingColumns->IsEmpty()) { + if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { hasFilterSharding = true; - TColumnsSetIds columnsFetch = *ShardingColumns; + TColumnsSetIds columnsFetch = *GetShardingColumns(); if (!exclusiveSource) { - columnsFetch = columnsFetch + *PKColumns + *SpecColumns; + columnsFetch = columnsFetch + *GetPKColumns() + *GetSpecColumns(); } acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } - if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) { + if (!GetEFColumns()->GetColumnsCount() && !partialUsageByPredicate) { result->SetBranchName("simple"); - TColumnsSetIds columnsFetch = *FFColumns; + TColumnsSetIds columnsFetch = *GetFFColumns(); if (needFilterDeletion) { - columnsFetch = columnsFetch + *DeletionColumns; + columnsFetch = columnsFetch + *GetDeletionColumns(); } if (needSnapshots) { - columnsFetch = columnsFetch + *SpecColumns; + columnsFetch = columnsFetch + *GetSpecColumns(); } if (!exclusiveSource) { - columnsFetch = columnsFetch + *MergeColumns; + columnsFetch = columnsFetch + *GetMergeColumns(); } else { - if (columnsFetch.GetColumnsCount() == 1 && SpecColumns->Contains(columnsFetch) && !hasFilterSharding) { + if (columnsFetch.GetColumnsCount() == 1 && GetSpecColumns()->Contains(columnsFetch) && !hasFilterSharding) { return nullptr; } } if (columnsFetch.GetColumnsCount() || hasFilterSharding || needFilterDeletion) { acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Fetching); if (needSnapshots) { - acc.AddAssembleStep(*result, *SpecColumns, "SPEC", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false); } if (!exclusiveSource) { - acc.AddAssembleStep(*result, *MergeColumns, "LAST_PK", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*result, *GetMergeColumns(), "LAST_PK", EStageFeaturesIndexes::Fetching, false); } if (needSnapshots) { result->AddStep(std::make_shared()); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *DeletionColumns, "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false); result->AddStep(std::make_shared()); } acc.AddAssembleStep(*result, columnsFetch, "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); @@ -195,30 +195,30 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c } } else if (exclusiveSource) { result->SetBranchName("exclusive"); - TColumnsSet columnsFetch = *EFColumns; + TColumnsSet columnsFetch = *GetEFColumns(); if (needFilterDeletion) { - columnsFetch = columnsFetch + *DeletionColumns; + columnsFetch = columnsFetch + *GetDeletionColumns(); } - if (needSnapshots || FFColumns->Cross(*SpecColumns)) { - columnsFetch = columnsFetch + *SpecColumns; + if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { + columnsFetch = columnsFetch + *GetSpecColumns(); } if (partialUsageByPredicate) { - columnsFetch = columnsFetch + *PredicateColumns; + columnsFetch = columnsFetch + *GetPredicateColumns(); } AFL_VERIFY(columnsFetch.GetColumnsCount()); acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); if (needFilterDeletion) { - acc.AddAssembleStep(*result, *DeletionColumns, "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*result, *PredicateColumns, "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*result, *GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } - if (needSnapshots || FFColumns->Cross(*SpecColumns)) { - acc.AddAssembleStep(*result, *SpecColumns, "SPEC", EStageFeaturesIndexes::Filter, false); + if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { + acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { @@ -235,24 +235,24 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c if (GetReadMetadata()->Limit) { result->AddStep(std::make_shared(GetReadMetadata()->Limit, GetReadMetadata()->IsDescSorted())); } - acc.AddFetchingStep(*result, *FFColumns, EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*result, *FFColumns, "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } else { result->SetBranchName("merge"); - TColumnsSet columnsFetch = *MergeColumns + *EFColumns; + TColumnsSet columnsFetch = *GetMergeColumns() + *GetEFColumns(); if (needFilterDeletion) { - columnsFetch = columnsFetch + *DeletionColumns; + columnsFetch = columnsFetch + *GetDeletionColumns(); } AFL_VERIFY(columnsFetch.GetColumnsCount()); acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(*result, *SpecColumns, "SPEC", EStageFeaturesIndexes::Filter, false); - acc.AddAssembleStep(*result, *PKColumns, "PK", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*result, *GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false); if (needSnapshots) { result->AddStep(std::make_shared()); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *DeletionColumns, "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } if (partialUsageByPredicate) { @@ -269,114 +269,15 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c break; } } - acc.AddFetchingStep(*result, *FFColumns, EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*result, *FFColumns, "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } return result; } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) - : CommonContext(commonContext) { - ReadMetadata = dynamic_pointer_cast(CommonContext->GetReadMetadata()); - Y_ABORT_UNLESS(ReadMetadata); - Y_ABORT_UNLESS(ReadMetadata->SelectInfo); - - double kffAccessors = 0.01; - double kffFilter = 0.45; - double kffFetching = 0.45; - double kffMerge = 0.10; - TString stagePrefix; - if (ReadMetadata->GetEarlyFilterColumnIds().size()) { - stagePrefix = "EF"; - kffFilter = 0.7; - kffFetching = 0.15; - kffMerge = 0.14; - kffAccessors = 0.01; - } else { - stagePrefix = "FO"; - kffFilter = 0.1; - kffFetching = 0.75; - kffMerge = 0.14; - kffAccessors = 0.01; - } - - std::vector> stages = { - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( - stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit), - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( - stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit), - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( - stagePrefix + "::FETCHING", kffFetching * TGlobalLimits::ScanMemoryLimit), - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit) - }; - ProcessMemoryGuard = - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages); - ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard( - CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); - - auto readSchema = ReadMetadata->GetResultSchema(); - SpecColumns = std::make_shared(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); - IndexChecker = ReadMetadata->GetProgram().GetIndexChecker(); - { - auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo()); - if (predicateColumns.size()) { - PredicateColumns = std::make_shared(predicateColumns, readSchema); - } else { - PredicateColumns = std::make_shared(); - } - } - { - std::set columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX }; - DeletionColumns = std::make_shared(columnIds, ReadMetadata->GetResultSchema()); - } - - if (!!ReadMetadata->GetRequestShardingInfo()) { - auto shardingColumnIds = - ReadMetadata->GetIndexInfo().GetColumnIdsVerified(ReadMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames()); - ShardingColumns = std::make_shared(shardingColumnIds, ReadMetadata->GetResultSchema()); - } else { - ShardingColumns = std::make_shared(); - } - { - auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); - if (efColumns.size()) { - EFColumns = std::make_shared(efColumns, readSchema); - } else { - EFColumns = std::make_shared(); - } - } - if (ReadMetadata->HasProcessingColumnIds()) { - FFColumns = std::make_shared(ReadMetadata->GetProcessingColumnIds(), readSchema); - if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) { - FFColumns = std::make_shared(*EFColumns + *SpecColumns); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString()); - } else { - AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_first", FFColumns->DebugString()); - } - } else { - FFColumns = EFColumns; - } - if (FFColumns->IsEmpty()) { - ProgramInputColumns = SpecColumns; - } else { - ProgramInputColumns = FFColumns; - } - - PKColumns = std::make_shared(ReadMetadata->GetPKColumnIds(), readSchema); - MergeColumns = std::make_shared(*PKColumns + *SpecColumns); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); -} - -TString TSpecialReadContext::DebugString() const { - TStringBuilder sb; - sb << "ef=" << EFColumns->DebugString() << ";" - << "sharding=" << ShardingColumns->DebugString() << ";" - << "pk=" << PKColumns->DebugString() << ";" - << "ff=" << FFColumns->DebugString() << ";" - << "program_input=" << ProgramInputColumns->DebugString() << ";"; - return sb; + : TBase(commonContext) { + ReadMetadata = GetCommonContext()->GetReadMetadataPtrVerifiedAs(); } TString TSpecialReadContext::ProfileDebugString() const { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h index 8c0fc73bbcd4..4c34ef572ef4 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include @@ -16,30 +16,14 @@ using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; -class TSpecialReadContext { +class TSpecialReadContext: public NCommon::TSpecialReadContext { private: - YDB_READONLY_DEF(std::shared_ptr, CommonContext); - YDB_READONLY_DEF(std::shared_ptr, ProcessMemoryGuard); - YDB_READONLY_DEF(std::shared_ptr, ProcessScopeGuard); - - YDB_READONLY_DEF(std::shared_ptr, SpecColumns); - YDB_READONLY_DEF(std::shared_ptr, MergeColumns); - YDB_READONLY_DEF(std::shared_ptr, ShardingColumns); - YDB_READONLY_DEF(std::shared_ptr, DeletionColumns); - YDB_READONLY_DEF(std::shared_ptr, EFColumns); - YDB_READONLY_DEF(std::shared_ptr, PredicateColumns); - YDB_READONLY_DEF(std::shared_ptr, PKColumns); - YDB_READONLY_DEF(std::shared_ptr, FFColumns); - YDB_READONLY_DEF(std::shared_ptr, ProgramInputColumns); - + using TBase = NCommon::TSpecialReadContext; YDB_READONLY_DEF(std::shared_ptr, MergeStageMemory); YDB_READONLY_DEF(std::shared_ptr, FilterStageMemory); YDB_READONLY_DEF(std::shared_ptr, FetchingStageMemory); - TAtomic AbortFlag = 0; - NIndexes::TIndexCheckerContainer IndexChecker; TReadMetadata::TConstPtr ReadMetadata; - std::shared_ptr EmptyColumns = std::make_shared(); std::shared_ptr BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const; TMutex Mutex; @@ -52,10 +36,6 @@ class TSpecialReadContext { const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit(); const ui64 ReadSequentiallyBufferSize = TGlobalLimits::DefaultReadSequentiallyBufferSize; - ui64 GetProcessMemoryControlId() const { - AFL_VERIFY(ProcessMemoryGuard); - return ProcessMemoryGuard->GetProcessId(); - } ui64 GetMemoryForSources(const THashMap>& sources); ui64 GetRequestedMemoryBytes() const { return MergeStageMemory->GetFullMemory() + FilterStageMemory->GetFullMemory() + FetchingStageMemory->GetFullMemory(); @@ -65,22 +45,8 @@ class TSpecialReadContext { return ReadMetadata; } - bool IsAborted() const { - return AtomicGet(AbortFlag); - } - - void Abort() { - AtomicSet(AbortFlag, 1); - } - - ~TSpecialReadContext() { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("profile", ProfileDebugString()); - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("fetching", DebugString()); - } - std::unique_ptr BuildMerger() const; - TString DebugString() const; TString ProfileDebugString() const; TSpecialReadContext(const std::shared_ptr& commonContext); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp index f705deb4501c..6226df5c35f2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp @@ -2,32 +2,6 @@ namespace NKikimr::NOlap::NReader::NPlain { -TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr& context, const TReadMetadata::TConstPtr& readMetadata) - : Context(context) - , ReadMetadata(readMetadata) - , ReadyResults(context->GetCounters()) -{ - IndexedData = readMetadata->BuildReader(Context); - Y_ABORT_UNLESS(Context->GetReadMetadata()->IsSorted()); -} - -TConclusion> TColumnShardScanIterator::GetBatch() { - FillReadyResults(); - return ReadyResults.pop_front(); -} - -void TColumnShardScanIterator::PrepareResults() { - FillReadyResults(); -} - -TConclusion TColumnShardScanIterator::ReadNextInterval() { - return IndexedData->ReadNextInterval(); -} - -void TColumnShardScanIterator::DoOnSentDataFromInterval(const ui32 intervalIdx) const { - return IndexedData->OnSentDataFromInterval(intervalIdx); -} - void TColumnShardScanIterator::FillReadyResults() { auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch); i64 limitLeft = Context->GetReadMetadata()->Limit == 0 ? INT64_MAX : Context->GetReadMetadata()->Limit - ItemsRead; @@ -41,22 +15,10 @@ void TColumnShardScanIterator::FillReadyResults() { } if (limitLeft == 0) { - AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "limit_reached_on_scan")("limit", Context->GetReadMetadata()->Limit)("ready", ItemsRead); + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "limit_reached_on_scan")("limit", Context->GetReadMetadata()->Limit)( + "ready", ItemsRead); IndexedData->Abort("records count limit exhausted"); } } -TColumnShardScanIterator::~TColumnShardScanIterator() { - if (!IndexedData->IsFinished()) { - IndexedData->Abort("iterator destructor"); - } - ReadMetadata->ReadStats->PrintToLog(); -} - -void TColumnShardScanIterator::Apply(const std::shared_ptr& task) { - if (!IndexedData->IsFinished()) { - Y_ABORT_UNLESS(task->Apply(*IndexedData)); - } -} - -} +} // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h index 38b1fcc29882..eef490520499 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h @@ -1,104 +1,15 @@ #pragma once -#include -#include -#include -#include +#include namespace NKikimr::NOlap::NReader::NPlain { -class TReadyResults { +class TColumnShardScanIterator: public NCommon::TColumnShardScanIterator { private: - const NColumnShard::TConcreteScanCounters Counters; - std::deque> Data; - i64 RecordsCount = 0; -public: - TString DebugString() const { - TStringBuilder sb; - sb - << "count:" << Data.size() << ";" - << "records_count:" << RecordsCount << ";" - ; - if (Data.size()) { - sb << "schema=" << Data.front()->GetResultBatch().schema()->ToString() << ";"; - } - return sb; - } - TReadyResults(const NColumnShard::TConcreteScanCounters& counters) - : Counters(counters) - { - - } - const std::shared_ptr& emplace_back(std::shared_ptr&& v) { - AFL_VERIFY(!!v); - RecordsCount += v->GetResultBatch().num_rows(); - Data.emplace_back(std::move(v)); - return Data.back(); - } - std::shared_ptr pop_front() { - if (Data.empty()) { - return {}; - } - auto result = std::move(Data.front()); - AFL_VERIFY(RecordsCount >= result->GetResultBatch().num_rows()); - RecordsCount -= result->GetResultBatch().num_rows(); - Data.pop_front(); - return result; - } - bool empty() const { - return Data.empty(); - } - size_t size() const { - return Data.size(); - } -}; - -class TColumnShardScanIterator: public TScanIteratorBase { -private: - std::shared_ptr Context; - const TReadMetadata::TConstPtr ReadMetadata; - TReadyResults ReadyResults; - std::shared_ptr IndexedData; - ui64 ItemsRead = 0; - const i64 MaxRowsInBatch = 5000; - virtual void DoOnSentDataFromInterval(const ui32 intervalIdx) const override; + using TBase = NCommon::TColumnShardScanIterator; + virtual void FillReadyResults() override; public: - TColumnShardScanIterator(const std::shared_ptr& context, const TReadMetadata::TConstPtr& readMetadata); - ~TColumnShardScanIterator(); - - virtual TConclusionStatus Start() override { - AFL_VERIFY(IndexedData); - return IndexedData->Start(); - } - - virtual std::optional GetAvailableResultsCount() const override { - return ReadyResults.size(); - } - - virtual const TReadStats& GetStats() const override { - return *ReadMetadata->ReadStats; - } - - virtual TString DebugString(const bool verbose) const override { - return TStringBuilder() - << "ready_results:(" << ReadyResults.DebugString() << ");" - << "indexed_data:(" << IndexedData->DebugString(verbose) << ")" - ; - } - - virtual void Apply(const std::shared_ptr& task) override; - - bool Finished() const override { - return IndexedData->IsFinished() && ReadyResults.empty(); - } - - virtual TConclusion> GetBatch() override; - virtual void PrepareResults() override; - - virtual TConclusion ReadNextInterval() override; - -private: - void FillReadyResults(); + using TBase::TBase; }; -} +} // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp index d57492b742c8..646e58c1857d 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp @@ -4,8 +4,6 @@ #include #include #include -#include -#include namespace NKikimr::NOlap::NReader::NSimple { @@ -13,112 +11,13 @@ std::unique_ptr TReadMetadata::StartScan(const std::shared_pt return std::make_unique(readContext, readContext->GetReadMetadataPtrVerifiedAs()); } -TConclusionStatus TReadMetadata::Init( +TConclusionStatus TReadMetadata::DoInitCustom( const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) { - SetPKRangesFilter(readDescription.PKRangesFilter); - InitShardingInfo(readDescription.PathId); - TxId = readDescription.TxId; - LockId = readDescription.LockId; - if (LockId) { - owner->GetOperationsManager().RegisterLock(*LockId, owner->Generation()); - LockSharingInfo = owner->GetOperationsManager().GetLockVerified(*LockId).GetSharingInfo(); - } - - SelectInfo = dataAccessor.Select(readDescription, !!LockId); - if (LockId) { - for (auto&& i : SelectInfo->PortionsOrderedPK) { - if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) { - if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) { - } else { - auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i->GetInsertWriteIdVerified()); - AddWriteIdToCheck(i->GetInsertWriteIdVerified(), op->GetLockId()); - } - } - } - } - - StatsMode = readDescription.StatsMode; return TConclusionStatus::Success(); } -std::set TReadMetadata::GetEarlyFilterColumnIds() const { - auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - std::set result; - for (auto&& i : GetProgram().GetEarlyFilterColumns()) { - auto id = indexInfo.GetColumnIdOptional(i); - if (id) { - result.emplace(*id); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); - } - } - return result; -} - -std::set TReadMetadata::GetPKColumnIds() const { - std::set result; - auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - for (auto&& i : indexInfo.GetPrimaryKeyColumns()) { - Y_ABORT_UNLESS(result.emplace(indexInfo.GetColumnIdVerified(i.first)).second); - } - return result; -} - std::shared_ptr TReadMetadata::BuildReader(const std::shared_ptr& context) const { return std::make_shared(context); } -NArrow::NMerger::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const { - return NArrow::NMerger::TSortableBatchPosition(key.ToBatch(GetReplaceKey()), 0, GetReplaceKey()->field_names(), {}, IsDescSorted()); -} - -void TReadMetadata::DoOnReadFinished(NColumnShard::TColumnShard& owner) const { - if (!GetLockId()) { - return; - } - const ui64 lock = *GetLockId(); - if (GetBrokenWithCommitted()) { - owner.GetOperationsManager().GetLockVerified(lock).SetBroken(); - } else { - NOlap::NTxInteractions::TTxConflicts conflicts; - for (auto&& i : GetConflictableLockIds()) { - conflicts.Add(i, lock); - } - auto writer = std::make_shared(PathId, conflicts); - owner.GetOperationsManager().AddEventForLock(owner, lock, writer); - } -} - -void TReadMetadata::DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const { - if (!LockId) { - return; - } - auto evWriter = std::make_shared( - PathId, GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetConflictableLockIds()); - owner.GetOperationsManager().AddEventForLock(owner, *LockId, evWriter); -} - -void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const { - if (LockSharingInfo) { - NKikimrDataEvents::TLock lockInfo; - lockInfo.SetLockId(LockSharingInfo->GetLockId()); - lockInfo.SetGeneration(LockSharingInfo->GetGeneration()); - lockInfo.SetDataShard(tabletId); - lockInfo.SetCounter(LockSharingInfo->GetCounter()); - lockInfo.SetPathId(PathId); - lockInfo.SetHasWrites(LockSharingInfo->HasWrites()); - if (LockSharingInfo->IsBroken()) { - scanData.LocksInfo.BrokenLocks.emplace_back(std::move(lockInfo)); - } else { - scanData.LocksInfo.Locks.emplace_back(std::move(lockInfo)); - } - } -} - -bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const { - AFL_VERIFY(LockSharingInfo); - auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size()); - return it->second.GetLockId() == LockSharingInfo->GetLockId(); -} - } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h index f894284dfd94..eb1e302ca21e 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h @@ -1,172 +1,25 @@ #pragma once -#include -#include -#include -#include -#include - -namespace NKikimr::NColumnShard { -class TLockSharingInfo; -} +#include namespace NKikimr::NOlap::NReader::NSimple { -// Holds all metadata that is needed to perform read/scan -struct TReadMetadata : public TReadMetadataBase { - using TBase = TReadMetadataBase; - -private: - const ui64 PathId; - std::shared_ptr BrokenWithCommitted = std::make_shared(); - std::shared_ptr LockSharingInfo; - - class TWriteIdInfo { - private: - const ui64 LockId; - std::shared_ptr Conflicts; - - public: - TWriteIdInfo(const ui64 lockId, const std::shared_ptr& counter) - : LockId(lockId) - , Conflicts(counter) { - } - - ui64 GetLockId() const { - return LockId; - } - - void MarkAsConflictable() const { - Conflicts->Inc(); - } - - bool IsConflictable() const { - return Conflicts->Val(); - } - }; - - THashMap> LockConflictCounters; - THashMap ConflictedWriteIds; - - virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override; - virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override; - virtual void DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const override; +class TReadMetadata: public NCommon::TReadMetadata { + using TBase = NCommon::TReadMetadata; public: using TConstPtr = std::shared_ptr; + using TBase::TBase; - bool GetBrokenWithCommitted() const { - return BrokenWithCommitted->Val(); - } - THashSet GetConflictableLockIds() const { - THashSet result; - for (auto&& i : ConflictedWriteIds) { - if (i.second.IsConflictable()) { - result.emplace(i.second.GetLockId()); - } - } - return result; - } - - bool IsLockConflictable(const ui64 lockId) const { - auto it = LockConflictCounters.find(lockId); - AFL_VERIFY(it != LockConflictCounters.end()); - return it->second->Val(); - } - - bool IsWriteConflictable(const TInsertWriteId writeId) const { - auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end()); - return it->second.IsConflictable(); - } - - void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) { - auto it = LockConflictCounters.find(lockId); - if (it == LockConflictCounters.end()) { - it = LockConflictCounters.emplace(lockId, std::make_shared()).first; - } - AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second); - } - - [[nodiscard]] bool IsMyUncommitted(const TInsertWriteId writeId) const; - - void SetConflictedWriteId(const TInsertWriteId writeId) const { - auto it = ConflictedWriteIds.find(writeId); - AFL_VERIFY(it != ConflictedWriteIds.end()); - it->second.MarkAsConflictable(); - } - - void SetBrokenWithCommitted() const { - BrokenWithCommitted->Inc(); - } - - NArrow::NMerger::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const; - std::shared_ptr BuildReader(const std::shared_ptr& context) const; - - bool HasProcessingColumnIds() const { - return GetProgram().HasProcessingColumnIds(); - } - - ui64 GetPathId() const { - return PathId; - } - - std::shared_ptr SelectInfo; - NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; - std::shared_ptr ReadStats; - - TReadMetadata(const ui64 pathId, const std::shared_ptr info, const TSnapshot& snapshot, const ESorting sorting, - const TProgramContainer& ssaProgram, const std::shared_ptr& scanCursor) - : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor) - , PathId(pathId) - , ReadStats(std::make_shared()) - { - } - - virtual std::vector GetKeyYqlSchema() const override { - return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns(); - } - - TConclusionStatus Init(const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor); - - std::vector GetColumnsOrder() const { - auto schema = GetResultSchema(); - std::vector result; - for (auto&& i : schema->GetSchema()->fields()) { - result.emplace_back(i->name()); - } - return result; - } - - std::set GetEarlyFilterColumnIds() const; - std::set GetPKColumnIds() const; - - bool Empty() const { + virtual bool Empty() const override { Y_ABORT_UNLESS(SelectInfo); return SelectInfo->PortionsOrderedPK.empty(); } - size_t NumIndexedBlobs() const { - Y_ABORT_UNLESS(SelectInfo); - return SelectInfo->Stats().Blobs; - } - - std::unique_ptr StartScan(const std::shared_ptr& readContext) const override; + virtual std::shared_ptr BuildReader(const std::shared_ptr& context) const override; + virtual TConclusionStatus DoInitCustom( + const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) override; - void Dump(IOutputStream& out) const override { - out << " index blobs: " << NumIndexedBlobs() - // << " with program steps: " << (Program ? Program->Steps.size() : 0) - << " at snapshot: " << GetRequestSnapshot().DebugString(); - TBase::Dump(out); - if (SelectInfo) { - out << ", "; - SelectInfo->DebugStream(out); - } - } - - friend IOutputStream& operator << (IOutputStream& out, const TReadMetadata& meta) { - meta.Dump(out); - return out; - } + virtual std::unique_ptr StartScan(const std::shared_ptr& readContext) const override; }; -} +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make index 883f2b6b8e33..165408de6d67 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make @@ -8,6 +8,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/reader/abstract + ydb/core/tx/columnshard/engines/reader/common_reader/constructor ydb/core/kqp/compute_actor ) diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index 5908800255fd..d2509215cca1 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -5,15 +5,10 @@ namespace NKikimr::NOlap::NReader::NSimple { -std::unique_ptr TSpecialReadContext::BuildMerger() const { - return std::make_unique( - ReadMetadata->GetReplaceKey(), ProgramInputColumns->GetSchema(), CommonContext->IsReverse(), IIndexInfo::GetSnapshotColumnNames()); -} - std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr& source) { const bool needSnapshots = ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax(); - if (!needSnapshots && FFColumns->GetColumnIds().size() == 1 && - FFColumns->GetColumnIds().contains(NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX)) { + if (!needSnapshots && GetFFColumns()->GetColumnIds().size() == 1 && + GetFFColumns()->GetColumnIds().contains(NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX)) { std::shared_ptr result = std::make_shared(*this); source->SetSourceInMemory(true); result->SetBranchName("FAKE"); @@ -25,9 +20,9 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con if (!AskAccumulatorsScript) { AskAccumulatorsScript = std::make_shared(*this); AskAccumulatorsScript->AddStep( - source->PredictAccessorsSize(FFColumns->GetColumnIds()), EStageFeaturesIndexes::Accessors); + source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors); AskAccumulatorsScript->AddStep(); - AskAccumulatorsScript->AddStep(*FFColumns); + AskAccumulatorsScript->AddStep(*GetFFColumns()); } return AskAccumulatorsScript; } @@ -127,30 +122,30 @@ class TColumnsAccumulator { std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { std::shared_ptr result = std::make_shared(*this); - const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount(); + const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - TColumnsAccumulator acc(MergeColumns, ReadMetadata->GetResultSchema()); + TColumnsAccumulator acc(GetMergeColumns(), ReadMetadata->GetResultSchema()); if (!!IndexChecker && useIndexes) { result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); result->AddStep(std::make_shared(IndexChecker)); } - if (needFilterSharding && !ShardingColumns->IsEmpty()) { - const TColumnsSetIds columnsFetch = *ShardingColumns; + if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { + const TColumnsSetIds columnsFetch = *GetShardingColumns(); acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } { result->SetBranchName("exclusive"); - TColumnsSet columnsFetch = *EFColumns; + TColumnsSet columnsFetch = *GetEFColumns(); if (needFilterDeletion) { - columnsFetch = columnsFetch + *DeletionColumns; + columnsFetch = columnsFetch + *GetDeletionColumns(); } - if (needSnapshots || FFColumns->Cross(*SpecColumns)) { - columnsFetch = columnsFetch + *SpecColumns; + if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { + columnsFetch = columnsFetch + *GetSpecColumns(); } if (partialUsageByPredicate) { - columnsFetch = columnsFetch + *PredicateColumns; + columnsFetch = columnsFetch + *GetPredicateColumns(); } if (columnsFetch.GetColumnsCount()) { @@ -158,15 +153,15 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *DeletionColumns, "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*result, *PredicateColumns, "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*result, *GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } - if (needSnapshots || FFColumns->Cross(*SpecColumns)) { - acc.AddAssembleStep(*result, *SpecColumns, "SPEC", EStageFeaturesIndexes::Filter, false); + if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { + acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); result->AddStep(std::make_shared()); } for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { @@ -183,116 +178,16 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c if (GetReadMetadata()->Limit) { result->AddStep(std::make_shared(GetReadMetadata()->Limit, GetReadMetadata()->IsDescSorted())); } - acc.AddFetchingStep(*result, *FFColumns, EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*result, *FFColumns, "LAST", EStageFeaturesIndexes::Fetching, false); + acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, false); } result->AddStep(); return result; } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) - : CommonContext(commonContext) { - ReadMetadata = dynamic_pointer_cast(CommonContext->GetReadMetadata()); - Y_ABORT_UNLESS(ReadMetadata); - Y_ABORT_UNLESS(ReadMetadata->SelectInfo); - - double kffAccessors = 0.01; - double kffFilter = 0.45; - double kffFetching = 0.45; - double kffMerge = 0.10; - TString stagePrefix; - if (ReadMetadata->GetEarlyFilterColumnIds().size()) { - stagePrefix = "EF"; - kffFilter = 0.7; - kffFetching = 0.15; - kffMerge = 0.14; - kffAccessors = 0.01; - } else { - stagePrefix = "FO"; - kffFilter = 0.1; - kffFetching = 0.75; - kffMerge = 0.14; - kffAccessors = 0.01; - } - - std::vector> stages = { - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( - stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit), - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( - stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit), - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( - stagePrefix + "::FETCHING", kffFetching * TGlobalLimits::ScanMemoryLimit), - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit) - }; - ProcessMemoryGuard = - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages); - ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard( - CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); - - auto readSchema = ReadMetadata->GetResultSchema(); - SpecColumns = std::make_shared(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); - IndexChecker = ReadMetadata->GetProgram().GetIndexChecker(); - { - auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo()); - if (predicateColumns.size()) { - PredicateColumns = std::make_shared(predicateColumns, readSchema); - } else { - PredicateColumns = std::make_shared(); - } - } - { - std::set columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX }; - DeletionColumns = std::make_shared(columnIds, ReadMetadata->GetResultSchema()); - } - - if (!!ReadMetadata->GetRequestShardingInfo()) { - auto shardingColumnIds = - ReadMetadata->GetIndexInfo().GetColumnIdsVerified(ReadMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames()); - ShardingColumns = std::make_shared(shardingColumnIds, ReadMetadata->GetResultSchema()); - } else { - ShardingColumns = std::make_shared(); - } - { - auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); - if (efColumns.size()) { - EFColumns = std::make_shared(efColumns, readSchema); - } else { - EFColumns = std::make_shared(); - } - } - if (ReadMetadata->HasProcessingColumnIds()) { - FFColumns = std::make_shared(ReadMetadata->GetProcessingColumnIds(), readSchema); - if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) { - FFColumns = std::make_shared(*EFColumns + *SpecColumns); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString()); - } else { - AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_first", FFColumns->DebugString()); - } - } else { - FFColumns = EFColumns; - } - if (FFColumns->IsEmpty()) { - ProgramInputColumns = SpecColumns; - } else { - ProgramInputColumns = FFColumns; - } - AllUsageColumns = std::make_shared(*FFColumns + *PredicateColumns); - - PKColumns = std::make_shared(ReadMetadata->GetPKColumnIds(), readSchema); - MergeColumns = std::make_shared(*PKColumns + *SpecColumns); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); -} - -TString TSpecialReadContext::DebugString() const { - TStringBuilder sb; - sb << "ef=" << EFColumns->DebugString() << ";" - << "sharding=" << ShardingColumns->DebugString() << ";" - << "pk=" << PKColumns->DebugString() << ";" - << "ff=" << FFColumns->DebugString() << ";" - << "program_input=" << ProgramInputColumns->DebugString() << ";"; - return sb; + : TBase(commonContext) { + ReadMetadata = GetCommonContext()->GetReadMetadataPtrVerifiedAs(); } TString TSpecialReadContext::ProfileDebugString() const { diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h index 4159f0da79f6..5fdb245d9b77 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include @@ -16,31 +16,10 @@ using EStageFeaturesIndexes = NCommon::EStageFeaturesIndexes; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; -class TSpecialReadContext { +class TSpecialReadContext: public NCommon::TSpecialReadContext { private: - YDB_READONLY_DEF(std::shared_ptr, CommonContext); - YDB_READONLY_DEF(std::shared_ptr, ProcessMemoryGuard); - YDB_READONLY_DEF(std::shared_ptr, ProcessScopeGuard); - - YDB_READONLY_DEF(std::shared_ptr, SpecColumns); - YDB_READONLY_DEF(std::shared_ptr, MergeColumns); - YDB_READONLY_DEF(std::shared_ptr, ShardingColumns); - YDB_READONLY_DEF(std::shared_ptr, DeletionColumns); - YDB_READONLY_DEF(std::shared_ptr, EFColumns); - YDB_READONLY_DEF(std::shared_ptr, PredicateColumns); - YDB_READONLY_DEF(std::shared_ptr, PKColumns); - YDB_READONLY_DEF(std::shared_ptr, AllUsageColumns); - YDB_READONLY_DEF(std::shared_ptr, FFColumns); - YDB_READONLY_DEF(std::shared_ptr, ProgramInputColumns); - - YDB_READONLY_DEF(std::shared_ptr, MergeStageMemory); - YDB_READONLY_DEF(std::shared_ptr, FilterStageMemory); - YDB_READONLY_DEF(std::shared_ptr, FetchingStageMemory); - - TAtomic AbortFlag = 0; - NIndexes::TIndexCheckerContainer IndexChecker; + using TBase = NCommon::TSpecialReadContext; TReadMetadata::TConstPtr ReadMetadata; - std::shared_ptr EmptyColumns = std::make_shared(); std::shared_ptr BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const; TMutex Mutex; @@ -49,39 +28,11 @@ class TSpecialReadContext { std::shared_ptr AskAccumulatorsScript; public: - const ui64 ReduceMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetReduceMemoryIntervalLimit(); - const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit(); - const ui64 ReadSequentiallyBufferSize = TGlobalLimits::DefaultReadSequentiallyBufferSize; - - ui64 GetProcessMemoryControlId() const { - AFL_VERIFY(ProcessMemoryGuard); - return ProcessMemoryGuard->GetProcessId(); - } - ui64 GetRequestedMemoryBytes() const { - return MergeStageMemory->GetFullMemory() + FilterStageMemory->GetFullMemory() + FetchingStageMemory->GetFullMemory(); - } - const TReadMetadata::TConstPtr& GetReadMetadata() const { return ReadMetadata; } - bool IsAborted() const { - return AtomicGet(AbortFlag); - } - - void Abort() { - AtomicSet(AbortFlag, 1); - } - - ~TSpecialReadContext() { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("profile", ProfileDebugString()); - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("fetching", DebugString()); - } - - std::unique_ptr BuildMerger() const; - - TString DebugString() const; - TString ProfileDebugString() const; + virtual TString ProfileDebugString() const override; TSpecialReadContext(const std::shared_ptr& commonContext); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp index 39c548800d27..70f75d623451 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp @@ -4,31 +4,6 @@ namespace NKikimr::NOlap::NReader::NSimple { -TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr& context, const TReadMetadata::TConstPtr& readMetadata) - : Context(context) - , ReadMetadata(readMetadata) - , ReadyResults(context->GetCounters()) { - IndexedData = readMetadata->BuildReader(Context); - Y_ABORT_UNLESS(Context->GetReadMetadata()->IsSorted()); -} - -TConclusion> TColumnShardScanIterator::GetBatch() { - FillReadyResults(); - return ReadyResults.pop_front(); -} - -void TColumnShardScanIterator::PrepareResults() { - FillReadyResults(); -} - -TConclusion TColumnShardScanIterator::ReadNextInterval() { - return IndexedData->ReadNextInterval(); -} - -void TColumnShardScanIterator::DoOnSentDataFromInterval(const ui32 intervalIdx) const { - return IndexedData->OnSentDataFromInterval(intervalIdx); -} - void TColumnShardScanIterator::FillReadyResults() { auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch); const i64 limitLeft = Context->GetReadMetadata()->Limit == 0 ? INT64_MAX : Context->GetReadMetadata()->Limit; @@ -39,21 +14,4 @@ void TColumnShardScanIterator::FillReadyResults() { } } -TColumnShardScanIterator::~TColumnShardScanIterator() { - if (!IndexedData->IsFinished()) { - IndexedData->Abort("iterator destructor"); - } - ReadMetadata->ReadStats->PrintToLog(); -} - -void TColumnShardScanIterator::Apply(const std::shared_ptr& task) { - if (!IndexedData->IsFinished()) { - Y_ABORT_UNLESS(task->Apply(*IndexedData)); - } -} - -const TReadStats& TColumnShardScanIterator::GetStats() const { - return *ReadMetadata->ReadStats; -} - } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h index 46d34944f20d..5e92b150c4dc 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h @@ -1,103 +1,15 @@ #pragma once -#include -#include -#include +#include namespace NKikimr::NOlap::NReader::NSimple { -struct TReadMetadata; - -class TReadyResults { -private: - const NColumnShard::TConcreteScanCounters Counters; - std::deque> Data; - i64 RecordsCount = 0; -public: - TString DebugString() const { - TStringBuilder sb; - sb - << "count:" << Data.size() << ";" - << "records_count:" << RecordsCount << ";" - ; - if (Data.size()) { - sb << "schema=" << Data.front()->GetResultBatch().schema()->ToString() << ";"; - } - return sb; - } - TReadyResults(const NColumnShard::TConcreteScanCounters& counters) - : Counters(counters) - { - - } - const std::shared_ptr& emplace_back(std::shared_ptr&& v) { - AFL_VERIFY(!!v); - RecordsCount += v->GetResultBatch().num_rows(); - Data.emplace_back(std::move(v)); - return Data.back(); - } - std::shared_ptr pop_front() { - if (Data.empty()) { - return {}; - } - auto result = std::move(Data.front()); - AFL_VERIFY(RecordsCount >= result->GetResultBatch().num_rows()); - RecordsCount -= result->GetResultBatch().num_rows(); - Data.pop_front(); - return result; - } - bool empty() const { - return Data.empty(); - } - size_t size() const { - return Data.size(); - } -}; - -class TColumnShardScanIterator: public TScanIteratorBase { +class TColumnShardScanIterator: public NCommon::TColumnShardScanIterator { private: - std::shared_ptr Context; - std::shared_ptr ReadMetadata; - TReadyResults ReadyResults; - std::shared_ptr IndexedData; - ui64 ItemsRead = 0; - const i64 MaxRowsInBatch = 5000; - virtual void DoOnSentDataFromInterval(const ui32 intervalIdx) const override; + using TBase = NCommon::TColumnShardScanIterator; + virtual void FillReadyResults() override; public: - TColumnShardScanIterator(const std::shared_ptr& context, const std::shared_ptr& readMetadata); - ~TColumnShardScanIterator(); - - virtual TConclusionStatus Start() override { - AFL_VERIFY(IndexedData); - return IndexedData->Start(); - } - - virtual std::optional GetAvailableResultsCount() const override { - return ReadyResults.size(); - } - - virtual const TReadStats& GetStats() const override; - - virtual TString DebugString(const bool verbose) const override { - return TStringBuilder() - << "ready_results:(" << ReadyResults.DebugString() << ");" - << "indexed_data:(" << IndexedData->DebugString(verbose) << ")" - ; - } - - virtual void Apply(const std::shared_ptr& task) override; - - bool Finished() const override { - return IndexedData->IsFinished() && ReadyResults.empty(); - } - - virtual TConclusion> GetBatch() override; - virtual void PrepareResults() override; - - virtual TConclusion ReadNextInterval() override; - -private: - void FillReadyResults(); + using TBase::TBase; }; -} +} // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp index eab1651e5f4b..66676898435a 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp @@ -70,7 +70,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) { TStringBuilder detailedInfo; if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) { - detailedInfo << " read metadata: (" << *readMetadataRange << ")"; + detailedInfo << " read metadata: (" << readMetadataRange->DebugString() << ")"; } const TVersionedIndex* index = nullptr; diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp index 1e682656024e..0eecece7c936 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp @@ -125,7 +125,7 @@ void TTxScan::Complete(const TActorContext& ctx) { TStringBuilder detailedInfo; if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) { - detailedInfo << " read metadata: (" << *readMetadataRange << ")" + detailedInfo << " read metadata: (" << readMetadataRange->DebugString() << ")" << " req: " << request; }