diff --git a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp index 18a30ac76061..daea14bd8edb 100644 --- a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp +++ b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp @@ -67,11 +67,17 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL NKikimrColumnShardDataSharingProto::TSourceSession protoSession; AFL_VERIFY(protoSession.ParseFromString(rowset.GetValue())); - NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic protoSessionCursorDynamic; - AFL_VERIFY(protoSessionCursorDynamic.ParseFromString(rowset.GetValue())); + std::optional protoSessionCursorDynamic; + if (rowset.HaveValue()) { + protoSessionCursorDynamic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic{}; + AFL_VERIFY(protoSessionCursorDynamic->ParseFromString(rowset.GetValue())); + } - NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic protoSessionCursorStatic; - AFL_VERIFY(protoSessionCursorStatic.ParseFromString(rowset.GetValue())); + std::optional protoSessionCursorStatic; + if (rowset.HaveValue()) { + protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{}; + AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue())); + } AFL_VERIFY(index); session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate(); diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp index 5bc37cd29122..66fbcb98348a 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp @@ -1,6 +1,9 @@ #include "source.h" -#include + +#include #include +#include + #include namespace NKikimr::NOlap::NDataSharing { @@ -130,18 +133,29 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu PathPortionHashes.emplace(i.GetPathId(), i.GetHash()); } AFL_VERIFY(PathPortionHashes.size()); - StaticSaved = true; + IsStaticSaved = true; return TConclusionStatus::Success(); } TSourceCursor::TSourceCursor(const TTabletId selfTabletId, const std::set& pathIds, const TTransferContext transferContext) : SelfTabletId(selfTabletId) , TransferContext(transferContext) - , PathIds(pathIds) -{ + , PathIds(pathIds) { } -bool TSourceCursor::Start(const std::shared_ptr& storagesManager, const THashMap>>& portions, const TVersionedIndex& index) { +void TSourceCursor::SaveToDatabase(NIceDb::TNiceDb& db, const TString& sessionId) { + using SourceSessions = NKikimr::NColumnShard::Schema::SourceSessions; + db.Table().Key(sessionId).Update( + NIceDb::TUpdate(SerializeDynamicToProto().SerializeAsString())); + if (!IsStaticSaved) { + db.Table().Key(sessionId).Update( + NIceDb::TUpdate(SerializeStaticToProto().SerializeAsString())); + IsStaticSaved = true; + } +} + +bool TSourceCursor::Start(const std::shared_ptr& storagesManager, + const THashMap>>& portions, const TVersionedIndex& index) { AFL_VERIFY(!IsStartedFlag); std::map>> local; std::vector> portionsLock; @@ -177,5 +191,4 @@ bool TSourceCursor::Start(const std::shared_ptr& storagesManag IsStartedFlag = true; return true; } - -} \ No newline at end of file +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h index 3f4cdba86c15..ac8daea95cbf 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h +++ b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h @@ -8,6 +8,10 @@ class TColumnEngineForLogs; class TVersionedIndex; } +namespace NKikimr::NIceDb { +class TNiceDb; +} + namespace NKikimr::NOlap::NDataSharing { class TSharedBlobsManager; @@ -30,8 +34,11 @@ class TSourceCursor { std::set PathIds; THashMap PathPortionHashes; bool IsStartedFlag = false; - YDB_ACCESSOR(bool, StaticSaved, false); + bool IsStaticSaved = false; void BuildSelection(const std::shared_ptr& storagesManager, const TVersionedIndex& index); + NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic SerializeDynamicToProto() const; + NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic SerializeStaticToProto() const; + public: bool IsAckDataReceived() const { return AckReceivedForPackIdx == PackIdx; @@ -96,11 +103,10 @@ class TSourceCursor { TSourceCursor(const TTabletId selfTabletId, const std::set& pathIds, const TTransferContext transferContext); - bool Start(const std::shared_ptr& storagesManager, const THashMap>>& portions, const TVersionedIndex& index); - - NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic SerializeDynamicToProto() const; - NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic SerializeStaticToProto() const; + void SaveToDatabase(class NIceDb::TNiceDb& db, const TString& sessionId); + bool Start(const std::shared_ptr& storagesManager, + const THashMap>>& portions, const TVersionedIndex& index); [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic& proto, const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic& protoStatic); }; diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp b/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp index 7c3e244ade19..2bb079d5baf0 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp @@ -1,8 +1,9 @@ #include "source.h" -#include + +#include #include +#include #include -#include #include namespace NKikimr::NOlap::NDataSharing { @@ -68,6 +69,10 @@ TConclusion> TSourceSession:: } } +void TSourceSession::SaveCursorToDatabase(NIceDb::TNiceDb& db) { + GetCursorVerified()->SaveToDatabase(db, GetSessionId()); +} + void TSourceSession::ActualizeDestination(const NColumnShard::TColumnShard& shard, const std::shared_ptr& dataLocksManager) { AFL_VERIFY(IsInProgress() || IsPrepared()); AFL_VERIFY(Cursor); diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/source.h b/ydb/core/tx/columnshard/data_sharing/source/session/source.h index 903fc61c1783..30a5e2208bd5 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/source.h +++ b/ydb/core/tx/columnshard/data_sharing/source/session/source.h @@ -3,6 +3,10 @@ #include #include +namespace NKikimr::NIceDb { +class TNiceDb; +} + namespace NKikimr::NOlap::NDataSharing { class TSharedBlobsManager; @@ -58,7 +62,9 @@ class TSourceSession: public TCommonSession { AFL_VERIFY(!!Cursor); return Cursor; } -/* + + void SaveCursorToDatabase(NIceDb::TNiceDb& db); + /* bool TryNextCursor(const ui32 packIdx, const std::shared_ptr& storagesManager, const TVersionedIndex& index) { AFL_VERIFY(Cursor); if (packIdx != Cursor->GetPackIdx()) { diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp index 5a9bb1cf1274..d5c37846be9d 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp @@ -22,13 +22,7 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc } NIceDb::TNiceDb db(txc.DB); - db.Table().Key(Session->GetSessionId()) - .Update(NIceDb::TUpdate(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString())); - if (!Session->GetCursorVerified()->GetStaticSaved()) { - db.Table().Key(Session->GetSessionId()) - .Update(NIceDb::TUpdate(Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString())); - Session->GetCursorVerified()->SetStaticSaved(true); - } + Session->SaveCursorToDatabase(db); std::swap(SharedBlobIds, sharedTabletBlobIds); return true; } diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp index 4af96622de2b..a4c67ed121cf 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp @@ -6,8 +6,7 @@ namespace NKikimr::NOlap::NDataSharing { bool TTxWriteSourceCursor::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); - db.Table().Key(Session->GetSessionId()) - .Update(NIceDb::TUpdate(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString())); + Session->SaveCursorToDatabase(db); return true; }