diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp index cb214d7505af..6fbfa72e279c 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp @@ -23,11 +23,30 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } }; + class TTableKey { + public: + ui64 PathId; + ui64 Step; + ui64 TxId; + ui64 Version; + + public: + TTableKey(ui64 pathId, ui64 step, ui64 txId, ui64 version) + : PathId(pathId) + , Step(step) + , TxId(txId) + , Version(version) + { + } + }; + std::vector VersionsToRemove; + std::vector TableVersionsToRemove; public: - TNormalizerResult(std::vector&& versions) + TNormalizerResult(std::vector&& versions, std::vector&& tableVersions) : VersionsToRemove(versions) + , TableVersionsToRemove(tableVersions) { } @@ -35,8 +54,13 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); for (auto& key: VersionsToRemove) { + LOG_S_DEBUG("Removing schema version in TSchemaVersionNormalizer " << key.Version); db.Table().Key(key.Id, key.Step, key.TxId).Delete(); } + for (auto& key: TableVersionsToRemove) { + LOG_S_DEBUG("Removing table version in TSchemaVersionNormalizer " << key.Version << " pathId " << key.PathId); + db.Table().Key(key.PathId, key.Step, key.TxId).Delete(); + } return true; } @@ -78,6 +102,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } std::vector unusedSchemaIds; + std::vector unusedTableSchemaIds; std::optional maxVersion; std::vector changes; @@ -107,18 +132,57 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } } + { + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return std::nullopt; + } + + while (!rowset.EndOfSet()) { + const ui64 pathId = rowset.GetValue(); + + NKikimrTxColumnShard::TTableVersionInfo versionInfo; + Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue())); + if (versionInfo.HasSchema()) { + ui64 version = versionInfo.GetSchema().GetVersion(); + if (!usedSchemaVersions.contains(version)) { + unusedTableSchemaIds.emplace_back(pathId, rowset.GetValue(), rowset.GetValue(), version); + } + } + + if (!rowset.Next()) { + return std::nullopt; + } + } + } + + std::vector tablePortion; std::vector portion; + tablePortion.reserve(10000); portion.reserve(10000); + auto addPortion = [&]() { + if (portion.size() + tablePortion.size() >= 10000) { + changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); + portion = std::vector(); + tablePortion = std::vector(); + } + }; for (const auto& id: unusedSchemaIds) { if (!maxVersion.has_value() || (id.Version != *maxVersion)) { portion.push_back(id); - if (portion.size() >= 10000) { - changes.emplace_back(std::make_shared(std::move(portion))); - } + addPortion(); + } + } + + for (const auto& id: unusedTableSchemaIds) { + if (!maxVersion.has_value() || (id.Version != *maxVersion)) { + tablePortion.push_back(id); + addPortion(); } } - if (portion.size() > 0) { - changes.emplace_back(std::make_shared(std::move(portion))); + + if (portion.size() + tablePortion.size() > 0) { + changes.emplace_back(std::make_shared(std::move(portion), std::move(tablePortion))); } return changes; } diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index d941d548414c..9c6d6607ee98 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -176,6 +176,12 @@ class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier { Y_ABORT_UNLESS(info.SerializeToString(&serialized)); db.Table().Key(11, 1, 1).Update(NIceDb::TUpdate(serialized)); + // Add invalid widow table version, if SchemaVersionCleaner will not erase it, then test will fail + NKikimrTxColumnShard::TTableVersionInfo versionInfo; + versionInfo.MutableSchema()->SetVersion(minVersion - 1); + Y_ABORT_UNLESS(versionInfo.SerializeToString(&serialized)); + db.Table().Key(1, 1, 1).Update(NIceDb::TUpdate(serialized)); + db.Table().Key(10).Update(NIceDb::TUpdate("default")); }