diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp index b365f542df73..7d38657d83e6 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp @@ -15,9 +15,10 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); auto& index = Self->MutableIndexAs(); + const auto minReadSnapshot = Self->GetMinReadSnapshot(); for (auto&& pack : Packs) { const auto& writeMeta = pack.GetWriteMeta(); - AFL_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot)); AFL_VERIFY(!writeMeta.HasLongTxId()); auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 8bbcfce0e07d..52d3bed0a6b5 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -40,9 +40,10 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); + const auto minReadSnapshot = Self->GetMinReadSnapshot(); for (auto&& aggr : buffer.GetAggregations()) { const auto& writeMeta = aggr->GetWriteMeta(); - Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot)); txc.DB.NoMoreReadsForTx(); TWriteOperation::TPtr operation; if (writeMeta.HasLongTxId()) { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 68a83bf1a1f7..a3c81d2083a1 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -142,17 +142,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo "writing_id", writeMeta.GetId())("status", putResult.GetPutStatus()); Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1); - if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) { - ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex()); - Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); - - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(writeMeta.GetSource(), result.release()); - Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable); - wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); - continue; - } - if (putResult.GetPutStatus() != NKikimrProto::OK) { Counters.GetCSCounters().OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant()); Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); @@ -238,7 +227,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled); } - if (!TablesManager.IsReadyForWrite(pathId)) { + if (!TablesManager.IsReadyForStartWrite(pathId, false)) { LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index") << " at tablet " << TabletID()); @@ -558,7 +547,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto pathId = operation.GetTableId().GetTableId(); - if (!TablesManager.IsReadyForWrite(pathId)) { + if (!TablesManager.IsReadyForStartWrite(pathId, false)) { sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR); return; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index f68cc93dd473..d9017da74e15 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1051,8 +1051,10 @@ void TColumnShard::SetupCleanupPortions() { return; } - auto changes = - TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager); + const NOlap::TSnapshot minReadSnapshot = GetMinReadSnapshot(); + THashSet pathsToDrop = TablesManager.GetPathsToDrop(minReadSnapshot); + + auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(minReadSnapshot, pathsToDrop, DataLocksManager); if (!changes) { ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes"); return; @@ -1077,7 +1079,7 @@ void TColumnShard::SetupCleanupTables() { } THashSet pathIdsEmptyInInsertTable; - for (auto&& i : TablesManager.GetPathsToDrop()) { + for (auto&& i : TablesManager.GetPathsToDrop(GetMinReadSnapshot())) { if (InsertTable->HasPathIdData(i)) { continue; } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 63a74e12fb2b..cd6eda199c41 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -60,7 +60,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { return false; } if (table.IsDropped()) { - PathsToDrop.insert(table.GetPathId()); + AFL_VERIFY(PathsToDrop[table.GetDropVersionVerified()].emplace(table.GetPathId()).second); } AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second); @@ -201,19 +201,23 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { return true; } -bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const { +bool TTablesManager::HasTable(const ui64 pathId, const bool withDeleted, const std::optional minReadSnapshot) const { auto it = Tables.find(pathId); if (it == Tables.end()) { return false; } - if (it->second.IsDropped()) { + if (it->second.IsDropped(minReadSnapshot)) { return withDeleted; } return true; } -bool TTablesManager::IsReadyForWrite(const ui64 pathId) const { - return HasPrimaryIndex() && HasTable(pathId); +bool TTablesManager::IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const { + return HasPrimaryIndex() && HasTable(pathId, withDeleted); +} + +bool TTablesManager::IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const { + return HasPrimaryIndex() && HasTable(pathId, false, minReadSnapshot); } bool TTablesManager::HasPreset(const ui32 presetId) const { @@ -237,7 +241,7 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio AFL_VERIFY(Tables.contains(pathId)); auto& table = Tables[pathId]; table.SetDropVersion(version); - PathsToDrop.insert(pathId); + AFL_VERIFY(PathsToDrop[version].emplace(pathId).second); Ttl.erase(pathId); Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId()); } @@ -363,13 +367,15 @@ TTablesManager::TTablesManager(const std::shared_ptr& s } bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const { - auto itDrop = PathsToDrop.find(pathId); + const auto& itTable = Tables.find(pathId); + AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId); + auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified()); AFL_VERIFY(itDrop != PathsToDrop.end()); + AFL_VERIFY(itDrop->second.contains(pathId)); + AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId)); NIceDb::TNiceDb db(dbTable); NColumnShard::Schema::EraseTableInfo(db, pathId); - const auto& itTable = Tables.find(pathId); - AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId); for (auto&& tableVersion : itTable->second.GetVersions()) { NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion); } @@ -377,13 +383,18 @@ bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, co } bool TTablesManager::TryFinalizeDropPathOnComplete(const ui64 pathId) { - auto itDrop = PathsToDrop.find(pathId); - AFL_VERIFY(itDrop != PathsToDrop.end()); - AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId)); - AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId)); - PathsToDrop.erase(itDrop); const auto& itTable = Tables.find(pathId); AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId); + { + auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified()); + AFL_VERIFY(itDrop != PathsToDrop.end()); + AFL_VERIFY(itDrop->second.erase(pathId)); + if (itDrop->second.empty()) { + PathsToDrop.erase(itDrop); + } + } + AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId)); + AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId)); Tables.erase(itTable); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("method", "TryFinalizeDropPathOnComplete")("path_id", pathId)("size", Tables.size()); return true; diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 2f8d41832814..f3fc89b2582f 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -105,7 +105,13 @@ class TTableInfo { return PathId; } + const NOlap::TSnapshot& GetDropVersionVerified() const { + AFL_VERIFY(DropVersion); + return *DropVersion; + } + void SetDropVersion(const NOlap::TSnapshot& version) { + AFL_VERIFY(!DropVersion)("exists", DropVersion->DebugString())("version", version.DebugString()); DropVersion = version; } @@ -113,8 +119,14 @@ class TTableInfo { Versions.insert(snapshot); } - bool IsDropped() const { - return DropVersion.has_value(); + bool IsDropped(const std::optional& minReadSnapshot = std::nullopt) const { + if (!DropVersion) { + return false; + } + if (!minReadSnapshot) { + return true; + } + return *DropVersion < *minReadSnapshot; } TTableInfo() = default; @@ -139,7 +151,7 @@ class TTablesManager { THashMap Tables; THashSet SchemaPresetsIds; THashMap ActualSchemaForPreset; - THashSet PathsToDrop; + std::map> PathsToDrop; THashMap Ttl; std::unique_ptr PrimaryIndex; std::shared_ptr StoragesManager; @@ -166,12 +178,19 @@ class TTablesManager { return Ttl; } - const THashSet& GetPathsToDrop() const { + const std::map>& GetPathsToDrop() const { return PathsToDrop; } - THashSet& MutablePathsToDrop() { - return PathsToDrop; + THashSet GetPathsToDrop(const NOlap::TSnapshot& minReadSnapshot) const { + THashSet result; + for (auto&& i : PathsToDrop) { + if (minReadSnapshot < i.first) { + break; + } + result.insert(i.second.begin(), i.second.end()); + } + return result; } const THashMap& GetTables() const { @@ -236,8 +255,9 @@ class TTablesManager { const TTableInfo& GetTable(const ui64 pathId) const; ui64 GetMemoryUsage() const; - bool HasTable(const ui64 pathId, bool withDeleted = false) const; - bool IsReadyForWrite(const ui64 pathId) const; + bool HasTable(const ui64 pathId, const bool withDeleted = false, const std::optional minReadSnapshot = std::nullopt) const; + bool IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const; + bool IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const; bool HasPreset(const ui32 presetId) const; void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db);