From f84e6a266436edc843a6bfd951f26f5bd8e6340c Mon Sep 17 00:00:00 2001 From: Artem Alekseev Date: Tue, 17 Sep 2024 15:03:38 +0300 Subject: [PATCH] Fix partitioning empty tables --- ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp | 14 +++++++++++++- .../data_sharing/common/session/common.cpp | 5 ++--- .../data_sharing/manager/sessions.cpp | 4 ++++ .../data_sharing/source/session/cursor.cpp | 16 +++++++++++----- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp index ea97c44484f3..d4cfb8d85f10 100644 --- a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp +++ b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp @@ -411,7 +411,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { public: TAsyncReshardingTest() { - TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 24, 4); + TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4); } void AddBatch(int numRows) { @@ -475,6 +475,18 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { tester.CheckCount(); } + Y_UNIT_TEST(SplitEmpty) { + TAsyncReshardingTest tester; + + tester.CheckCount(); + + tester.StartResharding("SPLIT"); + + tester.CheckCount(); + tester.WaitResharding(); + tester.CheckCount(); + } + Y_UNIT_TEST(ChangeSchemaAndSplit) { TAsyncReshardingTest tester; tester.DisableCompaction(); diff --git a/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp b/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp index eb95cc36711a..ae1ec4e63fb1 100644 --- a/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp +++ b/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp @@ -22,13 +22,12 @@ bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) { THashMap>> portionsByPath; THashSet StoragesIds; for (auto&& i : GetPathIdsForStart()) { - auto& portionsVector = portionsByPath[i]; const auto& g = index.GetGranuleVerified(i); for (auto&& p : g.GetPortionsOlderThenSnapshot(GetSnapshotBarrier())) { if (shard.GetDataLocksManager()->IsLocked(*p.second, { "sharing_session:" + GetSessionId() })) { return false; } - portionsVector.emplace_back(p.second); + portionsByPath[i].emplace_back(p.second); } } @@ -52,7 +51,7 @@ void TCommonSession::PrepareToStart(const NColumnShard::TColumnShard& shard) { } void TCommonSession::Finish(const NColumnShard::TColumnShard& shard, const std::shared_ptr& dataLocksManager) { - AFL_VERIFY(State == EState::InProgress); + AFL_VERIFY(State == EState::InProgress || State == EState::Prepared); State = EState::Finished; shard.GetSharingSessionsManager()->FinishSharingSession(); AFL_VERIFY(LockGuard); diff --git a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp index daea14bd8edb..7f5a8cc9f5be 100644 --- a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp +++ b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp @@ -79,6 +79,10 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue())); } + if (protoSessionCursorDynamic && !protoSessionCursorStatic) { + protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{}; + } + AFL_VERIFY(index); session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate(); AFL_VERIFY(SourceSessions.emplace(session->GetSessionId(), session).second); diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp index 66fbcb98348a..54894bb414ee 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp @@ -132,8 +132,11 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu for (auto&& i : protoStatic.GetPathHashes()) { PathPortionHashes.emplace(i.GetPathId(), i.GetHash()); } - AFL_VERIFY(PathPortionHashes.size()); - IsStaticSaved = true; + if (PathPortionHashes.empty()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "empty static cursor"); + } else { + IsStaticSaved = true; + } return TConclusionStatus::Success(); } @@ -178,10 +181,13 @@ bool TSourceCursor::Start(const std::shared_ptr& storagesManag local.emplace(i.first, std::move(portionsMap)); } std::swap(PortionsForSend, local); - if (!StartPathId) { - AFL_VERIFY(PortionsForSend.size()); - AFL_VERIFY(PortionsForSend.begin()->second.size()); + if (PortionsForSend.empty()) { + NextPathId = std::nullopt; + NextPortionId = std::nullopt; + return true; + } else if (!StartPathId) { + AFL_VERIFY(PortionsForSend.begin()->second.size()); NextPathId = PortionsForSend.begin()->first; NextPortionId = PortionsForSend.begin()->second.begin()->first; AFL_VERIFY(Next(storagesManager, index));