Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table have to be readable with snapshot livetime #12964

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
const auto minReadSnapshot = Self->GetMinReadSnapshot();
for (auto&& pack : Packs) {
const auto& writeMeta = pack.GetWriteMeta();
AFL_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
AFL_VERIFY(!writeMeta.HasLongTxId());
auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
const auto minReadSnapshot = Self->GetMinReadSnapshot();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteMeta();
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
Expand Down
15 changes: 2 additions & 13 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
"writing_id", writeMeta.GetId())("status", putResult.GetPutStatus());
Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1);

if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) {
ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex());
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);

auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
ctx.Send(writeMeta.GetSource(), result.release());
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable);
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
continue;
}

if (putResult.GetPutStatus() != NKikimrProto::OK) {
Counters.GetCSCounters().OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant());
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
Expand Down Expand Up @@ -238,7 +227,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);
}

if (!TablesManager.IsReadyForWrite(pathId)) {
if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index")
<< " at tablet " << TabletID());

Expand Down Expand Up @@ -558,7 +547,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

const auto pathId = operation.GetTableId().GetTableId();

if (!TablesManager.IsReadyForWrite(pathId)) {
if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR);
return;
}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1051,8 +1051,10 @@ void TColumnShard::SetupCleanupPortions() {
return;
}

auto changes =
TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager);
const NOlap::TSnapshot minReadSnapshot = GetMinReadSnapshot();
THashSet<ui64> pathsToDrop = TablesManager.GetPathsToDrop(minReadSnapshot);

auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(minReadSnapshot, pathsToDrop, DataLocksManager);
if (!changes) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
return;
Expand All @@ -1077,7 +1079,7 @@ void TColumnShard::SetupCleanupTables() {
}

THashSet<ui64> pathIdsEmptyInInsertTable;
for (auto&& i : TablesManager.GetPathsToDrop()) {
for (auto&& i : TablesManager.GetPathsToDrop(GetMinReadSnapshot())) {
if (InsertTable->HasPathIdData(i)) {
continue;
}
Expand Down
39 changes: 25 additions & 14 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return false;
}
if (table.IsDropped()) {
PathsToDrop.insert(table.GetPathId());
AFL_VERIFY(PathsToDrop[table.GetDropVersionVerified()].emplace(table.GetPathId()).second);
}

AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second);
Expand Down Expand Up @@ -201,19 +201,23 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return true;
}

bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const {
bool TTablesManager::HasTable(const ui64 pathId, const bool withDeleted, const std::optional<NOlap::TSnapshot> minReadSnapshot) const {
auto it = Tables.find(pathId);
if (it == Tables.end()) {
return false;
}
if (it->second.IsDropped()) {
if (it->second.IsDropped(minReadSnapshot)) {
return withDeleted;
}
return true;
}

bool TTablesManager::IsReadyForWrite(const ui64 pathId) const {
return HasPrimaryIndex() && HasTable(pathId);
bool TTablesManager::IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const {
return HasPrimaryIndex() && HasTable(pathId, withDeleted);
}

bool TTablesManager::IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const {
return HasPrimaryIndex() && HasTable(pathId, false, minReadSnapshot);
}

bool TTablesManager::HasPreset(const ui32 presetId) const {
Expand All @@ -237,7 +241,7 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio
AFL_VERIFY(Tables.contains(pathId));
auto& table = Tables[pathId];
table.SetDropVersion(version);
PathsToDrop.insert(pathId);
AFL_VERIFY(PathsToDrop[version].emplace(pathId).second);
Ttl.erase(pathId);
Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId());
}
Expand Down Expand Up @@ -363,27 +367,34 @@ TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& s
}

bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const {
auto itDrop = PathsToDrop.find(pathId);
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
AFL_VERIFY(itDrop != PathsToDrop.end());
AFL_VERIFY(itDrop->second.contains(pathId));

AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
NIceDb::TNiceDb db(dbTable);
NColumnShard::Schema::EraseTableInfo(db, pathId);
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
for (auto&& tableVersion : itTable->second.GetVersions()) {
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion);
}
return true;
}

bool TTablesManager::TryFinalizeDropPathOnComplete(const ui64 pathId) {
auto itDrop = PathsToDrop.find(pathId);
AFL_VERIFY(itDrop != PathsToDrop.end());
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
PathsToDrop.erase(itDrop);
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
{
auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
AFL_VERIFY(itDrop != PathsToDrop.end());
AFL_VERIFY(itDrop->second.erase(pathId));
if (itDrop->second.empty()) {
PathsToDrop.erase(itDrop);
}
}
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
Tables.erase(itTable);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("method", "TryFinalizeDropPathOnComplete")("path_id", pathId)("size", Tables.size());
return true;
Expand Down
36 changes: 28 additions & 8 deletions ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,28 @@ class TTableInfo {
return PathId;
}

const NOlap::TSnapshot& GetDropVersionVerified() const {
AFL_VERIFY(DropVersion);
return *DropVersion;
}

void SetDropVersion(const NOlap::TSnapshot& version) {
AFL_VERIFY(!DropVersion)("exists", DropVersion->DebugString())("version", version.DebugString());
DropVersion = version;
}

void AddVersion(const NOlap::TSnapshot& snapshot) {
Versions.insert(snapshot);
}

bool IsDropped() const {
return DropVersion.has_value();
bool IsDropped(const std::optional<NOlap::TSnapshot>& minReadSnapshot = std::nullopt) const {
if (!DropVersion) {
return false;
}
if (!minReadSnapshot) {
return true;
}
return *DropVersion < *minReadSnapshot;
}

TTableInfo() = default;
Expand All @@ -139,7 +151,7 @@ class TTablesManager {
THashMap<ui64, TTableInfo> Tables;
THashSet<ui32> SchemaPresetsIds;
THashMap<ui32, NKikimrSchemeOp::TColumnTableSchema> ActualSchemaForPreset;
THashSet<ui64> PathsToDrop;
std::map<NOlap::TSnapshot, THashSet<ui64>> PathsToDrop;
THashMap<ui64, NOlap::TTiering> Ttl;
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
Expand All @@ -166,12 +178,19 @@ class TTablesManager {
return Ttl;
}

const THashSet<ui64>& GetPathsToDrop() const {
const std::map<NOlap::TSnapshot, THashSet<ui64>>& GetPathsToDrop() const {
return PathsToDrop;
}

THashSet<ui64>& MutablePathsToDrop() {
return PathsToDrop;
THashSet<ui64> GetPathsToDrop(const NOlap::TSnapshot& minReadSnapshot) const {
THashSet<ui64> result;
for (auto&& i : PathsToDrop) {
if (minReadSnapshot < i.first) {
break;
}
result.insert(i.second.begin(), i.second.end());
}
return result;
}

const THashMap<ui64, TTableInfo>& GetTables() const {
Expand Down Expand Up @@ -236,8 +255,9 @@ class TTablesManager {
const TTableInfo& GetTable(const ui64 pathId) const;
ui64 GetMemoryUsage() const;

bool HasTable(const ui64 pathId, bool withDeleted = false) const;
bool IsReadyForWrite(const ui64 pathId) const;
bool HasTable(const ui64 pathId, const bool withDeleted = false, const std::optional<NOlap::TSnapshot> minReadSnapshot = std::nullopt) const;
bool IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const;
bool IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const;
bool HasPreset(const ui32 presetId) const;

void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db);
Expand Down
Loading