From 4104588bfcf962b5cb1bcf0f39860b1308ce85dc Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Thu, 3 Oct 2024 14:59:38 +0000 Subject: [PATCH 1/3] Remove unused table versions along with schema versions in TSchemaVersionNormalizer --- .../normalizer/schema_version/version.cpp | 71 +++++++++++++++++-- 1 file changed, 66 insertions(+), 5 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp index cb214d7505af..4804fc5da6a3 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp @@ -23,11 +23,30 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } }; + class TTableKey { + public: + ui64 PathId; + ui64 Step; + ui64 TxId; + ui64 Version; + + public: + TTableKey(ui64 pathId, ui64 step, ui64 txId, ui64 version) + : PathId(pathId) + , Step(step) + , TxId(txId) + , Version(version) + { + } + }; + std::vector VersionsToRemove; + std::vector TableVersionsToRemove; public: - TNormalizerResult(std::vector&& versions) + TNormalizerResult(std::vector&& versions, std::vector&& tableVersions) : VersionsToRemove(versions) + , TableVersionsToRemove(tableVersions) { } @@ -35,8 +54,13 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); for (auto& key: VersionsToRemove) { + LOG_S_DEBUG("Removing schema version in TSchemaVersionNormalizer " << key.Version); db.Table().Key(key.Id, key.Step, key.TxId).Delete(); } + for (auto& key: TableVersionsToRemove) { + LOG_S_DEBUG("Removing table version in TSchemaVersionNormalizer " << key.Version << " pathId " << key.PathId); + db.Table().Key(key.PathId, key.Step, key.TxId).Delete(); + } return true; } @@ -78,6 +102,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } std::vector unusedSchemaIds; + std::vector unusedTableSchemaIds; std::optional maxVersion; std::vector changes; @@ -107,18 +132,54 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } } + { + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return std::nullopt; + } + + while (!rowset.EndOfSet()) { + const ui64 pathId = rowset.GetValue(); + + NKikimrTxColumnShard::TTableVersionInfo versionInfo; + Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue())); + if (versionInfo.HasSchema()) { + ui64 version = versionInfo.GetSchema().GetVersion(); + if (!usedSchemaVersions.contains(version)) { + unusedTableSchemaIds.emplace_back(pathId, rowset.GetValue(), rowset.GetValue(), version); + } + } + + if (!rowset.Next()) { + return std::nullopt; + } + } + } + + std::vector tablePortion; std::vector portion; + tablePortion.reserve(10000); portion.reserve(10000); for (const auto& id: unusedSchemaIds) { if (!maxVersion.has_value() || (id.Version != *maxVersion)) { portion.push_back(id); - if (portion.size() >= 10000) { - changes.emplace_back(std::make_shared(std::move(portion))); + if (portion.size() + tablePortion.size() >= 10000) { + changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); } } } - if (portion.size() > 0) { - changes.emplace_back(std::make_shared(std::move(portion))); + + for (const auto& id: unusedTableSchemaIds) { + if (!maxVersion.has_value() || (id.Version != *maxVersion)) { + tablePortion.push_back(id); + if (portion.size() + tablePortion.size() >= 10000) { + changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); + } + } + } + + if (portion.size() + tablePortion.size() > 0) { + changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); } return changes; } From 1d66f026e4fef511569e4dec14b91cfe57e558b2 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Fri, 4 Oct 2024 07:57:44 +0000 Subject: [PATCH 2/3] Added test --- ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index d941d548414c..9c6d6607ee98 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -176,6 +176,12 @@ class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier { Y_ABORT_UNLESS(info.SerializeToString(&serialized)); db.Table().Key(11, 1, 1).Update(NIceDb::TUpdate(serialized)); + // Add invalid widow table version, if SchemaVersionCleaner will not erase it, then test will fail + NKikimrTxColumnShard::TTableVersionInfo versionInfo; + versionInfo.MutableSchema()->SetVersion(minVersion - 1); + Y_ABORT_UNLESS(versionInfo.SerializeToString(&serialized)); + db.Table().Key(1, 1, 1).Update(NIceDb::TUpdate(serialized)); + db.Table().Key(10).Update(NIceDb::TUpdate("default")); } From 49e9b4796edcce724bff4f96b9bbb769e8c49e01 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 8 Oct 2024 16:03:34 +0000 Subject: [PATCH 3/3] Moved repeated code to lambda --- .../normalizer/schema_version/version.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp index 4804fc5da6a3..6fbfa72e279c 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp @@ -160,21 +160,24 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { std::vector portion; tablePortion.reserve(10000); portion.reserve(10000); + auto addPortion = [&]() { + if (portion.size() + tablePortion.size() >= 10000) { + changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); + portion = std::vector(); + tablePortion = std::vector(); + } + }; for (const auto& id: unusedSchemaIds) { if (!maxVersion.has_value() || (id.Version != *maxVersion)) { portion.push_back(id); - if (portion.size() + tablePortion.size() >= 10000) { - changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); - } + addPortion(); } } for (const auto& id: unusedTableSchemaIds) { if (!maxVersion.has_value() || (id.Version != *maxVersion)) { tablePortion.push_back(id); - if (portion.size() + tablePortion.size() >= 10000) { - changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); - } + addPortion(); } }