From 16418df6e17d4a6d93218df1887937387df07ee6 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Dec 2024 20:13:40 +0300 Subject: [PATCH] commit processing fixes (#12519) --- .../columnshard/columnshard__progress_tx.cpp | 31 ++++++++++++++----- .../tx/columnshard/engines/column_engine.cpp | 10 +++--- .../tx/columnshard/engines/column_engine.h | 2 +- .../engines/column_engine_logs.cpp | 24 ++++++++------ .../constructor/read_metadata.cpp | 2 +- .../plain_reader/constructor/read_metadata.h | 2 +- .../plain_reader/iterator/plain_read_data.cpp | 4 +-- .../simple_reader/constructor/read_metadata.h | 2 +- .../iterator/plain_read_data.cpp | 4 +-- .../columnshard/engines/ut/ut_logs_engine.cpp | 22 ++++++------- .../transactions/operators/ev_write/primary.h | 30 +++++++++--------- .../operators/ev_write/secondary.h | 13 +++----- .../transactions/tx_controller.cpp | 28 ++++++++--------- .../columnshard/transactions/tx_controller.h | 16 ++++++++++ ydb/library/services/services.proto | 1 + 15 files changed, 111 insertions(+), 80 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 73a4a0200d97..44bb0a27f860 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -28,9 +28,12 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - NActors::TLogContextGuard logGuard = - NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); - Y_ABORT_UNLESS(Self->ProgressTxInFlight); + NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tablet_id", Self->TabletID())( + "tx_state", "TTxProgressTx::Execute")("tx_current", Self->ProgressTxInFlight); + if (!Self->ProgressTxInFlight) { + AbortedThroughRemoveExpired = true; + return true; + } Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds()); const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc); @@ -45,15 +48,24 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { const auto plannedItem = Self->ProgressTxController->GetFirstPlannedTx(); if (!!plannedItem) { PlannedQueueItem.emplace(plannedItem->PlanStep, plannedItem->TxId); - ui64 step = plannedItem->PlanStep; - ui64 txId = plannedItem->TxId; + const ui64 step = plannedItem->PlanStep; + const ui64 txId = plannedItem->TxId; + NActors::TLogContextGuard logGuardTx = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tx_id", txId); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart"); TxOperator = Self->ProgressTxController->GetTxOperatorVerified(txId); if (auto txPrepare = TxOperator->BuildTxPrepareForProgress(Self)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "BuildTxPrepareForProgress"); AbortedThroughRemoveExpired = true; Self->ProgressTxInFlight = txId; Self->Execute(txPrepare.release(), ctx); return true; + } else if (TxOperator->IsInProgress()) { + AbortedThroughRemoveExpired = true; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemContinue"); + AFL_VERIFY(Self->ProgressTxInFlight == txId); + return true; } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "PopFirstPlannedTx"); Self->ProgressTxController->PopFirstPlannedTx(); } StartExecution = TMonotonic::Now(); @@ -80,8 +92,9 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { if (AbortedThroughRemoveExpired) { return; } - NActors::TLogContextGuard logGuard = - NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); + NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)( + "tablet_id", Self->TabletID())( + "tx_state", "TTxProgressTx::Complete"); if (TxOperator) { TxOperator->ProgressOnComplete(*Self, ctx); Self->RescheduleWaitingReads(); @@ -104,11 +117,13 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { }; void TColumnShard::EnqueueProgressTx(const TActorContext& ctx, const std::optional continueTxId) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTx")("tablet_id", TabletID())("tx_id", continueTxId); if (continueTxId) { AFL_VERIFY(!ProgressTxInFlight || ProgressTxInFlight == continueTxId)("current", ProgressTxInFlight)("expected", continueTxId); } if (!ProgressTxInFlight || ProgressTxInFlight == continueTxId) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTxStart")("tablet_id", TabletID())("tx_id", continueTxId)( + "tx_current", ProgressTxInFlight); ProgressTxInFlight = continueTxId.value_or(0); Execute(new TTxProgressTx(this), ctx); } diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index e350fbb05719..bd6cf4925a4e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -38,10 +38,10 @@ void IColumnEngine::FetchDataAccessors(const std::shared_ptr uniqBlob; - for (auto& portionInfo : PortionsOrderedPK) { + for (auto& portionInfo : Portions) { out.Rows += portionInfo->GetRecordsCount(); for (auto& blobId : portionInfo->GetBlobIds()) { out.Bytes += blobId.BlobSize(); @@ -53,10 +53,10 @@ TSelectInfo::TStats TSelectInfo::Stats() const { TString TSelectInfo::DebugString() const { TStringBuilder result; - result << "count:" << PortionsOrderedPK.size() << ";"; - if (PortionsOrderedPK.size()) { + result << "count:" << Portions.size() << ";"; + if (Portions.size()) { result << "portions:"; - for (auto& portionInfo : PortionsOrderedPK) { + for (auto& portionInfo : Portions) { result << portionInfo->DebugString(); } } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 940a055963b8..13d5c954920b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -46,7 +46,7 @@ struct TSelectInfo { } }; - std::vector> PortionsOrderedPK; + std::vector> Portions; TStats Stats() const; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 518513b5635d..30ad46d591de 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -502,18 +502,22 @@ std::shared_ptr TColumnEngineForLogs::Select( return out; } - if (withUncommitted) { - for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) { - AFL_VERIFY(portionInfo->HasInsertWriteId()); - AFL_VERIFY(!portionInfo->HasCommitSnapshot()); - const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo); - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)( - "portion", portionInfo->DebugString()); - if (skipPortion) { + for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) { + AFL_VERIFY(portionInfo->HasInsertWriteId()); + if (withUncommitted) { + if (!portionInfo->IsVisible(snapshot, !withUncommitted)) { continue; } - out->PortionsOrderedPK.emplace_back(portionInfo); + } else if (!portionInfo->HasCommitSnapshot()) { + continue; + } + const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)( + "portion", portionInfo->DebugString()); + if (skipPortion) { + continue; } + out->Portions.emplace_back(portionInfo); } for (const auto& [_, portionInfo] : spg->GetPortions()) { if (!portionInfo->IsVisible(snapshot, !withUncommitted)) { @@ -525,7 +529,7 @@ std::shared_ptr TColumnEngineForLogs::Select( if (skipPortion) { continue; } - out->PortionsOrderedPK.emplace_back(portionInfo); + out->Portions.emplace_back(portionInfo); } return out; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp index ec712ef066c0..56a14c9b23fe 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp @@ -20,7 +20,7 @@ TConclusionStatus TReadMetadata::Init( SelectInfo = dataAccessor.Select(readDescription, !!LockId); if (LockId) { - for (auto&& i : SelectInfo->PortionsOrderedPK) { + for (auto&& i : SelectInfo->Portions) { if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) { if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) { } else { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h index 17f56ef0ff33..b0242d486aaa 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h @@ -21,7 +21,7 @@ class TReadMetadata: public NCommon::TReadMetadata { std::vector CommittedBlobs; virtual bool Empty() const override { Y_ABORT_UNLESS(SelectInfo); - return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty(); + return SelectInfo->Portions.empty() && CommittedBlobs.empty(); } virtual std::shared_ptr BuildReader(const std::shared_ptr& context) const override; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp index 036fdcb66550..8633f5d692a8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp @@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) , SpecialReadContext(std::make_shared(context)) { ui32 sourceIdx = 0; std::deque> sources; - const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK; + const auto& portions = GetReadMetadata()->SelectInfo->Portions; const auto& committed = GetReadMetadata()->CommittedBlobs; ui64 compactedPortionsBytes = 0; ui64 insertedPortionsBytes = 0; @@ -49,7 +49,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) Scanner = std::make_shared(std::move(sources), SpecialReadContext); auto& stats = GetReadMetadata()->ReadStats; - stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size(); + stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size(); stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs(); stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size(); stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount(); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h index eb1e302ca21e..be603922a060 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h @@ -12,7 +12,7 @@ class TReadMetadata: public NCommon::TReadMetadata { virtual bool Empty() const override { Y_ABORT_UNLESS(SelectInfo); - return SelectInfo->PortionsOrderedPK.empty(); + return SelectInfo->Portions.empty(); } virtual std::shared_ptr BuildReader(const std::shared_ptr& context) const override; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp index d794ff4a24ac..eb8c21e291b2 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp @@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) , SpecialReadContext(std::make_shared(context)) { ui32 sourceIdx = 0; std::deque> sources; - const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK; + const auto& portions = GetReadMetadata()->SelectInfo->Portions; ui64 compactedPortionsBytes = 0; ui64 insertedPortionsBytes = 0; for (auto&& i : portions) { @@ -26,7 +26,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) Scanner = std::make_shared(std::move(sources), SpecialReadContext); auto& stats = GetReadMetadata()->ReadStats; - stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size(); + stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size(); stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs(); stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount(); stats->InsertedPortionsBytes = insertedPortionsBytes; diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 8201deaf4c5c..475c51a93fa5 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -566,28 +566,28 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { ui64 planStep = 1; ui64 txId = 0; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); } { // select from snap between insert (greater txId) ui64 planStep = 1; ui64 txId = 2; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); } { // select from snap after insert (greater planStep) ui64 planStep = 2; ui64 txId = 1; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1); } { // select another pathId ui64 planStep = 2; ui64 txId = 1; auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); } } @@ -657,7 +657,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20); } // predicates @@ -671,7 +671,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { NOlap::TPKRangesFilter pkFilter(false); Y_ABORT_UNLESS(pkFilter.Add(gt10k, nullptr, indexInfo.GetReplaceKey())); auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10); } { @@ -683,7 +683,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { NOlap::TPKRangesFilter pkFilter(false); Y_ABORT_UNLESS(pkFilter.Add(nullptr, lt10k, indexInfo.GetReplaceKey())); auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 9); } } @@ -841,7 +841,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20); } // Cleanup @@ -850,7 +850,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20); } // TTL @@ -866,7 +866,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10); } } { @@ -882,7 +882,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10); } } } diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 40a2a6586ab4..f53042bf0e26 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -23,7 +23,6 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac std::set WaitShardsBrokenFlags; std::set WaitShardsResultAck; std::optional TxBroken; - mutable TAtomicCounter ControlCounter = 0; virtual NKikimrTxColumnShard::TCommitWriteTxBody SerializeToProto() const override { NKikimrTxColumnShard::TCommitWriteTxBody result; @@ -48,7 +47,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac virtual bool DoParseImpl(TColumnShard& /*owner*/, const NKikimrTxColumnShard::TCommitWriteTxBody& commitTxBody) override { if (!commitTxBody.HasPrimaryTabletData()) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot read proto")("proto", commitTxBody.DebugString()); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("event", "cannot read proto")("proto", commitTxBody.DebugString()); return false; } auto& protoData = commitTxBody.GetPrimaryTabletData(); @@ -92,7 +91,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac copy.TxBroken = copy.TxBroken.value_or(false) || BrokenFlag; Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString()); } else { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "repeated shard broken_flag info")("shard_id", TabletId); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId); } return true; } @@ -101,11 +100,11 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac if (op->WaitShardsBrokenFlags.erase(TabletId)) { op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag; op->SendBrokenFlagAck(*Self, TabletId); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))( + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))( "receive", TabletId); op->InitializeRequests(*Self); } else { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "repeated shard broken_flag info")("shard_id", TabletId); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId); } } @@ -132,17 +131,18 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override { auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); auto copy = *op; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))("receive", TabletId); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))( + "receive", TabletId); AFL_VERIFY(copy.WaitShardsResultAck.erase(TabletId)); Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString()); return true; } virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override { auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))( + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))( "receive", TabletId); if (!op->WaitShardsResultAck.erase(TabletId)) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))( + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))( "receive", TabletId); } op->CheckFinished(*Self); @@ -174,7 +174,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac void CheckFinished(TColumnShard& owner) { if (WaitShardsResultAck.empty()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "finished"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "finished"); owner.EnqueueProgressTx(NActors::TActivationContext::AsActorContext(), GetTxId()); } } @@ -248,7 +248,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac if (op->WaitShardsBrokenFlags.empty()) { AFL_VERIFY(op->WaitShardsResultAck.erase(Self->TabletID())); } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))( + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))( "receive", Self->TabletID()); op->CheckFinished(*Self); } @@ -265,13 +265,11 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac InitializeRequests(owner); } + virtual bool DoIsInProgress() const override { + return WaitShardsResultAck.size(); + } virtual std::unique_ptr DoBuildTxPrepareForProgress(TColumnShard* owner) const override { - if (WaitShardsResultAck.empty()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_prepare_for_progress")("lock_id", LockId); - return nullptr; - } - AFL_VERIFY(ControlCounter.Inc() <= 1); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "prepare_for_progress_started")("lock_id", LockId); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "prepare_for_progress_started")("lock_id", LockId); return std::make_unique(owner, GetTxId()); } diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h index ae249b07995f..8bbf9d4d6f55 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h @@ -20,7 +20,6 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans bool NeedReceiveBroken = false; bool ReceiveAck = false; bool SelfBroken = false; - mutable TAtomicCounter ControlCounter = 0; std::optional TxBroken; virtual NKikimrTxColumnShard::TCommitWriteTxBody SerializeToProto() const override { @@ -38,7 +37,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans virtual bool DoParseImpl(TColumnShard& /*owner*/, const NKikimrTxColumnShard::TCommitWriteTxBody& commitTxBody) override { if (!commitTxBody.HasSecondaryTabletData()) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot read proto")("proto", commitTxBody.DebugString()); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("event", "cannot read proto")("proto", commitTxBody.DebugString()); return false; } auto& protoData = commitTxBody.GetSecondaryTabletData(); @@ -187,13 +186,11 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans } }; + virtual bool DoIsInProgress() const override { + return !TxBroken && (NeedReceiveBroken || !ReceiveAck); + } virtual std::unique_ptr DoBuildTxPrepareForProgress(TColumnShard* owner) const override { - if (TxBroken || (!NeedReceiveBroken && ReceiveAck)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_prepare_for_progress")("lock_id", LockId); - return nullptr; - } - AFL_VERIFY(ControlCounter.Inc() <= 1); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "prepare_for_progress_started")("lock_id", LockId); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "prepare_for_progress_started")("lock_id", LockId); return std::make_unique(owner, GetTxId()); } diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp index afb1e8a33d50..a32ebbf0250b 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp +++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp @@ -86,7 +86,7 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { return false; } } - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("override", countOverrideDeadline)("no_dl", countNoDeadline)("dl", countWithDeadline)( + AFL_INFO(NKikimrServices::TX_COLUMNSHARD_TX)("override", countOverrideDeadline)("no_dl", countNoDeadline)("dl", countWithDeadline)( "operators", Operators.size())("plan", PlanQueue.size())("dl_queue", DeadlineQueue.size()); return true; } @@ -277,10 +277,10 @@ TDuration TTxController::GetTxCompleteLag(ui64 timecastStep) const { TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { auto it = Operators.find(txId); if (it == Operators.end()) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_plan_tx")("tx_id", txId); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "skip_plan_tx")("tx_id", txId); return EPlanResult::Skipped; } else { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "plan_tx")("tx_id", txId)("plan_step", it->second->MutableTxInfo().PlanStep); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_TX)("event", "plan_tx")("tx_id", txId)("plan_step", it->second->MutableTxInfo().PlanStep); } auto& txInfo = it->second->MutableTxInfo(); if (txInfo.PlanStep == 0) { @@ -308,12 +308,12 @@ std::shared_ptr TTxController::StartPropose const TTxController::TTxInfo& txInfo, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc) { NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")("tx_info", txInfo.DebugString()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start"); std::shared_ptr txOperator( TTxController::ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo)); AFL_VERIFY(!!txOperator); if (!txOperator->Parse(Owner, txBody)) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse txOperator"); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("error", "cannot parse txOperator"); return txOperator; } Counters.OnStartProposeOnExecute(txOperator->GetOpType()); @@ -321,13 +321,13 @@ std::shared_ptr TTxController::StartPropose auto txInfoPtr = GetTxInfo(txInfo.TxId); if (!!txInfoPtr) { if (!txOperator->CheckAllowUpdate(*txInfoPtr)) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect duplication")("actual_tx", txInfoPtr->DebugString()); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "incorrect duplication")("actual_tx", txInfoPtr->DebugString()); TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txInfo.TxId << " has already been proposed"); txOperator->SetProposeStartInfo(proposeResult); return txOperator; } else { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "update duplication data")("deprecated_tx", txInfoPtr->DebugString()); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "update duplication data")("deprecated_tx", txInfoPtr->DebugString()); return UpdateTxSourceInfo(txOperator->GetTxInfo(), txc); } } else { @@ -337,9 +337,9 @@ std::shared_ptr TTxController::StartPropose } else { RegisterTx(txOperator, txBody, txc); } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "registered"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "registered"); } else { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "problem on start")( + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("error", "problem on start")( "message", txOperator->GetProposeStartInfoVerified().GetStatusMessage()); } return txOperator; @@ -349,7 +349,7 @@ std::shared_ptr TTxController::StartPropose void TTxController::StartProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) { NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnComplete")("tx_id", txOperator.GetTxId()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start"); txOperator.StartProposeOnComplete(Owner, ctx); Counters.OnStartProposeOnComplete(txOperator.GetOpType()); } @@ -357,18 +357,18 @@ void TTxController::StartProposeOnComplete(ITransactionOperator& txOperator, con void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnExecute")("tx_id", txId); if (auto txOperator = GetTxOperatorOptional(txId)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start"); txOperator->FinishProposeOnExecute(Owner, txc); Counters.OnFinishProposeOnExecute(txOperator->GetOpType()); } else { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "cannot found txOperator in propose transaction base")("tx_id", txId); } } void TTxController::FinishProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) { NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnComplete")("tx_id", txOperator.GetTxId()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start")("tx_info", txOperator.GetTxInfo().DebugString()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start")("tx_info", txOperator.GetTxInfo().DebugString()); TTxController::TProposeResult proposeResult = txOperator.GetProposeStartInfoVerified(); AFL_VERIFY(!txOperator.IsFail()); txOperator.FinishProposeOnComplete(Owner, ctx); @@ -379,7 +379,7 @@ void TTxController::FinishProposeOnComplete(ITransactionOperator& txOperator, co void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx) { auto txOperator = GetTxOperatorOptional(txId); if (!txOperator) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId); return; } return FinishProposeOnComplete(*txOperator, ctx); diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h index e48f10d3796d..a546f50001f0 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.h +++ b/ydb/core/tx/columnshard/transactions/tx_controller.h @@ -198,6 +198,8 @@ class TTxController { std::optional Status = EStatus::Created; private: + mutable TAtomicCounter PreparationsStarted = 0; + friend class TTxController; virtual bool DoParse(TColumnShard& owner, const TString& data) = 0; virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0; @@ -215,6 +217,9 @@ class TTxController { return false; } + virtual bool DoIsInProgress() const { + return false; + } virtual std::unique_ptr DoBuildTxPrepareForProgress(TColumnShard* /*owner*/) const { return nullptr; } @@ -240,6 +245,10 @@ class TTxController { using TFactory = NObjectFactory::TParametrizedObjectFactory; using OpType = TString; + bool IsInProgress() const { + return DoIsInProgress(); + } + bool PingTimeout(TColumnShard& owner, const TMonotonic now) { return DoPingTimeout(owner, now); } @@ -257,6 +266,13 @@ class TTxController { } std::unique_ptr BuildTxPrepareForProgress(TColumnShard* owner) const { + if (!IsInProgress()) { + return nullptr; + } + if (PreparationsStarted.Val()) { + return nullptr; + } + PreparationsStarted.Inc(); return DoBuildTxPrepareForProgress(owner); } diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 5bd248386bef..ee0a8a7929dd 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -302,6 +302,7 @@ enum EServiceKikimr { TX_COLUMNSHARD_ACTUALIZATION = 850; TX_COLUMNSHARD_COMPACTION = 851; TX_COLUMNSHARD_WRITE = 852; + TX_COLUMNSHARD_TX = 853; // System views SYSTEM_VIEWS = 900;