Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scanners unification plain/simple for reuse code #12847

Merged
merged 1 commit into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace NKikimr::NOlap::NReader::NCommon {

TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
: CommonContext(commonContext) {
auto readMetadata = CommonContext->GetReadMetadataPtrVerifiedAs<TReadMetadata>();
Y_ABORT_UNLESS(readMetadata->SelectInfo);
ReadMetadata = CommonContext->GetReadMetadataPtrVerifiedAs<TReadMetadata>();
Y_ABORT_UNLESS(ReadMetadata->SelectInfo);

double kffAccessors = 0.01;
double kffFilter = 0.45;
double kffFetching = 0.45;
double kffMerge = 0.10;
TString stagePrefix;
if (readMetadata->GetEarlyFilterColumnIds().size()) {
if (ReadMetadata->GetEarlyFilterColumnIds().size()) {
stagePrefix = "EF";
kffFilter = 0.7;
kffFetching = 0.15;
Expand All @@ -41,15 +41,15 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit)
};
ProcessMemoryGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(
CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages);
ProcessScopeGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId());

auto readSchema = readMetadata->GetResultSchema();
auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
IndexChecker = readMetadata->GetProgram().GetIndexChecker();
IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
{
auto predicateColumns = readMetadata->GetPKRangesFilter().GetColumnIds(readMetadata->GetIndexInfo());
auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
if (predicateColumns.size()) {
PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, readSchema);
} else {
Expand All @@ -58,26 +58,26 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
}
{
std::set<ui32> columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
DeletionColumns = std::make_shared<TColumnsSet>(columnIds, readMetadata->GetResultSchema());
DeletionColumns = std::make_shared<TColumnsSet>(columnIds, ReadMetadata->GetResultSchema());
}

if (!!readMetadata->GetRequestShardingInfo()) {
if (!!ReadMetadata->GetRequestShardingInfo()) {
auto shardingColumnIds =
readMetadata->GetIndexInfo().GetColumnIdsVerified(readMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames());
ShardingColumns = std::make_shared<TColumnsSet>(shardingColumnIds, readMetadata->GetResultSchema());
ReadMetadata->GetIndexInfo().GetColumnIdsVerified(ReadMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames());
ShardingColumns = std::make_shared<TColumnsSet>(shardingColumnIds, ReadMetadata->GetResultSchema());
} else {
ShardingColumns = std::make_shared<TColumnsSet>();
}
{
auto efColumns = readMetadata->GetEarlyFilterColumnIds();
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
if (efColumns.size()) {
EFColumns = std::make_shared<TColumnsSet>(efColumns, readSchema);
} else {
EFColumns = std::make_shared<TColumnsSet>();
}
}
if (readMetadata->HasProcessingColumnIds()) {
FFColumns = std::make_shared<TColumnsSet>(readMetadata->GetProcessingColumnIds(), readSchema);
if (ReadMetadata->HasProcessingColumnIds()) {
FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), readSchema);
if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) {
FFColumns = std::make_shared<TColumnsSet>(*EFColumns + *SpecColumns);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString());
Expand All @@ -95,7 +95,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
}
AllUsageColumns = std::make_shared<TColumnsSet>(*FFColumns + *PredicateColumns);

PKColumns = std::make_shared<TColumnsSet>(readMetadata->GetPKColumnIds(), readSchema);
PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), readSchema);
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
#include "columns_set.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>

namespace NKikimr::NOlap::NReader::NCommon {

class TFetchingScript;
class IDataSource;

class TSpecialReadContext {
private:
YDB_READONLY_DEF(std::shared_ptr<TReadContext>, CommonContext);
Expand All @@ -28,13 +32,36 @@ class TSpecialReadContext {
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FilterStageMemory);
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FetchingStageMemory);

TReadMetadata::TConstPtr ReadMetadata;
TAtomic AbortFlag = 0;

virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) = 0;

protected:
NIndexes::TIndexCheckerContainer IndexChecker;
std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>();

public:
template <class T>
std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<T>& source) {
return GetColumnsFetchingPlan(std::static_pointer_cast<IDataSource>(source));
}

std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) {
return DoGetColumnsFetchingPlan(source);
}

const TReadMetadata::TConstPtr& GetReadMetadata() const {
return ReadMetadata;
}

template <class T>
std::shared_ptr<T> GetReadMetadataVerifiedAs() const {
auto result = std::dynamic_pointer_cast<T>(ReadMetadata);
AFL_VERIFY(!!result);
return result;
}

ui64 GetProcessMemoryControlId() const {
AFL_VERIFY(ProcessMemoryGuard);
return ProcessMemoryGuard->GetProcessId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include "fetch_steps.h"
#include "source.h"

#include <ydb/core/formats/arrow/common/container.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>

#include <ydb/library/formats/arrow/simple_arrays_cache.h>

namespace NKikimr::NOlap::NReader::NCommon {

TConclusion<bool> TColumnBlobsFetchingStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
return !source->StartFetchingColumns(source, step, Columns);
}

ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
return source->GetColumnBlobBytes(Columns.GetColumnIds());
}

TConclusion<bool> TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
source->AssembleColumns(Columns);
return true;
}

ui64 TAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
return source->GetColumnRawBytes(Columns->GetColumnIds());
}

TConclusion<bool> TOptionalAssemblerStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
source->AssembleColumns(Columns, !source->IsSourceInMemory());
return true;
}

ui64 TOptionalAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
return source->GetColumnsVolume(Columns->GetColumnIds(), EMemType::RawSequential);
}

bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*allocation*/) {
auto data = Source.lock();
if (!data || data->GetContext()->IsAborted()) {
guard->Release();
return false;
}
if (StageIndex == EStageFeaturesIndexes::Accessors) {
data->MutableStageData().SetAccessorsGuard(std::move(guard));
} else {
data->RegisterAllocationGuard(std::move(guard));
}
Step.Next();
auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId());
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
return true;
}

TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex)
: TBase(mem)
, Source(source)
, Step(step)
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
, StageIndex(stageIndex) {
}

void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
auto sourcePtr = Source.lock();
if (sourcePtr) {
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
}
}

TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
ui64 size = PredefinedSize.value_or(0);
for (auto&& i : Packs) {
ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
if (source->GetStageData().GetUseFilter() && i.GetMemType() != EMemType::Blob && source->GetContext()->GetReadMetadata()->HasLimit()) {
const ui32 filtered =
source->GetStageData().GetFilteredCount(source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->GetLimitRobust());
if (filtered < source->GetRecordsCount()) {
sizeLocal = sizeLocal * 1.0 * filtered / source->GetRecordsCount();
}
}
size += sizeLocal;
}

auto allocation = std::make_shared<TFetchingStepAllocation>(source, size, step, StageIndex);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(),
source->GetContext()->GetCommonContext()->GetScanId(), source->GetMemoryGroupId(), { allocation }, (ui32)StageIndex);
return false;
}

ui64 TAllocateMemoryStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& /*source*/) const {
return 0;
}

NKikimr::TConclusion<bool> TBuildStageResultStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
source->BuildStageResult(source);
return true;
}

} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#pragma once
#include "fetching.h"

#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>

namespace NKikimr::NOlap::NReader::NCommon {

class TAllocateMemoryStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
class TColumnsPack {
private:
YDB_READONLY_DEF(TColumnsSetIds, Columns);
YDB_READONLY(EMemType, MemType, EMemType::Blob);

public:
TColumnsPack(const TColumnsSetIds& columns, const EMemType memType)
: Columns(columns)
, MemType(memType) {
}
};
std::vector<TColumnsPack> Packs;
THashMap<ui32, THashSet<EMemType>> Control;
const EStageFeaturesIndexes StageIndex;
const std::optional<ui64> PredefinedSize;

protected:
class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
private:
using TBase = NGroupedMemoryManager::IAllocation;
std::weak_ptr<IDataSource> Source;
TFetchingScriptCursor Step;
NColumnShard::TCounterGuard TasksGuard;
const EStageFeaturesIndexes StageIndex;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;

public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step,
const EStageFeaturesIndexes stageIndex);
};
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
virtual TString DoDebugString() const override {
return TStringBuilder() << "stage=" << StageIndex << ";";
}

public:
void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) {
if (!ids.GetColumnsCount()) {
return;
}
for (auto&& i : ids.GetColumnIds()) {
AFL_VERIFY(Control[i].emplace(memType).second);
}
Packs.emplace_back(ids, memType);
}
EStageFeaturesIndexes GetStage() const {
return StageIndex;
}

TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex) {
AddAllocation(columns, memType);
}

TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex)
, PredefinedSize(memSize) {
}
};

class TAssemblerStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
virtual TString DoDebugString() const override {
return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
}

public:
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
TAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
: TBase("ASSEMBLER" + (specName ? "::" + specName : ""))
, Columns(columns) {
AFL_VERIFY(Columns);
AFL_VERIFY(Columns->GetColumnsCount());
}
};

class TBuildStageResultStep: public IFetchingStep {
private:
using TBase = IFetchingStep;

public:
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const override;
TBuildStageResultStep()
: TBase("BUILD_STAGE_RESULT") {
}
};

class TOptionalAssemblerStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
virtual TString DoDebugString() const override {
return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
}

public:
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;

virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
TOptionalAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
: TBase("OPTIONAL_ASSEMBLER" + (specName ? "::" + specName : ""))
, Columns(columns) {
AFL_VERIFY(Columns);
AFL_VERIFY(Columns->GetColumnsCount());
}
};

class TColumnBlobsFetchingStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
TColumnsSetIds Columns;

protected:
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
virtual TString DoDebugString() const override {
return TStringBuilder() << "columns=" << Columns.DebugString() << ";";
}

public:
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
TColumnBlobsFetchingStep(const TColumnsSetIds& columns)
: TBase("FETCHING_COLUMNS")
, Columns(columns) {
AFL_VERIFY(Columns.GetColumnsCount());
}
};

} // namespace NKikimr::NOlap::NReader::NCommon
Loading
Loading