diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index f3799a244b94..bb13b2d17850 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1685,6 +1685,7 @@ message TColumnShardConfig { message TRepairInfo { optional string ClassName = 1; optional string Description = 2; + optional bool DryRun = 3; } repeated TRepairInfo Repairs = 15; diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 57ed8eaa0ecd..ace229ac6ff1 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -177,6 +177,7 @@ class TTxApplyNormalizer: public TTransactionBase { public: TTxApplyNormalizer(TColumnShard* self, NOlap::INormalizerChanges::TPtr changes) : TBase(self) + , IsDryRun(self->NormalizerController.GetNormalizer()->GetIsDryRun()) , Changes(changes) { } @@ -187,6 +188,7 @@ class TTxApplyNormalizer: public TTransactionBase { } private: + const bool IsDryRun; bool NormalizerFinished = false; NOlap::INormalizerChanges::TPtr Changes; }; @@ -194,9 +196,12 @@ class TTxApplyNormalizer: public TTransactionBase { bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute"); - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString()); - if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) { - return false; + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString())( + "dry_run", IsDryRun); + if (!IsDryRun) { + if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) { + return false; + } } if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 0) { @@ -211,9 +216,14 @@ void TTxApplyNormalizer::Complete(const TActorContext& ctx) { NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())( "event", "TTxApplyNormalizer::Complete"); AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString()); - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")( - "details", Self->NormalizerController.DebugString())("size", Changes->GetSize()); - Changes->ApplyOnComplete(Self->NormalizerController); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")("details", Self->NormalizerController.DebugString())( + "size", Changes->GetSize())("dry_run", IsDryRun); + if (IsDryRun) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_changes_dry_run")( + "normalizer", Self->NormalizerController.GetNormalizer()->GetClassName())("changes", Changes->DebugString()); + } else { + Changes->ApplyOnComplete(Self->NormalizerController); + } if (!NormalizerFinished) { return; } diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index a201ed881a67..7ca284d9d8d5 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -77,7 +77,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { auto component = INormalizerComponent::TFactory::MakeHolder(i.GetClassName(), ctx); AFL_VERIFY(component)("class_name", i.GetClassName()); auto normalizer = RegisterNormalizer(std::shared_ptr(component.Release())); - normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription()); + normalizer->SetIsRepair(true).SetIsDryRun(i.GetDryRun()).SetUniqueDescription(i.GetDescription()); } } } diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index d0629298e59f..636177db8f74 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -106,6 +106,9 @@ class INormalizerChanges { } virtual ui64 GetSize() const = 0; + virtual TString DebugString() const { + return TStringBuilder() << "size=" << GetSize(); + } }; class TTrivialNormalizerTask: public INormalizerTask { @@ -170,6 +173,7 @@ class TNormalizationController { class INormalizerComponent { private: YDB_ACCESSOR(bool, IsRepair, false); + YDB_ACCESSOR(bool, IsDryRun, false); YDB_ACCESSOR_DEF(TString, UniqueDescription); YDB_ACCESSOR(TString, UniqueId, TGUID::CreateTimebased().AsUuidString()); diff --git a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp index 9fd03a681c46..86cea4553988 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp @@ -7,6 +7,8 @@ #include #include +#include + namespace NKikimr::NOlap::NNormalizer::NBrokenBlobs { class TNormalizerResult: public INormalizerChanges { @@ -33,17 +35,8 @@ class TNormalizerResult: public INormalizerChanges { copy.SaveMetaToDatabase(db); } if (BrokenPortions.size()) { - TStringBuilder sb; - ui64 recordsCount = 0; - sb << "path_ids:["; - for (auto&& [_, p] : BrokenPortions) { - sb << p.GetPortionInfo().GetPathId() << ","; - recordsCount += p.GetPortionInfo().GetRecordsCount(); - } - sb << "];"; - sb << "records_count:" << recordsCount; NIceDb::TNiceDb db(txc.DB); - normController.AddNormalizerEvent(db, "REMOVE_PORTIONS", sb); + normController.AddNormalizerEvent(db, "REMOVE_PORTIONS", DebugString()); } return true; } @@ -54,6 +47,19 @@ class TNormalizerResult: public INormalizerChanges { ui64 GetSize() const override { return BrokenPortions.size(); } + + TString DebugString() const override { + TStringBuilder sb; + ui64 recordsCount = 0; + sb << "path_ids=["; + for (auto&& [_, p] : BrokenPortions) { + sb << p.GetPortionInfo().GetPathId() << ","; + recordsCount += p.GetPortionInfo().GetRecordsCount(); + } + sb << "]"; + sb << ";records_count=" << recordsCount; + return sb; + } }; class TReadTask: public NOlap::NBlobOperations::NRead::ITask { diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp index b51d8c43c6db..d8e0d5f48428 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp @@ -9,6 +9,8 @@ #include +#include + namespace NKikimr::NOlap { class TLeakedBlobsNormalizerChanges: public INormalizerChanges { @@ -16,12 +18,17 @@ class TLeakedBlobsNormalizerChanges: public INormalizerChanges { THashSet Leaks; const ui64 TabletId; NColumnShard::TBlobGroupSelector DsGroupSelector; + ui64 LeakeadBlobsSize; public: TLeakedBlobsNormalizerChanges(THashSet&& leaks, const ui64 tabletId, NColumnShard::TBlobGroupSelector dsGroupSelector) : Leaks(std::move(leaks)) , TabletId(tabletId) , DsGroupSelector(dsGroupSelector) { + LeakeadBlobsSize = 0; + for (const auto& blob : Leaks) { + LeakeadBlobsSize += blob.BlobSize(); + } } bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override { @@ -42,6 +49,18 @@ class TLeakedBlobsNormalizerChanges: public INormalizerChanges { ui64 GetSize() const override { return Leaks.size(); } + + TString DebugString() const override { + TStringBuilder sb; + sb << "tablet=" << TabletId; + sb << ";leaked_blob_count=" << Leaks.size(); + sb << ";leaked_blobs_size=" << LeakeadBlobsSize; + auto blobSampleEnd = Leaks.begin(); + for (ui64 i = 0; i < 10 && blobSampleEnd != Leaks.end(); ++i, ++blobSampleEnd) { + } + sb << ";leaked_blobs=[" << JoinStrings(Leaks.begin(), blobSampleEnd, ",") << "]"; + return sb; + } }; class TRemoveLeakedBlobsActor: public TActorBootstrapped { @@ -156,7 +175,8 @@ TConclusion> TLeakedBlobsNormalizer::DoInit( NIceDb::TNiceDb db(txc.DB); const bool ready = (int)Schema::Precharge(db, txc.DB.GetScheme()) & (int)Schema::Precharge(db, txc.DB.GetScheme()) & - (int)Schema::Precharge(db, txc.DB.GetScheme()); + (int)Schema::Precharge(db, txc.DB.GetScheme()) & + (int)Schema::Precharge(db, txc.DB.GetScheme()); if (!ready) { return TConclusionStatus::Fail("Not ready"); } @@ -219,6 +239,24 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds( } Indexes = std::move(indexesLocal); } + if (BlobsToDelete.empty()) { + THashSet blobsToDelete; + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready: BlobsToDeleteWT"); + } + while (!rowset.EndOfSet()) { + const TString& blobIdStr = rowset.GetValue(); + TString error; + TUnifiedBlobId blobId = TUnifiedBlobId::ParseFromString(blobIdStr, &DsGroupSelector, error); + AFL_VERIFY(blobId.IsValid())("event", "cannot_parse_blob")("error", error)("original_string", blobIdStr); + blobsToDelete.emplace(blobId); + if (!rowset.Next()) { + return TConclusionStatus::Fail("Local table is not loaded: BlobsToDeleteWT"); + } + } + BlobsToDelete = std::move(blobsToDelete); + } AFL_VERIFY(Portions.size() == Records.size())("portions", Portions.size())("records", Records.size()); THashSet resultLocal; for (auto&& i : Portions) { @@ -240,6 +278,9 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds( for (auto&& c : it->second) { resultLocal.emplace(c.GetLogoBlobId()); } + for (const auto& c : BlobsToDelete) { + resultLocal.emplace(c.GetLogoBlobId()); + } } std::swap(resultLocal, result); return TConclusionStatus::Success(); diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h index 4545ea3b605b..1a4617e2cd67 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h @@ -45,5 +45,6 @@ class TLeakedBlobsNormalizer: public TNormalizationController::INormalizerCompon THashMap Portions; THashMap> Records; THashMap> Indexes; + THashSet BlobsToDelete; }; } // namespace NKikimr::NOlap