don't reallocate memory after merge through incorrect memory consumpti… (
ivanmorozov333 authored Dec 15, 2024
1 parent b40adfb commit 0fddef1
Showing 5 changed files with 19 additions and 8 deletions.
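
Note on the overall shape of the change, as it reads from the hunks below: the memory reservation taken before the merge is kept as-is, and the post-merge sizes are only logged (the AllocationGuard->Update call in PrepareResultBatch is commented out further down). A minimal standalone sketch of that pattern follows; TAllocationGuardSketch and the numbers are hypothetical, not the real guard class.

#include <cstdint>
#include <iostream>

// Hypothetical stand-in for the real allocation guard; only the part of the
// interface relevant to this commit is modelled.
class TAllocationGuardSketch {
private:
    uint64_t Memory;
public:
    explicit TAllocationGuardSketch(const uint64_t memory)
        : Memory(memory) {
    }
    uint64_t GetMemory() const {
        return Memory;
    }
    void Update(const uint64_t memory) {  // the call that is no longer made after the merge
        Memory = memory;
    }
};

int main() {
    TAllocationGuardSketch guard(1ull << 20);           // reservation made before the merge
    const uint64_t memorySizeAfterMerge = 64ull << 10;  // what a post-merge size probe reports

    // Old shape: guard.Update(memorySizeAfterMerge);  // reservation shrinks to the probed size
    // New shape: keep the original reservation and only report what was observed.
    std::cout << "after_memory=" << memorySizeAfterMerge
              << " guard=" << guard.GetMemory() << std::endl;
    return 0;
}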
@@ -13,7 +13,6 @@ std::unique_ptr<NArrow::NMerger::TMergePartialStream> TSpecialReadContext::Build
ui64 TSpecialReadContext::GetMemoryForSources(const THashMap<ui32, std::shared_ptr<IDataSource>>& sources) {
ui64 result = 0;
for (auto&& i : sources) {
auto fetchingPlan = GetColumnsFetchingPlan(i.second);
AFL_VERIFY(i.second->GetIntervalsCount());
const ui64 sourceMemory = std::max<ui64>(1, i.second->GetResourceGuardsMemory() / i.second->GetIntervalsCount());
result += sourceMemory;
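
The deleted line above drops a GetColumnsFetchingPlan call whose result is not used in the lines shown; the estimate itself is unchanged: each source contributes its resource-guard memory divided across its intervals, floored at 1. A standalone sketch of that estimate, with a hypothetical stand-in for IDataSource:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for IDataSource: only the two fields the estimate reads.
struct TSourceSketch {
    uint64_t ResourceGuardsMemory = 0;
    uint32_t IntervalsCount = 1;  // verified to be non-zero in the real code (AFL_VERIFY)
};

uint64_t GetMemoryForSourcesSketch(const std::vector<TSourceSketch>& sources) {
    uint64_t result = 0;
    for (const auto& source : sources) {
        // Per-source share: guard memory spread evenly over the source's intervals, at least 1.
        result += std::max<uint64_t>(1, source.ResourceGuardsMemory / source.IntervalsCount);
    }
    return result;
}

int main() {
    const std::vector<TSourceSketch> sources = {{1 << 20, 4}, {0, 2}};
    std::cout << "memory_for_sources=" << GetMemoryForSourcesSketch(sources) << std::endl;  // 262145
    return 0;
}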
@@ -222,6 +222,7 @@ TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr<ID
}
size += sizeLocal;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TAllocateMemoryStep::DoExecuteInplace")("source", source->GetSourceIdx())("memory", size);

auto allocation = std::make_shared<TFetchingStepAllocation>(source, size, step, StageIndex);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(),
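
The added AFL_DEBUG line records the total memory computed for the source before the allocation request is handed to the grouped memory manager. A small standalone sketch of that accumulate-then-report shape, with made-up numbers and plain stdout standing in for the AFL_DEBUG macro:

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    // Made-up per-column estimates standing in for the sizeLocal values summed above.
    const std::vector<uint64_t> columnSizes = {4096, 16384, 512};
    uint64_t size = 0;
    for (const uint64_t sizeLocal : columnSizes) {
        size += sizeLocal;
    }
    // Plays the role of the added AFL_DEBUG(...)("memory", size) record: the total is
    // visible in the scan log before the allocation request is sent off.
    std::cout << "event=TAllocateMemoryStep::DoExecuteInplace memory=" << size << std::endl;
    return 0;
}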
@@ -10,15 +10,12 @@ void TFetchingInterval::ConstructResult() {
if (ready != WaitSourcesCount) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_construct_result")("interval_idx", IntervalIdx)(
"count", WaitSourcesCount)("ready", ready)("interval_id", GetIntervalId());
return;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx)(
"interval_id", GetIntervalId());
}
if (AtomicCas(&SourcesFinalized, 1, 0)) {
} else if (AtomicCas(&SourcesFinalized, 1, 0)) {
IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitMergerStart);

MergingContext->SetIntervalChunkMemory(Context->GetMemoryForSources(Sources));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx)(
"interval_id", GetIntervalId())("memory", MergingContext->GetIntervalChunkMemory())("count", WaitSourcesCount);

auto task = std::make_shared<TStartMergeTask>(MergingContext, Context, std::move(Sources));
task->SetPriority(NConveyor::ITask::EPriority::High);
@@ -81,6 +78,8 @@ void TFetchingInterval::OnPartSendingComplete() {
}
IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitMergerContinue);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "continue_construct_result")("interval_idx", IntervalIdx)(
"interval_id", GetIntervalId())("memory", MergingContext->GetIntervalChunkMemory())("count", WaitSourcesCount);
auto task = std::make_shared<TContinueMergeTask>(MergingContext, Context, std::move(Merger));
task->SetPriority(NConveyor::ITask::EPriority::High);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(),
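
The reshuffled branches in ConstructResult fold the readiness check and the AtomicCas(&SourcesFinalized, 1, 0) guard into one else-if chain: the merge is started only when every source is ready, and the compare-and-swap lets exactly one caller start it. A standalone sketch of that shape using std::atomic, with simplified names:

#include <atomic>
#include <cstdint>
#include <iostream>

std::atomic<int> SourcesFinalized{0};

// Simplified shape of ConstructResult after this change.
void ConstructResultSketch(const uint32_t ready, const uint32_t waitSourcesCount) {
    if (ready != waitSourcesCount) {
        std::cout << "skip_construct_result ready=" << ready << "/" << waitSourcesCount << std::endl;
    } else if (int expected = 0; SourcesFinalized.compare_exchange_strong(expected, 1)) {
        std::cout << "start_construct_result" << std::endl;  // schedule the merge task here
    }
}

int main() {
    ConstructResultSketch(1, 2);  // not all sources ready: skipped
    ConstructResultSketch(2, 2);  // all ready: starts the merge exactly once
    ConstructResultSketch(2, 2);  // CAS already taken: nothing happens
    return 0;
}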
@@ -33,6 +33,8 @@ void TBaseMergeTask::PrepareResultBatch() {
LastPK = nullptr;
return;
}
const ui64 dataSizeBefore = NArrow::GetTableDataSize(ResultBatch);
const ui64 memorySizeBefore = NArrow::GetTableMemorySize(ResultBatch);
{
ResultBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector());
AFL_VERIFY((ui32)ResultBatch->num_columns() == Context->GetProgramInputColumns()->GetColumnNamesVector().size());
@@ -45,7 +47,10 @@
} else {
ShardedBatch = NArrow::TShardedRecordBatch(ResultBatch);
}
AllocationGuard->Update(NArrow::GetTableMemorySize(ResultBatch));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "update_memory_merger")("before_data", dataSizeBefore)(
"before_memory", memorySizeBefore)("after_memory", NArrow::GetTableMemorySize(ResultBatch))(
"after_data", NArrow::GetTableDataSize(ResultBatch))("guard", AllocationGuard->GetMemory());
// AllocationGuard->Update(NArrow::GetTableMemorySize(ResultBatch));
AFL_VERIFY(!!LastPK == !!ShardedBatch->GetRecordsCount())("lpk", !!LastPK)("sb", ShardedBatch->GetRecordsCount());
} else {
AllocationGuard = nullptr;
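
The new update_memory_merger record keeps both a data-size and a memory-size figure for the result batch, before and after column extraction, while the guard itself is left at its original value (the Update call is now commented out). A sketch of why the two figures can differ; the structures are made up for illustration and are not the Arrow API:

#include <cstdint>
#include <iostream>
#include <vector>

// Made-up column descriptor: it only separates "bytes of referenced data" from
// "bytes of allocated buffers", which is the distinction the two logged figures draw.
struct TColumnSketch {
    uint64_t DataBytes = 0;
    uint64_t BufferBytes = 0;
};

uint64_t GetDataSizeSketch(const std::vector<TColumnSketch>& table) {
    uint64_t size = 0;
    for (const auto& column : table) {
        size += column.DataBytes;
    }
    return size;
}

uint64_t GetMemorySizeSketch(const std::vector<TColumnSketch>& table) {
    uint64_t size = 0;
    for (const auto& column : table) {
        size += column.BufferBytes;
    }
    return size;
}

int main() {
    // Padded or over-allocated buffers make the memory figure larger than the data figure.
    const std::vector<TColumnSketch> resultBatch = {{1000, 4096}, {250, 4096}};
    std::cout << "after_data=" << GetDataSizeSketch(resultBatch)
              << " after_memory=" << GetMemorySizeSketch(resultBatch) << std::endl;  // 1250 vs 8192
    return 0;
}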
7 changes: 7 additions & 0 deletions ydb/core/tx/limiter/grouped_memory/usage/abstract.h
@@ -129,9 +129,13 @@ class TStageFeatures {
Waiting.Sub(volume);
if (HardLimit < Usage.Val() + volume) {
Counters->OnCannotAllocate();
AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "cannot_allocate")("limit", HardLimit)(
"usage", Usage.Val())(
"delta", volume);
return TConclusionStatus::Fail(TStringBuilder() << Name << "::(limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ");");
}
Usage.Add(volume);
AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "allocate")("usage", Usage.Val())("delta", volume);
if (Counters) {
Counters->Add(volume, true);
Counters->Sub(volume, false);
@@ -155,6 +159,7 @@
} else {
Waiting.Sub(volume);
}
AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "free")("usage", Usage.Val())("delta", volume);

if (withOwner && Owner) {
Owner->Free(volume, allocated);
@@ -166,6 +171,8 @@
Counters->Sub(from, allocated);
Counters->Add(to, allocated);
}
AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())(
"allocated", allocated)("from", from)("to", to);
if (allocated) {
Usage.Sub(from);
Usage.Add(to);
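
The debug lines added to TStageFeatures trace accounting the surrounding code already performs: reject an allocation when usage plus the requested delta would cross the hard limit, otherwise grow usage, and shrink it again on free. A standalone sketch of that accounting, with simplified names, no counters or owner chain, and stdout in place of AFL_DEBUG:

#include <cstdint>
#include <iostream>
#include <string>

class TStageFeaturesSketch {
private:
    std::string Name;
    uint64_t HardLimit;
    uint64_t Usage = 0;
public:
    TStageFeaturesSketch(std::string name, const uint64_t hardLimit)
        : Name(std::move(name))
        , HardLimit(hardLimit) {
    }

    bool Allocate(const uint64_t volume) {
        if (HardLimit < Usage + volume) {
            // Mirrors the "cannot_allocate" record: limit, current usage and requested delta.
            std::cout << Name << " cannot_allocate limit=" << HardLimit
                      << " usage=" << Usage << " delta=" << volume << std::endl;
            return false;
        }
        Usage += volume;
        std::cout << Name << " allocate usage=" << Usage << " delta=" << volume << std::endl;
        return true;
    }

    void Free(const uint64_t volume) {
        Usage -= volume;
        std::cout << Name << " free usage=" << Usage << " delta=" << volume << std::endl;
    }
};

int main() {
    TStageFeaturesSketch stage("scan", /*hardLimit=*/1024);
    stage.Allocate(512);   // accepted, usage=512
    stage.Allocate(1024);  // rejected, 512 + 1024 > 1024
    stage.Free(512);       // usage back to 0
    return 0;
}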
