Skip to content

Commit

Permalink
Merge f84e6a2 into 70de92e
Browse files Browse the repository at this point in the history
  • Loading branch information
fexolm authored Sep 18, 2024
2 parents 70de92e + f84e6a2 commit 7a23dbe
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 9 deletions.
14 changes: 13 additions & 1 deletion ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {

public:
TAsyncReshardingTest() {
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 24, 4);
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4);
}

void AddBatch(int numRows) {
Expand Down Expand Up @@ -475,6 +475,18 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
tester.CheckCount();
}

Y_UNIT_TEST(SplitEmpty) {
TAsyncReshardingTest tester;

tester.CheckCount();

tester.StartResharding("SPLIT");

tester.CheckCount();
tester.WaitResharding();
tester.CheckCount();
}

Y_UNIT_TEST(ChangeSchemaAndSplit) {
TAsyncReshardingTest tester;
tester.DisableCompaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) {
THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>> portionsByPath;
THashSet<TString> StoragesIds;
for (auto&& i : GetPathIdsForStart()) {
auto& portionsVector = portionsByPath[i];
const auto& g = index.GetGranuleVerified(i);
for (auto&& p : g.GetPortionsOlderThenSnapshot(GetSnapshotBarrier())) {
if (shard.GetDataLocksManager()->IsLocked(*p.second, { "sharing_session:" + GetSessionId() })) {
return false;
}
portionsVector.emplace_back(p.second);
portionsByPath[i].emplace_back(p.second);
}
}

Expand All @@ -52,7 +51,7 @@ void TCommonSession::PrepareToStart(const NColumnShard::TColumnShard& shard) {
}

void TCommonSession::Finish(const NColumnShard::TColumnShard& shard, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) {
AFL_VERIFY(State == EState::InProgress);
AFL_VERIFY(State == EState::InProgress || State == EState::Prepared);
State = EState::Finished;
shard.GetSharingSessionsManager()->FinishSharingSession();
AFL_VERIFY(LockGuard);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL
AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
}

if (protoSessionCursorDynamic && !protoSessionCursorStatic) {
protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{};
}

AFL_VERIFY(index);
session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate();
AFL_VERIFY(SourceSessions.emplace(session->GetSessionId(), session).second);
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu
for (auto&& i : protoStatic.GetPathHashes()) {
PathPortionHashes.emplace(i.GetPathId(), i.GetHash());
}
AFL_VERIFY(PathPortionHashes.size());
IsStaticSaved = true;
if (PathPortionHashes.empty()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "empty static cursor");
} else {
IsStaticSaved = true;
}
return TConclusionStatus::Success();
}

Expand Down Expand Up @@ -178,10 +181,13 @@ bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManag
local.emplace(i.first, std::move(portionsMap));
}
std::swap(PortionsForSend, local);
if (!StartPathId) {
AFL_VERIFY(PortionsForSend.size());
AFL_VERIFY(PortionsForSend.begin()->second.size());

if (PortionsForSend.empty()) {
NextPathId = std::nullopt;
NextPortionId = std::nullopt;
return true;
} else if (!StartPathId) {
AFL_VERIFY(PortionsForSend.begin()->second.size());
NextPathId = PortionsForSend.begin()->first;
NextPortionId = PortionsForSend.begin()->second.begin()->first;
AFL_VERIFY(Next(storagesManager, index));
Expand Down

0 comments on commit 7a23dbe

Please sign in to comment.