Skip to content

Commit

Permalink
correct and speed up compaction (ydb-platform#10867)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 7798d5a commit f3b0f5d
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 40 deletions.
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
TBase::DoStart(self);

// Y_ABORT_UNLESS(SwitchedPortions.size());
THashMap<TString, THashSet<TBlobRange>> blobRanges;
auto& index = self.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
for (const auto& p : SwitchedPortions) {
Expand Down
39 changes: 24 additions & 15 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,26 +237,35 @@ std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCo
}

ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) {
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16));
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo.GetTotalBlobBytes();
++PortionsCount;
THashMap<ui32, ui64> maxChunkSizeByColumn;
auto it = MaxMemoryByColumnChunk.begin();
SumMemoryDelta = 0;
const auto advanceIterator = [&](const ui32 columnId, const ui64 maxColumnChunkRawBytes) {
while (it != MaxMemoryByColumnChunk.end() && it->ColumnId < columnId) {
++it;
}
if (it == MaxMemoryByColumnChunk.end() || columnId < it->ColumnId) {
it = MaxMemoryByColumnChunk.insert(it, TColumnInfo(columnId));
}
it->MemoryUsage += maxColumnChunkRawBytes;
SumMemoryDelta = std::max(SumMemoryDelta, it->MemoryUsage);
};
ui32 columnId = 0;
ui64 maxChunkSize = 0;
for (auto&& i : portionInfo.GetRecords()) {
SumMemoryFix += i.BlobRange.Size;
auto it = maxChunkSizeByColumn.find(i.GetColumnId());
if (it == maxChunkSizeByColumn.end()) {
maxChunkSizeByColumn.emplace(i.GetColumnId(), i.GetMeta().GetRawBytes());
} else {
if (it->second < i.GetMeta().GetRawBytes()) {
it->second = i.GetMeta().GetRawBytes();
if (columnId != i.GetColumnId()) {
if (columnId) {
advanceIterator(columnId, maxChunkSize);
}
columnId = i.GetColumnId();
maxChunkSize = 0;
}
if (maxChunkSize < i.GetMeta().GetRawBytes()) {
maxChunkSize = i.GetMeta().GetRawBytes();
}
}

SumMemoryDelta = 0;
for (auto&& i : maxChunkSizeByColumn) {
MaxMemoryByColumnChunk[i.first] += i.second;
SumMemoryDelta = std::max(SumMemoryDelta, MaxMemoryByColumnChunk[i.first]);
}
advanceIterator(columnId, maxChunkSize);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_prediction_after", SumMemoryFix + SumMemoryDelta)(
"portion_info", portionInfo.DebugString());
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/general_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,17 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
ui64 SumMemoryDelta = 0;
ui64 SumMemoryFix = 0;
ui32 PortionsCount = 0;
THashMap<ui32, ui64> MaxMemoryByColumnChunk;
// Per-column accumulator for the chunked memory predictor.
// AddPortion() adds, for each processed portion, the largest raw chunk size
// seen for this column to MemoryUsage, and tracks the maximum over all columns.
class TColumnInfo {
public:
    // Identifier of the column this entry describes; immutable after construction.
    const ui32 ColumnId;
    // Running sum of the per-portion maximum chunk raw bytes for this column.
    ui64 MemoryUsage = 0;

    // Marked explicit so that a bare column id is never silently converted
    // into an accumulator entry; entries must be constructed deliberately.
    explicit TColumnInfo(const ui32 columnId)
        : ColumnId(columnId)
    {
    }
};
std::list<TColumnInfo> MaxMemoryByColumnChunk;

public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
}

void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
AFL_VERIFY(PortionsToRemove.size() + PortionsToMove.size() + AppendedPortions.size());
for (auto&& i : AppendedPortions) {
i.GetPortionConstructor().SetPortionId(context.NextPortionId());
i.GetPortionConstructor().MutableMeta().SetCompactionLevel(TargetCompactionLevel.value_or(0));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
}

result.Indexes = std::move(Indexes);
result.Indexes.shrink_to_fit();
result.Records = std::move(Records);
result.Records.shrink_to_fit();
result.BlobIds = std::move(BlobIds);
result.BlobIds.shrink_to_fit();
result.Precalculate();
return result;
}
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/chunks/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ class TChunkPreparation: public IPortionColumnChunk {
, Record(address, column)
, ColumnInfo(columnInfo) {
Y_ABORT_UNLESS(column->GetRecordsCount());
First = column->GetScalar(0);
Last = column->GetScalar(column->GetRecordsCount() - 1);
if (ColumnInfo.GetPKColumnIndex()) {
First = column->GetScalar(0);
Last = column->GetScalar(column->GetRecordsCount() - 1);
}
Record.BlobRange.Size = data.size();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class TCompactionTaskData {
StopSeparation = point;
}

std::vector<std::shared_ptr<TPortionInfo>> GetRepackPortions(const ui32 levelIdx) const {
std::vector<std::shared_ptr<TPortionInfo>> GetRepackPortions(const ui32 /*levelIdx*/) const {
std::vector<std::shared_ptr<TPortionInfo>> result;
if (MemoryUsage > ((ui64)1 << 30)) {
auto predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor();
Expand All @@ -154,7 +154,7 @@ class TCompactionTaskData {
}
}
return result;
} else if (levelIdx == 0) {
} else {
return Portions;
}
auto moveIds = GetMovePortionIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ TOptimizerPlanner::TOptimizerPlanner(
const ui64 maxPortionBlobBytes = (ui64)1 << 20;
Levels.emplace_back(
std::make_shared<TLevelPortions>(2, 0.9, maxPortionBlobBytes, nullptr, PortionsInfo, Counters->GetLevelCounters(2)));
Levels.emplace_back(
std::make_shared<TLevelPortions>(1, 0.1, maxPortionBlobBytes, Levels.back(), PortionsInfo, Counters->GetLevelCounters(1)));
*/
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, nullptr, Counters->GetLevelCounters(2)));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0)));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max()));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max()));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180)));
std::reverse(Levels.begin(), Levels.end());
RefreshWeights();
}
Expand All @@ -34,14 +33,14 @@ std::shared_ptr<NKikimr::NOlap::TColumnEngineChanges> TOptimizerPlanner::DoGetOp
auto data = level->GetOptimizationTask();
TSaverContext saverContext(StoragesManager);
std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges> result;
if (level->GetLevelId() == 0) {
result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
} else {
// if (level->GetLevelId() == 0) {
result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
result->AddMovePortions(data.GetMovePortions());
}
// } else {
// result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
// granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
// result->AddMovePortions(data.GetMovePortions());
// }
result->SetTargetCompactionLevel(data.GetTargetCompactionLevel());
auto levelPortions = std::dynamic_pointer_cast<TLevelPortions>(Levels[data.GetTargetCompactionLevel()]);
if (levelPortions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,16 @@ ui64 TZeroLevelPortions::DoGetWeight() const {
if (!NextLevel || Portions.size() < 10) {
return 0;
}
if (TInstant::Now() - *PredOptimization < TDuration::Seconds(180)) {
if (PortionsInfo.GetCount() <= 100 || PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) {
return 0;
}
} else {
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (512 << 10)) {
if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) {
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) {
return 0;
}
}

THashSet<ui64> portionIds;
const ui64 affectedRawBytes =
NextLevel->GetAffectedPortionBytes(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd());
/*
THashSet<ui64> portionIds;
auto chain =
targetLevel->GetAffectedPortions(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd());
ui64 affectedRawBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class TZeroLevelPortions: public IPortionsLevel {
private:
using TBase = IPortionsLevel;
const TLevelCounters LevelCounters;
const TDuration DurationToDrop;
class TOrderedPortion {
private:
YDB_READONLY_DEF(std::shared_ptr<TPortionInfo>, Portion);
Expand Down Expand Up @@ -87,9 +88,11 @@ class TZeroLevelPortions: public IPortionsLevel {
virtual TCompactionTaskData DoGetOptimizationTask() const override;

public:
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters)
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop)
: TBase(levelIdx, nextLevel)
, LevelCounters(levelCounters) {
, LevelCounters(levelCounters)
, DurationToDrop(durationToDrop)
{
}
};

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,16 +434,22 @@ namespace NKikimr::NColumnShard {
NOlap::TIndexInfo BuildTableInfo(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
const std::vector<NArrow::NTest::TTestColumn>& key) {
THashMap<ui32, NTable::TColumn> columns;
THashMap<TString, NTable::TColumn*> columnByName;
for (ui32 i = 0; i < ydbSchema.size(); ++i) {
ui32 id = i + 1;
auto& name = ydbSchema[i].GetName();
auto& type = ydbSchema[i].GetType();

columns[id] = NTable::TColumn(name, id, type, "");
AFL_VERIFY(columnByName.emplace(name, &columns[id]).second);
}

std::vector<TString> pkNames;
ui32 idx = 0;
for (const auto& c : key) {
auto it = columnByName.find(c.GetName());
AFL_VERIFY(it != columnByName.end());
it->second->KeyOrder = idx++;
pkNames.push_back(c.GetName());
}
return NOlap::TIndexInfo::BuildDefault(NOlap::TTestStoragesManager::GetInstance(), columns, pkNames);
Expand Down

0 comments on commit f3b0f5d

Please sign in to comment.