Skip to content

Commit

Permalink
Fix loading session without cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
fexolm committed Sep 16, 2024
1 parent 946e6a2 commit 3855776
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 28 deletions.
14 changes: 10 additions & 4 deletions ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL
NKikimrColumnShardDataSharingProto::TSourceSession protoSession;
AFL_VERIFY(protoSession.ParseFromString(rowset.GetValue<Schema::SourceSessions::Details>()));

NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic protoSessionCursorDynamic;
AFL_VERIFY(protoSessionCursorDynamic.ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorDynamic>()));
std::optional<NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic> protoSessionCursorDynamic;
if (rowset.HaveValue<Schema::SourceSessions::CursorDynamic>()) {
protoSessionCursorDynamic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic{};
AFL_VERIFY(protoSessionCursorDynamic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorDynamic>()));
}

NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic protoSessionCursorStatic;
AFL_VERIFY(protoSessionCursorStatic.ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
std::optional<NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic> protoSessionCursorStatic;
if (rowset.HaveValue<Schema::SourceSessions::CursorStatic>()) {
protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{};
AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
}

AFL_VERIFY(index);
session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate();
Expand Down
28 changes: 21 additions & 7 deletions ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include "source.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/events/transfer.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

#include <ydb/library/formats/arrow/hash/xx_hash.h>

namespace NKikimr::NOlap::NDataSharing {
Expand Down Expand Up @@ -130,18 +134,29 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu
PathPortionHashes.emplace(i.GetPathId(), i.GetHash());
}
AFL_VERIFY(PathPortionHashes.size());
StaticSaved = true;
IsStaticSaved = true;
return TConclusionStatus::Success();
}

TSourceCursor::TSourceCursor(const TTabletId selfTabletId, const std::set<ui64>& pathIds, const TTransferContext transferContext)
: SelfTabletId(selfTabletId)
, TransferContext(transferContext)
, PathIds(pathIds)
{
, PathIds(pathIds) {
}

bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManager, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index) {
void TSourceCursor::Persist(NIceDb::TNiceDb& db, const TString& sessionId) {
using SourceSessions = NKikimr::NColumnShard::Schema::SourceSessions;
db.Table<SourceSessions>().Key(sessionId).Update(
NIceDb::TUpdate<SourceSessions::CursorDynamic>(SerializeDynamicToProto().SerializeAsString()));
if (!IsStaticSaved) {
db.Table<SourceSessions>().Key(sessionId).Update(
NIceDb::TUpdate<SourceSessions::CursorStatic>(SerializeStaticToProto().SerializeAsString()));
IsStaticSaved = true;
}
}

bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManager,
const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index) {
AFL_VERIFY(!IsStartedFlag);
std::map<ui64, std::map<ui32, std::shared_ptr<TPortionInfo>>> local;
std::vector<std::shared_ptr<TPortionInfo>> portionsLock;
Expand Down Expand Up @@ -177,5 +192,4 @@ bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManag
IsStartedFlag = true;
return true;
}

}
} // namespace NKikimr::NOlap::NDataSharing
16 changes: 11 additions & 5 deletions ydb/core/tx/columnshard/data_sharing/source/session/cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ class TColumnEngineForLogs;
class TVersionedIndex;
}

namespace NKikimr::NIceDb {
class TNiceDb;
}

namespace NKikimr::NOlap::NDataSharing {

class TSharedBlobsManager;
Expand All @@ -30,8 +34,11 @@ class TSourceCursor {
std::set<ui64> PathIds;
THashMap<ui64, TString> PathPortionHashes;
bool IsStartedFlag = false;
YDB_ACCESSOR(bool, StaticSaved, false);
bool IsStaticSaved = false;
void BuildSelection(const std::shared_ptr<IStoragesManager>& storagesManager, const TVersionedIndex& index);
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic SerializeDynamicToProto() const;
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic SerializeStaticToProto() const;

public:
bool IsAckDataReceived() const {
return AckReceivedForPackIdx == PackIdx;
Expand Down Expand Up @@ -96,11 +103,10 @@ class TSourceCursor {

TSourceCursor(const TTabletId selfTabletId, const std::set<ui64>& pathIds, const TTransferContext transferContext);

bool Start(const std::shared_ptr<IStoragesManager>& storagesManager, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index);

NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic SerializeDynamicToProto() const;
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic SerializeStaticToProto() const;
void Persist(class NIceDb::TNiceDb& db, const TString& sessionId);

bool Start(const std::shared_ptr<IStoragesManager>& storagesManager,
const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index);
[[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic& proto,
const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic& protoStatic);
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include "source.h"
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_finish_ack_to_source.h>

#include <ydb/core/tx/columnshard/data_locks/locks/list.h>
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.h>
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_finish_ack_to_source.h>
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.h>
#include <ydb/core/tx/columnshard/data_locks/locks/list.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

namespace NKikimr::NOlap::NDataSharing {
Expand Down Expand Up @@ -68,6 +69,10 @@ TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TSourceSession::
}
}

void TSourceSession::PersistCursor(NIceDb::TNiceDb& db) {
GetCursorVerified()->Persist(db, GetSessionId());
}

void TSourceSession::ActualizeDestination(const NColumnShard::TColumnShard& shard, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) {
AFL_VERIFY(IsInProgress() || IsPrepared());
AFL_VERIFY(Cursor);
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/data_sharing/source/session/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
#include <ydb/core/tx/columnshard/data_sharing/common/session/common.h>
#include <ydb/core/tx/columnshard/common/tablet_id.h>

namespace NKikimr::NIceDb {
class TNiceDb;
}

namespace NKikimr::NOlap::NDataSharing {

class TSharedBlobsManager;
Expand Down Expand Up @@ -58,7 +62,9 @@ class TSourceSession: public TCommonSession {
AFL_VERIFY(!!Cursor);
return Cursor;
}
/*

void PersistCursor(NIceDb::TNiceDb& db);
/*
bool TryNextCursor(const ui32 packIdx, const std::shared_ptr<IStoragesManager>& storagesManager, const TVersionedIndex& index) {
AFL_VERIFY(Cursor);
if (packIdx != Cursor->GetPackIdx()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,7 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc
}

NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorDynamic>(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString()));
if (!Session->GetCursorVerified()->GetStaticSaved()) {
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorStatic>(Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString()));
Session->GetCursorVerified()->SetStaticSaved(true);
}
Session->PersistCursor(db);
std::swap(SharedBlobIds, sharedTabletBlobIds);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ namespace NKikimr::NOlap::NDataSharing {
bool TTxWriteSourceCursor::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorDynamic>(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString()));
Session->PersistCursor(db);
return true;
}

Expand Down

0 comments on commit 3855776

Please sign in to comment.