Skip to content

Commit

Permalink
dry-run mode for CS normalizers (ydb-platform#11934)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored and zverevgeny committed Jan 5, 2025
1 parent 0d9074a commit a6d0227
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 18 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,7 @@ message TColumnShardConfig {
message TRepairInfo {
optional string ClassName = 1;
optional string Description = 2;
optional bool DryRun = 3;
}
repeated TRepairInfo Repairs = 15;

Expand Down
22 changes: 16 additions & 6 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class TTxApplyNormalizer: public TTransactionBase<TColumnShard> {
public:
TTxApplyNormalizer(TColumnShard* self, NOlap::INormalizerChanges::TPtr changes)
: TBase(self)
, IsDryRun(self->NormalizerController.GetNormalizer()->GetIsDryRun())
, Changes(changes) {
}

Expand All @@ -187,16 +188,20 @@ class TTxApplyNormalizer: public TTransactionBase<TColumnShard> {
}

private:
const bool IsDryRun;
bool NormalizerFinished = false;
NOlap::INormalizerChanges::TPtr Changes;
};

bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
return false;
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString())(
"dry_run", IsDryRun);
if (!IsDryRun) {
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
return false;
}
}

if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 0) {
Expand All @@ -211,9 +216,14 @@ void TTxApplyNormalizer::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"event", "TTxApplyNormalizer::Complete");
AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")(
"details", Self->NormalizerController.DebugString())("size", Changes->GetSize());
Changes->ApplyOnComplete(Self->NormalizerController);
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")("details", Self->NormalizerController.DebugString())(
"size", Changes->GetSize())("dry_run", IsDryRun);
if (IsDryRun) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_changes_dry_run")(
"normalizer", Self->NormalizerController.GetNormalizer()->GetClassName())("changes", Changes->DebugString());
} else {
Changes->ApplyOnComplete(Self->NormalizerController);
}
if (!NormalizerFinished) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
auto component = INormalizerComponent::TFactory::MakeHolder(i.GetClassName(), ctx);
AFL_VERIFY(component)("class_name", i.GetClassName());
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(component.Release()));
normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription());
normalizer->SetIsRepair(true).SetIsDryRun(i.GetDryRun()).SetUniqueDescription(i.GetDescription());
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class INormalizerChanges {
}

virtual ui64 GetSize() const = 0;
virtual TString DebugString() const {
return TStringBuilder() << "size=" << GetSize();
}
};

class TTrivialNormalizerTask: public INormalizerTask {
Expand Down Expand Up @@ -170,6 +173,7 @@ class TNormalizationController {
class INormalizerComponent {
private:
YDB_ACCESSOR(bool, IsRepair, false);
YDB_ACCESSOR(bool, IsDryRun, false);
YDB_ACCESSOR_DEF(TString, UniqueDescription);
YDB_ACCESSOR(TString, UniqueId, TGUID::CreateTimebased().AsUuidString());

Expand Down
26 changes: 16 additions & 10 deletions ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
#include <ydb/core/tx/columnshard/tables_manager.h>

#include <util/string/vector.h>

namespace NKikimr::NOlap::NNormalizer::NBrokenBlobs {

class TNormalizerResult: public INormalizerChanges {
Expand All @@ -33,17 +35,8 @@ class TNormalizerResult: public INormalizerChanges {
copy.SaveMetaToDatabase(db);
}
if (BrokenPortions.size()) {
TStringBuilder sb;
ui64 recordsCount = 0;
sb << "path_ids:[";
for (auto&& [_, p] : BrokenPortions) {
sb << p.GetPortionInfo().GetPathId() << ",";
recordsCount += p.GetPortionInfo().GetRecordsCount();
}
sb << "];";
sb << "records_count:" << recordsCount;
NIceDb::TNiceDb db(txc.DB);
normController.AddNormalizerEvent(db, "REMOVE_PORTIONS", sb);
normController.AddNormalizerEvent(db, "REMOVE_PORTIONS", DebugString());
}
return true;
}
Expand All @@ -54,6 +47,19 @@ class TNormalizerResult: public INormalizerChanges {
ui64 GetSize() const override {
return BrokenPortions.size();
}

TString DebugString() const override {
TStringBuilder sb;
ui64 recordsCount = 0;
sb << "path_ids=[";
for (auto&& [_, p] : BrokenPortions) {
sb << p.GetPortionInfo().GetPathId() << ",";
recordsCount += p.GetPortionInfo().GetRecordsCount();
}
sb << "]";
sb << ";records_count=" << recordsCount;
return sb;
}
};

class TReadTask: public NOlap::NBlobOperations::NRead::ITask {
Expand Down
43 changes: 42 additions & 1 deletion ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@

#include <ydb/library/actors/core/actor.h>

#include <util/string/vector.h>

namespace NKikimr::NOlap {

class TLeakedBlobsNormalizerChanges: public INormalizerChanges {
private:
THashSet<TLogoBlobID> Leaks;
const ui64 TabletId;
NColumnShard::TBlobGroupSelector DsGroupSelector;
ui64 LeakeadBlobsSize;

public:
TLeakedBlobsNormalizerChanges(THashSet<TLogoBlobID>&& leaks, const ui64 tabletId, NColumnShard::TBlobGroupSelector dsGroupSelector)
: Leaks(std::move(leaks))
, TabletId(tabletId)
, DsGroupSelector(dsGroupSelector) {
LeakeadBlobsSize = 0;
for (const auto& blob : Leaks) {
LeakeadBlobsSize += blob.BlobSize();
}
}

bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override {
Expand All @@ -42,6 +49,18 @@ class TLeakedBlobsNormalizerChanges: public INormalizerChanges {
ui64 GetSize() const override {
return Leaks.size();
}

TString DebugString() const override {
TStringBuilder sb;
sb << "tablet=" << TabletId;
sb << ";leaked_blob_count=" << Leaks.size();
sb << ";leaked_blobs_size=" << LeakeadBlobsSize;
auto blobSampleEnd = Leaks.begin();
for (ui64 i = 0; i < 10 && blobSampleEnd != Leaks.end(); ++i, ++blobSampleEnd) {
}
sb << ";leaked_blobs=[" << JoinStrings(Leaks.begin(), blobSampleEnd, ",") << "]";
return sb;
}
};

class TRemoveLeakedBlobsActor: public TActorBootstrapped<TRemoveLeakedBlobsActor> {
Expand Down Expand Up @@ -157,7 +176,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TLeakedBlobsNormalizer::DoInit(
NIceDb::TNiceDb db(txc.DB);
const bool ready = (int)Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme()) &
(int)Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()) &
(int)Schema::Precharge<Schema::IndexIndexes>(db, txc.DB.GetScheme());
(int)Schema::Precharge<Schema::IndexIndexes>(db, txc.DB.GetScheme()) &
(int)Schema::Precharge<Schema::BlobsToDeleteWT>(db, txc.DB.GetScheme());
if (!ready) {
return TConclusionStatus::Fail("Not ready");
}
Expand Down Expand Up @@ -220,6 +240,24 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds(
}
Indexes = std::move(indexesLocal);
}
if (BlobsToDelete.empty()) {
THashSet<TUnifiedBlobId> blobsToDelete;
auto rowset = db.Table<NColumnShard::Schema::BlobsToDeleteWT>().Select();
if (!rowset.IsReady()) {
return TConclusionStatus::Fail("Not ready: BlobsToDeleteWT");
}
while (!rowset.EndOfSet()) {
const TString& blobIdStr = rowset.GetValue<NColumnShard::Schema::BlobsToDeleteWT::BlobId>();
TString error;
TUnifiedBlobId blobId = TUnifiedBlobId::ParseFromString(blobIdStr, &DsGroupSelector, error);
AFL_VERIFY(blobId.IsValid())("event", "cannot_parse_blob")("error", error)("original_string", blobIdStr);
blobsToDelete.emplace(blobId);
if (!rowset.Next()) {
return TConclusionStatus::Fail("Local table is not loaded: BlobsToDeleteWT");
}
}
BlobsToDelete = std::move(blobsToDelete);
}
AFL_VERIFY(Portions.size() == Records.size())("portions", Portions.size())("records", Records.size());
THashSet<TLogoBlobID> resultLocal;
for (auto&& i : Portions) {
Expand All @@ -241,6 +279,9 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds(
for (auto&& c : it->second) {
resultLocal.emplace(c.GetLogoBlobId());
}
for (const auto& c : BlobsToDelete) {
resultLocal.emplace(c.GetLogoBlobId());
}
}
std::swap(resultLocal, result);
return TConclusionStatus::Success();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ class TLeakedBlobsNormalizer: public TNormalizationController::INormalizerCompon
THashMap<ui64, TPortionInfoConstructor> Portions;
THashMap<ui64, std::vector<TColumnChunkLoadContextV1>> Records;
THashMap<ui64, std::vector<TIndexChunkLoadContext>> Indexes;
THashSet<TUnifiedBlobId> BlobsToDelete;
};
} // namespace NKikimr::NOlap

0 comments on commit a6d0227

Please sign in to comment.