Skip to content

Commit

Permalink
table have to be readable with snapshot livetime (ydb-platform#12964)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 343ef9e commit 53e0948
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
const auto minReadSnapshot = Self->GetMinReadSnapshot();
for (auto&& pack : Packs) {
const auto& writeMeta = pack.GetWriteMeta();
AFL_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
AFL_VERIFY(!writeMeta.HasLongTxId());
auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
const auto minReadSnapshot = Self->GetMinReadSnapshot();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteMeta();
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
Expand Down
15 changes: 2 additions & 13 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
"writing_id", writeMeta.GetId())("status", putResult.GetPutStatus());
Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1);

if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) {
ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex());
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);

auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
ctx.Send(writeMeta.GetSource(), result.release());
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable);
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
continue;
}

if (putResult.GetPutStatus() != NKikimrProto::OK) {
Counters.GetCSCounters().OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant());
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
Expand Down Expand Up @@ -238,7 +227,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);
}

if (!TablesManager.IsReadyForWrite(pathId)) {
if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index")
<< " at tablet " << TabletID());

Expand Down Expand Up @@ -560,7 +549,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

const auto pathId = operation.GetTableId().GetTableId();

if (!TablesManager.IsReadyForWrite(pathId)) {
if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR);
return;
}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1051,8 +1051,10 @@ void TColumnShard::SetupCleanupPortions() {
return;
}

auto changes =
TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager);
const NOlap::TSnapshot minReadSnapshot = GetMinReadSnapshot();
THashSet<ui64> pathsToDrop = TablesManager.GetPathsToDrop(minReadSnapshot);

auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(minReadSnapshot, pathsToDrop, DataLocksManager);
if (!changes) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
return;
Expand All @@ -1077,7 +1079,7 @@ void TColumnShard::SetupCleanupTables() {
}

THashSet<ui64> pathIdsEmptyInInsertTable;
for (auto&& i : TablesManager.GetPathsToDrop()) {
for (auto&& i : TablesManager.GetPathsToDrop(GetMinReadSnapshot())) {
if (InsertTable->HasPathIdData(i)) {
continue;
}
Expand Down
39 changes: 25 additions & 14 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return false;
}
if (table.IsDropped()) {
PathsToDrop.insert(table.GetPathId());
AFL_VERIFY(PathsToDrop[table.GetDropVersionVerified()].emplace(table.GetPathId()).second);
}

AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second);
Expand Down Expand Up @@ -201,19 +201,23 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return true;
}

bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const {
bool TTablesManager::HasTable(const ui64 pathId, const bool withDeleted, const std::optional<NOlap::TSnapshot> minReadSnapshot) const {
auto it = Tables.find(pathId);
if (it == Tables.end()) {
return false;
}
if (it->second.IsDropped()) {
if (it->second.IsDropped(minReadSnapshot)) {
return withDeleted;
}
return true;
}

bool TTablesManager::IsReadyForWrite(const ui64 pathId) const {
return HasPrimaryIndex() && HasTable(pathId);
bool TTablesManager::IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const {
return HasPrimaryIndex() && HasTable(pathId, withDeleted);
}

bool TTablesManager::IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const {
return HasPrimaryIndex() && HasTable(pathId, false, minReadSnapshot);
}

bool TTablesManager::HasPreset(const ui32 presetId) const {
Expand All @@ -237,7 +241,7 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio
AFL_VERIFY(Tables.contains(pathId));
auto& table = Tables[pathId];
table.SetDropVersion(version);
PathsToDrop.insert(pathId);
AFL_VERIFY(PathsToDrop[version].emplace(pathId).second);
Ttl.erase(pathId);
Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId());
}
Expand Down Expand Up @@ -363,27 +367,34 @@ TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& s
}

bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const {
auto itDrop = PathsToDrop.find(pathId);
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
AFL_VERIFY(itDrop != PathsToDrop.end());
AFL_VERIFY(itDrop->second.contains(pathId));

AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
NIceDb::TNiceDb db(dbTable);
NColumnShard::Schema::EraseTableInfo(db, pathId);
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
for (auto&& tableVersion : itTable->second.GetVersions()) {
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion);
}
return true;
}

bool TTablesManager::TryFinalizeDropPathOnComplete(const ui64 pathId) {
auto itDrop = PathsToDrop.find(pathId);
AFL_VERIFY(itDrop != PathsToDrop.end());
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
PathsToDrop.erase(itDrop);
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
{
auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
AFL_VERIFY(itDrop != PathsToDrop.end());
AFL_VERIFY(itDrop->second.erase(pathId));
if (itDrop->second.empty()) {
PathsToDrop.erase(itDrop);
}
}
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
Tables.erase(itTable);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("method", "TryFinalizeDropPathOnComplete")("path_id", pathId)("size", Tables.size());
return true;
Expand Down
36 changes: 28 additions & 8 deletions ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,28 @@ class TTableInfo {
return PathId;
}

const NOlap::TSnapshot& GetDropVersionVerified() const {
AFL_VERIFY(DropVersion);
return *DropVersion;
}

void SetDropVersion(const NOlap::TSnapshot& version) {
AFL_VERIFY(!DropVersion)("exists", DropVersion->DebugString())("version", version.DebugString());
DropVersion = version;
}

void AddVersion(const NOlap::TSnapshot& snapshot) {
Versions.insert(snapshot);
}

bool IsDropped() const {
return DropVersion.has_value();
bool IsDropped(const std::optional<NOlap::TSnapshot>& minReadSnapshot = std::nullopt) const {
if (!DropVersion) {
return false;
}
if (!minReadSnapshot) {
return true;
}
return *DropVersion < *minReadSnapshot;
}

TTableInfo() = default;
Expand All @@ -139,7 +151,7 @@ class TTablesManager {
THashMap<ui64, TTableInfo> Tables;
THashSet<ui32> SchemaPresetsIds;
THashMap<ui32, NKikimrSchemeOp::TColumnTableSchema> ActualSchemaForPreset;
THashSet<ui64> PathsToDrop;
std::map<NOlap::TSnapshot, THashSet<ui64>> PathsToDrop;
THashMap<ui64, NOlap::TTiering> Ttl;
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
Expand All @@ -166,12 +178,19 @@ class TTablesManager {
return Ttl;
}

const THashSet<ui64>& GetPathsToDrop() const {
const std::map<NOlap::TSnapshot, THashSet<ui64>>& GetPathsToDrop() const {
return PathsToDrop;
}

THashSet<ui64>& MutablePathsToDrop() {
return PathsToDrop;
THashSet<ui64> GetPathsToDrop(const NOlap::TSnapshot& minReadSnapshot) const {
THashSet<ui64> result;
for (auto&& i : PathsToDrop) {
if (minReadSnapshot < i.first) {
break;
}
result.insert(i.second.begin(), i.second.end());
}
return result;
}

const THashMap<ui64, TTableInfo>& GetTables() const {
Expand Down Expand Up @@ -236,8 +255,9 @@ class TTablesManager {
const TTableInfo& GetTable(const ui64 pathId) const;
ui64 GetMemoryUsage() const;

bool HasTable(const ui64 pathId, bool withDeleted = false) const;
bool IsReadyForWrite(const ui64 pathId) const;
bool HasTable(const ui64 pathId, const bool withDeleted = false, const std::optional<NOlap::TSnapshot> minReadSnapshot = std::nullopt) const;
bool IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const;
bool IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const;
bool HasPreset(const ui32 presetId) const;

void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db);
Expand Down

0 comments on commit 53e0948

Please sign in to comment.