diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp similarity index 65% rename from ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp rename to ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp index c39320e79981..a67da1467e87 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp @@ -1,8 +1,9 @@ #include "constructor.h" -#include + #include +#include -namespace NKikimr::NOlap::NReader::NSimple { +namespace NKikimr::NOlap::NReader::NCommon { void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) { Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData())); @@ -12,17 +13,18 @@ void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptrGetCommonContext()->GetScanActorId()) - ("status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId); - NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(), - std::make_unique(TConclusionStatus::Fail("cannot read blob range " + range.ToString()))); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())( + "scan_actor_id", Context->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())("status_code", status.GetStatus())( + "storage_id", storageId); + NActors::TActorContext::AsActorContext().Send( + Context->GetCommonContext()->GetScanActorId(), std::make_unique( + TConclusionStatus::Fail("cannot read blob range " + range.ToString()))); return false; } TBlobsFetcherTask::TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, - const std::shared_ptr& context, - const TString& taskCustomer, const TString& externalTaskId) + const std::shared_ptr& context, const TString& taskCustomer, const TString& externalTaskId) : TBase(readActions, taskCustomer, externalTaskId) , Source(sourcePtr) , Step(step) @@ -30,4 +32,4 @@ TBlobsFetcherTask::TBlobsFetcherTask(const std::vectorGetCommonContext()->GetCounters().GetFetchBlobsGuard()) { } -} +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h similarity index 56% rename from ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.h rename to ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h index e8dfc3f15e4f..f2f097d00f11 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h @@ -1,33 +1,37 @@ #pragma once -#include -#include -#include -#include -#include +#include "fetching.h" #include "source.h" -namespace NKikimr::NOlap::NReader::NPlain { +#include +#include +#include +#include +#include + +namespace NKikimr::NOlap::NReader::NCommon { class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter { private: using TBase = NBlobOperations::NRead::ITask; - const std::shared_ptr Source; + const std::shared_ptr Source; TFetchingScriptCursor Step; - const std::shared_ptr Context; + const std::shared_ptr Context; + const NColumnShard::TCounterGuard Guard; virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override; + public: - TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, + template + TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& context, const TString& taskCustomer, const TString& externalTaskId) - : TBase(readActions, taskCustomer, externalTaskId) - , Source(sourcePtr) - , Step(step) - , Context(context) - { - + : TBlobsFetcherTask(readActions, std::static_pointer_cast(sourcePtr), step, context, taskCustomer, externalTaskId) { } + + TBlobsFetcherTask(const std::vector>& readActions, + const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, + const std::shared_ptr& context, const TString& taskCustomer, const TString& externalTaskId); }; -} +} // namespace NKikimr::NOlap::NReader::NCommon diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 565c51d255b9..14ca43ec9960 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -1,6 +1,7 @@ #pragma once #include "columns_set.h" +#include #include #include @@ -17,11 +18,58 @@ class IDataSource; class TSpecialReadContext; class TFetchingScriptCursor; -class IFetchingStep { +class TFetchingStepSignals: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr DurationCounter; + NMonitoring::TDynamicCounters::TCounterPtr BytesCounter; + +public: + TFetchingStepSignals(NColumnShard::TCommonCountersOwner&& owner) + : TBase(std::move(owner)) + , DurationCounter(TBase::GetDeriviative("duration_ms")) + , BytesCounter(TBase::GetDeriviative("bytes_ms")) { + } + + void AddDuration(const TDuration d) const { + DurationCounter->Add(d.MilliSeconds()); + } + + void AddBytes(const ui32 v) const { + BytesCounter->Add(v); + } +}; + +class TFetchingStepsSignalsCollection: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + TMutex Mutex; + THashMap Collection; + TFetchingStepSignals GetSignalsImpl(const TString& name) { + TGuard g(Mutex); + auto it = Collection.find(name); + if (it == Collection.end()) { + it = Collection.emplace(name, TFetchingStepSignals(CreateSubGroup("step_name", name))).first; + } + return it->second; + } + +public: + TFetchingStepsSignalsCollection() + : TBase("scan_steps") { + } + + static TFetchingStepSignals GetSignals(const TString& name) { + return Singleton()->GetSignalsImpl(name); + } +}; + +class IFetchingStep: public TNonCopyable { private: YDB_READONLY_DEF(TString, Name); YDB_READONLY(TDuration, SumDuration, TDuration::Zero()); YDB_READONLY(ui64, SumSize, 0); + TFetchingStepSignals Signals; protected: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const = 0; @@ -32,9 +80,11 @@ class IFetchingStep { public: void AddDuration(const TDuration d) { SumDuration += d; + Signals.AddDuration(d); } void AddDataSize(const ui64 size) { SumSize += size; + Signals.AddBytes(size); } virtual ~IFetchingStep() = default; @@ -48,7 +98,8 @@ class IFetchingStep { } IFetchingStep(const TString& name) - : Name(name) { + : Name(name) + , Signals(TFetchingStepsSignalsCollection::GetSignals(name)) { } TString DebugString() const; @@ -195,9 +246,7 @@ class TStepAction: public IDataTasksProcessor::ITask { template TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId) - : TStepAction(std::static_pointer_cast(source), std::move(cursor), ownerActorId) - { - + : TStepAction(std::static_pointer_cast(source), std::move(cursor), ownerActorId) { } TStepAction(const std::shared_ptr& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId); }; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make index 5ff7dcd23ab4..d0b8b414622e 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make @@ -1,13 +1,14 @@ LIBRARY() SRCS( - fetched_data.cpp columns_set.cpp - iterator.cpp + constructor.cpp context.cpp - source.cpp - fetching.cpp fetch_steps.cpp + fetched_data.cpp + fetching.cpp + iterator.cpp + source.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp deleted file mode 100644 index 00bc62547e27..000000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp +++ /dev/null @@ -1,22 +0,0 @@ -#include "constructor.h" -#include -#include - -namespace NKikimr::NOlap::NReader::NPlain { - -void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) { - Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData())); - AFL_VERIFY(Step.Next()); - auto task = std::make_shared(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId()); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); -} - -bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId()) - ("status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId); - NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(), - std::make_unique(TConclusionStatus::Fail("cannot read blob range " + range.ToString()))); - return false; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 9a9bd23c8571..d0ab21a79cc7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -1,4 +1,3 @@ -#include "constructor.h" #include "fetched_data.h" #include "interval.h" #include "plain_read_data.h" @@ -7,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -17,7 +17,7 @@ namespace NKikimr::NOlap::NReader::NPlain { void IDataSource::InitFetchingPlan(const std::shared_ptr& fetching) { AFL_VERIFY(fetching); -// AFL_VERIFY(!FetchingPlan); + // AFL_VERIFY(!FetchingPlan); FetchingPlan = fetching; } @@ -122,7 +122,8 @@ bool TPortionDataSource::DoStartFetchingColumns( return false; } - auto constructor = std::make_shared(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); + auto constructor = + std::make_shared(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); return true; } @@ -157,7 +158,8 @@ bool TPortionDataSource::DoStartFetchingIndexes( return false; } - auto constructor = std::make_shared(readingActions, std::static_pointer_cast(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), ""); + auto constructor = + std::make_shared(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); return true; } @@ -276,7 +278,7 @@ bool TCommittedDataSource::DoStartFetchingColumns( readAction->AddRange(CommittedBlob.GetBlobRange()); std::vector> actions = { readAction }; - auto constructor = std::make_shared(actions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); + auto constructor = std::make_shared(actions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); return true; } @@ -290,7 +292,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr& AFL_VERIFY(GetStageData().GetBlobs().size() == 1); auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first); auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion()); - auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end()))); + auto rBatch = NArrow::DeserializeBatch( + bData, std::make_shared(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end()))); AFL_VERIFY(rBatch)("schema", schema.ToString()); auto batch = std::make_shared(rBatch); std::set columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/ya.make index c406e131eb15..d19dede6b2ba 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/ya.make @@ -2,7 +2,6 @@ LIBRARY() SRCS( scanner.cpp - constructor.cpp source.cpp interval.cpp fetched_data.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h deleted file mode 100644 index ef8cd6b62a7b..000000000000 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once -#include -#include -#include -#include -#include -#include "source.h" - -namespace NKikimr::NOlap::NReader::NSimple { - -class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter { -private: - using TBase = NBlobOperations::NRead::ITask; - const std::shared_ptr Source; - TFetchingScriptCursor Step; - const std::shared_ptr Context; - const NColumnShard::TCounterGuard Guard; - - virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; - virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override; -public: - TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, - const TFetchingScriptCursor& step, const std::shared_ptr& context, const TString& taskCustomer, - const TString& externalTaskId); -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index af65ff1a8fc6..845efead7db9 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -1,4 +1,3 @@ -#include "constructor.h" #include "fetched_data.h" #include "plain_read_data.h" #include "source.h" @@ -6,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -29,7 +29,7 @@ void IDataSource::StartProcessing(const std::shared_ptr& sourcePtr) GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); SetMemoryGroupId(SourceGroupGuard->GetGroupId()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx()); -// NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); + // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); TFetchingScriptCursor cursor(FetchingPlan, 0); auto task = std::make_shared(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); @@ -123,7 +123,8 @@ bool TPortionDataSource::DoStartFetchingColumns( return false; } - auto constructor = std::make_shared(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); + auto constructor = + std::make_shared(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); return true; } @@ -158,8 +159,8 @@ bool TPortionDataSource::DoStartFetchingIndexes( return false; } - auto constructor = std::make_shared( - readingActions, std::static_pointer_cast(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), ""); + auto constructor = + std::make_shared(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make index 413fdf53073a..45fef368d323 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make @@ -2,7 +2,6 @@ LIBRARY() SRCS( scanner.cpp - constructor.cpp source.cpp fetched_data.cpp plain_read_data.cpp