Skip to content

Commit

Permalink
fix simple reading with accessors fetching (ydb-platform#12000)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent e3e1585 commit 34fe6db
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 43 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
break;
}
}
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier());
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size());

for (auto& [_, portionInfo] : PortionsToRemove) {
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/columnshard/engines/reader/common/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NKikimr::NOlap::NReader {
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult: public TNonCopyable {
private:
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>, ResourcesGuard);
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TGroupGuard>, GroupGuard);
NArrow::TShardedRecordBatch ResultBatch;

Expand Down Expand Up @@ -54,11 +54,11 @@ class TPartialReadResult: public TNonCopyable {
return ScanCursor;
}

explicit TPartialReadResult(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& resourcesGuard,
std::shared_ptr<NGroupedMemoryManager::TGroupGuard>&& gGuard, const NArrow::TShardedRecordBatch& batch,
explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::optional<ui32> notFinishedIntervalIdx)
: ResourcesGuard(std::move(resourcesGuard))
, GroupGuard(std::move(gGuard))
: ResourceGuards(resourceGuards)
, GroupGuard(gGuard)
, ResultBatch(batch)
, ScanCursor(scanCursor)
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
Expand All @@ -68,7 +68,7 @@ class TPartialReadResult: public TNonCopyable {

explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult(nullptr, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
: TPartialReadResult({}, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
} else {
gGuard = itInterval->second->GetGroupGuard();
}
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(std::move(allocationGuard), std::move(gGuard), *newBatch,
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> guards = { std::move(allocationGuard) };
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(guards, std::move(gGuard), *newBatch,
std::make_shared<TPlainScanCursor>(lastPK), callbackIdxSubscriver)).second);
} else {
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,27 @@ class TColumnsAccumulator {
const bool sequential) {
auto actualColumns = columns - AssemblerReadyColumns;
AssemblerReadyColumns = AssemblerReadyColumns + columns;
if (!actualColumns.IsEmpty()) {
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
if (sequential) {
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
if (notSequentialColumnIds.size()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
script.AddStep<TAssemblerStep>(cross, purposeId);
*actualSet = *actualSet - *cross;
}
if (!actualSet->IsEmpty()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
}
} else {
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
script.AddStep<TAssemblerStep>(actualSet, purposeId);
if (actualColumns.IsEmpty()) {
return false;
}
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
if (sequential) {
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
if (notSequentialColumnIds.size()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
script.AddStep<TAssemblerStep>(cross, purposeId);
*actualSet = *actualSet - *cross;
}
return true;
if (!actualSet->IsEmpty()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
}
} else {
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
script.AddStep<TAssemblerStep>(actualSet, purposeId);
}
return false;
return true;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
while (FetchingSources.size()) {
auto frontSource = *FetchingSources.begin();
if (!frontSource->HasStageResult()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
break;
}
if (!frontSource->GetStageResult().HasResultChunk()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result_chunk")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
break;
}
auto table = frontSource->MutableStageResult().ExtractResultChunk();
Expand All @@ -28,24 +32,32 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
sourceIdxToContinue = frontSource->GetSourceIdx();
}
if (table && table->num_rows()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "has_result")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx())("table", table->num_rows());
auto cursor =
std::make_shared<TSimpleScanCursor>(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount);
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(nullptr, nullptr, table, cursor, sourceIdxToContinue));
reader.OnIntervalResult(
std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table, cursor, sourceIdxToContinue));
} else if (sourceIdxToContinue) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
ContinueSource(*sourceIdxToContinue);
break;
}
if (!isFinished) {
break;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
if (Context->GetCommonContext()->GetReadMetadata()->Limit) {
FinishedSources.emplace(*FetchingSources.begin());
frontSource->ClearResult();
FinishedSources.emplace(frontSource);
}
FetchingSources.erase(FetchingSources.begin());
while (FetchingSources.size() && FinishedSources.size()) {
auto finishedSource = *FinishedSources.begin();
auto fetchingSource = *FetchingSources.begin();
auto finishedSource = *FinishedSources.begin();
if (finishedSource->GetFinish() < fetchingSource->GetStart()) {
FetchedCount += finishedSource->GetRecordsCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr)
AFL_VERIFY(FetchingPlan);
AFL_VERIFY(!Context->IsAborted());
ProcessingStarted = true;
SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", SourceIdx);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
TFetchingScriptCursor cursor(FetchingPlan, 0);
Expand Down Expand Up @@ -237,9 +239,7 @@ TPortionDataSource::TPortionDataSource(
portion->RecordSnapshotMin(TSnapshot::Zero()), portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(),
portion->GetShardingVersionOptional(), portion->GetMeta().GetDeletionsCount())
, Portion(portion)
, Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion))
, SourceGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId())) {
, Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) {
}

} // namespace NKikimr::NOlap::NReader::NSimple
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class IDataSource: public ICursorEntity {
YDB_READONLY(bool, HasDeletions, false);
virtual NJson::TJsonValue DoDebugJson() const = 0;
std::shared_ptr<TFetchingScript> FetchingPlan;
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> ResourceGuards;
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage);
bool ProcessingStarted = false;
bool IsStartedByCursor = false;
Expand All @@ -74,7 +74,7 @@ class IDataSource: public ICursorEntity {
}

std::optional<TFetchingScriptCursor> ScriptCursor;

std::shared_ptr<NGroupedMemoryManager::TGroupGuard> SourceGroupGuard;
protected:
std::optional<bool> IsSourceInMemoryFlag;

Expand All @@ -95,11 +95,27 @@ class IDataSource: public ICursorEntity {
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) = 0;

public:
virtual ui64 GetMemoryGroupId() const = 0;
// Returns the IsStartedByCursor flag; the flag is raised by SetIsStartedByCursor().
bool GetIsStartedByCursor() const {
return IsStartedByCursor;
}

// Returns the memory group guard currently held by this source.
// Precondition: SourceGroupGuard must be non-null, otherwise AFL_VERIFY fails.
// In the visible code the guard is assigned in StartProcessing() and dropped by
// ClearResult(), so this is only valid between those two calls.
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& GetGroupGuard() const {
AFL_VERIFY(SourceGroupGuard);
return SourceGroupGuard;
}

// Returns the group id reported by the held memory group guard.
// Precondition: SourceGroupGuard must be non-null, otherwise AFL_VERIFY fails
// (the guard is reset to nullptr by ClearResult()).
ui64 GetMemoryGroupId() const {
AFL_VERIFY(SourceGroupGuard);
return SourceGroupGuard->GetGroupId();
}

// Drops everything this source holds for its fetched data: the stage data, the
// stage result, all allocation guards and the memory group guard.
// After this call GetGroupGuard() / GetMemoryGroupId() fail their AFL_VERIFY
// until SourceGroupGuard is assigned again.
// NOTE(review): presumably releasing the guards returns the accounted memory to
// the grouped memory manager — confirm against TAllocationGuard/TGroupGuard.
virtual void ClearResult() {
StageData.reset();
StageResult.reset();
// Allocation guards are released before the group guard they belong to.
ResourceGuards.clear();
SourceGroupGuard = nullptr;
}

// Raises the IsStartedByCursor flag; nothing in this block lowers it again.
void SetIsStartedByCursor() {
IsStartedByCursor = true;
}
Expand Down Expand Up @@ -323,7 +339,6 @@ class TPortionDataSource: public IDataSource {
using TBase = IDataSource;
const TPortionInfo::TConstPtr Portion;
std::shared_ptr<ISnapshotSchema> Schema;
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard> SourceGroupGuard;

void NeedFetchColumns(const std::set<ui32>& columnIds, TBlobsAction& blobsAction,
THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>& nullBlocks, const std::shared_ptr<NArrow::TColumnFilter>& filter);
Expand Down Expand Up @@ -365,10 +380,6 @@ class TPortionDataSource: public IDataSource {
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;

public:
virtual ui64 GetMemoryGroupId() const override {
return SourceGroupGuard->GetGroupId();
}

// Estimates the size of the accessors for this portion: the portion's approximate
// chunk count for the number of columns in the read's result schema, multiplied
// by sizeof(TColumnRecord).
// NOTE(review): the result is presumably in bytes (it scales by sizeof) — confirm
// against the callers that consume this prediction.
virtual ui64 PredictAccessorsSize() const override {
return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ std::optional<NStorageOptimizer::TOptimizationPriority> TGranulesStorage::GetCom
maxPriorityGranule = granulesSorted.front().GetGranule();
break;
}
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId());
std::pop_heap(granulesSorted.begin(), granulesSorted.end());
granulesSorted.pop_back();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ std::shared_ptr<TColumnEngineChanges> TOptimizerPlanner::DoGetOptimizationTask(
result->SetPortionExpectedSize(levelPortions->GetExpectedPortionSize());
}
auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1);
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
"level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString());
result->SetCheckPoints(std::move(positions));
for (auto&& i : result->GetSwitchedPortions()) {
Expand Down

0 comments on commit 34fe6db

Please sign in to comment.