Skip to content

Commit

Permalink
Merge d55b3c2 into 6b225ec
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 26, 2024
2 parents 6b225ec + d55b3c2 commit 18f84a6
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 15 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
break;
}
}
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier());
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size());

for (auto& [_, portionInfo] : PortionsToRemove) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
while (FetchingSources.size()) {
auto frontSource = *FetchingSources.begin();
if (!frontSource->HasStageResult()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
break;
}
if (!frontSource->GetStageResult().HasResultChunk()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result_chunk")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
break;
}
auto table = frontSource->MutableStageResult().ExtractResultChunk();
Expand All @@ -28,24 +32,31 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
sourceIdxToContinue = frontSource->GetSourceIdx();
}
if (table && table->num_rows()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "has_result")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx())("table", table->num_rows());
auto cursor =
std::make_shared<TSimpleScanCursor>(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount);
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(nullptr, nullptr, table, cursor, sourceIdxToContinue));
} else if (sourceIdxToContinue) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
ContinueSource(*sourceIdxToContinue);
break;
}
if (!isFinished) {
break;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
if (Context->GetCommonContext()->GetReadMetadata()->Limit) {
FinishedSources.emplace(*FetchingSources.begin());
frontSource->ClearResult();
FinishedSources.emplace(frontSource);
}
FetchingSources.erase(FetchingSources.begin());
while (FetchingSources.size() && FinishedSources.size()) {
auto finishedSource = *FinishedSources.begin();
auto fetchingSource = *FetchingSources.begin();
auto finishedSource = *FinishedSources.begin();
if (finishedSource->GetFinish() < fetchingSource->GetStart()) {
FetchedCount += finishedSource->GetRecordsCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr)
AFL_VERIFY(FetchingPlan);
AFL_VERIFY(!Context->IsAborted());
ProcessingStarted = true;
SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", SourceIdx);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
TFetchingScriptCursor cursor(FetchingPlan, 0);
Expand Down Expand Up @@ -237,9 +239,7 @@ TPortionDataSource::TPortionDataSource(
portion->RecordSnapshotMin(TSnapshot::Zero()), portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(),
portion->GetShardingVersionOptional(), portion->GetMeta().GetDeletionsCount())
, Portion(portion)
, Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion))
, SourceGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId())) {
, Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) {
}

} // namespace NKikimr::NOlap::NReader::NSimple
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class IDataSource: public ICursorEntity {
}

std::optional<TFetchingScriptCursor> ScriptCursor;

std::shared_ptr<NGroupedMemoryManager::TGroupGuard> SourceGroupGuard;
protected:
std::optional<bool> IsSourceInMemoryFlag;

Expand All @@ -95,11 +95,22 @@ class IDataSource: public ICursorEntity {
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) = 0;

public:
virtual ui64 GetMemoryGroupId() const = 0;
bool GetIsStartedByCursor() const {
return IsStartedByCursor;
}

ui64 GetMemoryGroupId() const {
AFL_VERIFY(SourceGroupGuard);
return SourceGroupGuard->GetGroupId();
}

virtual void ClearResult() {
StageData.reset();
StageResult.reset();
ResourceGuards.clear();
SourceGroupGuard = nullptr;
}

void SetIsStartedByCursor() {
IsStartedByCursor = true;
}
Expand Down Expand Up @@ -323,7 +334,6 @@ class TPortionDataSource: public IDataSource {
using TBase = IDataSource;
const TPortionInfo::TConstPtr Portion;
std::shared_ptr<ISnapshotSchema> Schema;
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard> SourceGroupGuard;

void NeedFetchColumns(const std::set<ui32>& columnIds, TBlobsAction& blobsAction,
THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>& nullBlocks, const std::shared_ptr<NArrow::TColumnFilter>& filter);
Expand Down Expand Up @@ -365,10 +375,6 @@ class TPortionDataSource: public IDataSource {
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;

public:
virtual ui64 GetMemoryGroupId() const override {
return SourceGroupGuard->GetGroupId();
}

virtual ui64 PredictAccessorsSize() const override {
return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ std::optional<NStorageOptimizer::TOptimizationPriority> TGranulesStorage::GetCom
maxPriorityGranule = granulesSorted.front().GetGranule();
break;
}
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId());
std::pop_heap(granulesSorted.begin(), granulesSorted.end());
granulesSorted.pop_back();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ std::shared_ptr<TColumnEngineChanges> TOptimizerPlanner::DoGetOptimizationTask(
result->SetPortionExpectedSize(levelPortions->GetExpectedPortionSize());
}
auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1);
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
"level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString());
result->SetCheckPoints(std::move(positions));
for (auto&& i : result->GetSwitchedPortions()) {
Expand Down

0 comments on commit 18f84a6

Please sign in to comment.