Skip to content

Commit

Permalink
memory control correction
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Nov 27, 2024
1 parent 8793419 commit b8e882a
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 28 deletions.
12 changes: 6 additions & 6 deletions ydb/core/tx/columnshard/engines/reader/common/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NKikimr::NOlap::NReader {
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult: public TNonCopyable {
private:
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>, ResourcesGuard);
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TGroupGuard>, GroupGuard);
NArrow::TShardedRecordBatch ResultBatch;

Expand Down Expand Up @@ -54,11 +54,11 @@ class TPartialReadResult: public TNonCopyable {
return ScanCursor;
}

explicit TPartialReadResult(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& resourcesGuard,
std::shared_ptr<NGroupedMemoryManager::TGroupGuard>&& gGuard, const NArrow::TShardedRecordBatch& batch,
explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::optional<ui32> notFinishedIntervalIdx)
: ResourcesGuard(std::move(resourcesGuard))
, GroupGuard(std::move(gGuard))
: ResourceGuards(resourceGuards)
, GroupGuard(gGuard)
, ResultBatch(batch)
, ScanCursor(scanCursor)
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
Expand All @@ -68,7 +68,7 @@ class TPartialReadResult: public TNonCopyable {

explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult(nullptr, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
: TPartialReadResult({}, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
} else {
gGuard = itInterval->second->GetGroupGuard();
}
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(std::move(allocationGuard), std::move(gGuard), *newBatch,
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> guards = { std::move(allocationGuard) };
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(guards, std::move(gGuard), *newBatch,
std::make_shared<TPlainScanCursor>(lastPK), callbackIdxSubscriver)).second);
} else {
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,27 @@ class TColumnsAccumulator {
const bool sequential) {
auto actualColumns = columns - AssemblerReadyColumns;
AssemblerReadyColumns = AssemblerReadyColumns + columns;
if (!actualColumns.IsEmpty()) {
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
if (sequential) {
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
if (notSequentialColumnIds.size()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
script.AddStep<TAssemblerStep>(cross, purposeId);
*actualSet = *actualSet - *cross;
}
if (!actualSet->IsEmpty()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
}
} else {
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
script.AddStep<TAssemblerStep>(actualSet, purposeId);
if (actualColumns.IsEmpty()) {
return false;
}
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
if (sequential) {
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
if (notSequentialColumnIds.size()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
script.AddStep<TAssemblerStep>(cross, purposeId);
*actualSet = *actualSet - *cross;
}
return true;
if (!actualSet->IsEmpty()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
}
} else {
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
script.AddStep<TAssemblerStep>(actualSet, purposeId);
}
return false;
return true;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
"source_idx", frontSource->GetSourceIdx())("table", table->num_rows());
auto cursor =
std::make_shared<TSimpleScanCursor>(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount);
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(nullptr, nullptr, table, cursor, sourceIdxToContinue));
reader.OnIntervalResult(
std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table, cursor, sourceIdxToContinue));
} else if (sourceIdxToContinue) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class IDataSource: public ICursorEntity {
YDB_READONLY(bool, HasDeletions, false);
virtual NJson::TJsonValue DoDebugJson() const = 0;
std::shared_ptr<TFetchingScript> FetchingPlan;
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> ResourceGuards;
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage);
bool ProcessingStarted = false;
bool IsStartedByCursor = false;
Expand Down Expand Up @@ -99,6 +99,11 @@ class IDataSource: public ICursorEntity {
return IsStartedByCursor;
}

const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& GetGroupGuard() const {
AFL_VERIFY(SourceGroupGuard);
return SourceGroupGuard;
}

ui64 GetMemoryGroupId() const {
AFL_VERIFY(SourceGroupGuard);
return SourceGroupGuard->GetGroupId();
Expand Down

0 comments on commit b8e882a

Please sign in to comment.