From 138b81a96c69789c82282b3a4c469857ba5cd5e2 Mon Sep 17 00:00:00 2001 From: yusong-yan Date: Mon, 22 Apr 2024 18:22:38 +0000 Subject: [PATCH] [#22099] docdb: Postpone TransactionParticipant's min_running_ht Initialization Summary: **Issue:** An initialized `min_running_ht` in `TransactionParticipant` indicates that `TransactionLoader` has completed transaction loading. Transaction loading starts during tablet bootstrap. However, there's a chance that transaction loading finishes before tablet bootstrap completes, which means `min_running_ht` could be initialized during the bootstrap. This could lead to unexpected behavior when `TransactionParticipant::MinRunningHybridTime()` is called during bootstrap. A recent code change, D33131, has exposed this issue by introducing calls to `TransactionParticipant::MinRunningHybridTime()` in various functions that are likely to be executed during the bootstrap WAL replay process. Early `min_running_ht` initialization then triggers unexpected code execution and causes a segmentation fault. (See [[ https://github.com/yugabyte/yugabyte-db/issues/21877 | #21877 ]] for more details.) **Fix:** To address this, `min_running_ht` initialization now happens within the `LoadFinished` function, and it guarantees: * Successful Transaction Loading: At this point, all transactions for the tablet have been loaded successfully. * Local Bootstrap Completion: Once start_latch_.Wait() completes, it means `TransactionParticipant::Start()` has been called. This ensures the local bootstrap process has finished successfully. This ensures `min_running_ht` is initialized at a safer point in the startup process. Jira: DB-11029 Test Plan: Unit test WIP To validate the effectiveness of the fix, we re-ran the stress tests that originally exposed issue [[ https://github.com/yugabyte/yugabyte-db/issues/21877 | #21877 ]]. The tests completed successfully with no segmentation faults observed. 
Reviewers: esheng, sergei Reviewed By: sergei Subscribers: slingam, rthallam, ybase Differential Revision: https://phorge.dev.yugabyte.com/D34389 --- src/yb/client/ql-transaction-test.cc | 46 +++++++++++++++++++ src/yb/tablet/tablet.cc | 6 +++ src/yb/tablet/transaction_loader.cc | 6 +-- src/yb/tablet/transaction_loader.h | 1 - src/yb/tablet/transaction_participant.cc | 57 +++++++++++++++--------- 5 files changed, 90 insertions(+), 26 deletions(-) diff --git a/src/yb/client/ql-transaction-test.cc b/src/yb/client/ql-transaction-test.cc index cf847a01caca..b0d02709d84b 100644 --- a/src/yb/client/ql-transaction-test.cc +++ b/src/yb/client/ql-transaction-test.cc @@ -79,6 +79,10 @@ DECLARE_uint64(TEST_transaction_delay_status_reply_usec_in_tests); DECLARE_uint64(aborted_intent_cleanup_ms); DECLARE_uint64(max_clock_skew_usec); DECLARE_uint64(transaction_heartbeat_usec); +DECLARE_bool(TEST_load_transactions_sync); +DECLARE_uint64(TEST_inject_sleep_before_applying_intents_ms); +DECLARE_bool(TEST_skip_process_apply); +DECLARE_bool(TEST_skip_remove_intent); namespace yb { namespace client { @@ -1782,6 +1786,48 @@ TEST_F_EX(QLTransactionTest, GCLogsAfterTransactionalWritesStop, QLTransactionTe thread_holder.Stop(); } +class QLTransactionTestSingleTS : public QLTransactionTest { + protected: + void SetUp() override { + mini_cluster_opt_.num_tablet_servers = 1; + QLTransactionTest::SetUp(); + } +}; + +TEST_F_EX(QLTransactionTest, TransactionsEarlyLoadedTest, QLTransactionTestSingleTS) { + auto txn_1 = CreateTransaction(); + ASSERT_OK(WriteRow( + CreateSession(txn_1), + /* key = */ 0, + /* value = */ 0, + WriteOpType::INSERT, + Flush::kTrue)); + auto txn_2 = CreateTransaction(); + ASSERT_OK(WriteRow( + CreateSession(txn_2), + /* key = */ 100, + /* value = */ 100, + WriteOpType::INSERT, + Flush::kTrue)); + + // Skip applying and removing intent before stoping the cluster. 
+ ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_remove_intent) = true; + txn_1->Abort(); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_process_apply) = true; + ASSERT_OK(txn_2->CommitFuture().get()); + + cluster_->StopSync(); + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_process_apply) = false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_remove_intent) = false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_inject_sleep_before_applying_intents_ms) = 1000; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_load_transactions_sync) = true; + // Replay WAL [txn_1(ABORTED), txn_2(COMMITTED)] + ASSERT_OK(cluster_->Start()); + CheckAllTabletsRunning(); + AssertNoRunningTransactions(); +} + TEST_F(QLTransactionTest, DeleteTableDuringWrite) { DisableApplyingIntents(); ASSERT_NO_FATALS(WriteData()); diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index cce40ff167aa..2e0810812a64 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -289,6 +289,9 @@ DEFINE_test_flag(uint64, inject_sleep_before_applying_write_batch_ms, 0, DEFINE_test_flag(uint64, inject_sleep_before_applying_intents_ms, 0, "Sleep before applying intents to docdb after transaction commit"); +DEFINE_test_flag(bool, skip_remove_intent, false, + "If true, remove intent will be skipped"); + DECLARE_bool(TEST_invalidate_last_change_metadata_op); using namespace std::placeholders; @@ -2025,6 +2028,9 @@ Result Tablet::ApplyIntents(const TransactionApply template Status Tablet::RemoveIntentsImpl( const RemoveIntentsData& data, RemoveReason reason, const Ids& ids) { + if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_skip_remove_intent))) { + return Status::OK(); + } auto scoped_read_operation = CreateScopedRWOperationNotBlockingRocksDbShutdownStart(); RETURN_NOT_OK(scoped_read_operation); diff --git a/src/yb/tablet/transaction_loader.cc b/src/yb/tablet/transaction_loader.cc index 00a6667ae083..c332ce837a5b 100644 --- a/src/yb/tablet/transaction_loader.cc +++ b/src/yb/tablet/transaction_loader.cc @@ -148,9 +148,9 
@@ class TransactionLoader::Executor { intents_iterator_.Reset(); RETURN_NOT_OK(CheckForShutdown()); - context().CompleteLoad([this] { - loader_.state_ = TransactionLoaderState::kCompleted; - }); + + loader_.state_ = TransactionLoaderState::kCompleted; + { // We need to lock and unlock the mutex here to avoid missing a notification in WaitLoaded // and WaitAllLoaded. The waiting loop in those functions is equivalent to the following, diff --git a/src/yb/tablet/transaction_loader.h b/src/yb/tablet/transaction_loader.h index 8e2b00954e8e..d48fe1547413 100644 --- a/src/yb/tablet/transaction_loader.h +++ b/src/yb/tablet/transaction_loader.h @@ -52,7 +52,6 @@ class TransactionLoaderContext { virtual TransactionStatusResolver& AddStatusResolver() = 0; virtual const std::string& LogPrefix() const = 0; - virtual void CompleteLoad(const std::function& functor) = 0; virtual void LoadTransaction( TransactionMetadata&& metadata, TransactionalBatchData&& last_batch_data, diff --git a/src/yb/tablet/transaction_participant.cc b/src/yb/tablet/transaction_participant.cc index 220c179be244..336e3cba874b 100644 --- a/src/yb/tablet/transaction_participant.cc +++ b/src/yb/tablet/transaction_participant.cc @@ -149,6 +149,12 @@ DEFINE_test_flag(double, txn_participant_error_on_load, 0.0, "Probability that the participant would error on call to SetDB before launching " "the transaction loader thread."); +DEFINE_test_flag(bool, skip_process_apply, false, + "If true, ProcessApply will be skipped"); + +DEFINE_test_flag(bool, load_transactions_sync, false, + "If true, the test will block until the loader has finished loading all txns."); + namespace yb { namespace tablet { @@ -815,6 +821,9 @@ class TransactionParticipant::Impl } Status ProcessApply(const TransactionApplyData& data) { + if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_skip_process_apply))) { + return Status::OK(); + } VLOG_WITH_PREFIX(2) << "Apply: " << data.ToString(); RETURN_NOT_OK(loader_.WaitLoaded(data.transaction_id)); @@ 
-1024,6 +1033,10 @@ class TransactionParticipant::Impl return STATUS_FORMAT(InternalError, "Flag TEST_txn_participant_error_on_load set."); } loader_.Start(pending_op_counter_blocking_rocksdb_shutdown_start, db_); + if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_load_transactions_sync))) { + RETURN_NOT_OK(loader_.WaitAllLoaded()); + std::this_thread::sleep_for(500ms); + } return Status::OK(); } @@ -1398,13 +1411,6 @@ class TransactionParticipant::Impl > > Transactions; - void CompleteLoad(const std::function& functor) override { - MinRunningNotifier min_running_notifier(&applier_); - std::lock_guard lock(mutex_); - functor(); - TransactionsModifiedUnlocked(&min_running_notifier); - } - void LoadFinished(Status load_status) EXCLUDES(status_resolvers_mutex_) override { // The start_latch will be hit either from a CountDown from Start, or from Shutdown, so make // sure that at the end of Load, we unblock shutdown. @@ -1442,24 +1448,29 @@ class TransactionParticipant::Impl std::this_thread::sleep_for(10ms); } - if (!pending_applies.empty()) { - LOG_WITH_PREFIX(INFO) + { + LOG_IF_WITH_PREFIX(INFO, !pending_applies.empty()) << __func__ << ": starting " << pending_applies.size() << " pending applies"; + MinRunningNotifier min_running_notifier(&applier_); std::lock_guard lock(mutex_); - size_t idx = 0; - for (const auto& p : pending_applies) { - auto it = transactions_.find(p.first); - if (it == transactions_.end()) { - LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first); - continue; - } + if (!pending_applies.empty()) { + size_t idx = 0; + for (const auto& p : pending_applies) { + auto it = transactions_.find(p.first); + if (it == transactions_.end()) { + LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first); + continue; + } - TransactionApplyData apply_data; - apply_data.transaction_id = p.first; - apply_data.commit_ht = p.second.commit_ht; - (**it).SetApplyData(p.second.state, &apply_data, 
&operations[idx]); - ++idx; + TransactionApplyData apply_data; + apply_data.transaction_id = p.first; + apply_data.commit_ht = p.second.commit_ht; + (**it).SetApplyData(p.second.state, &apply_data, &operations[idx]); + ++idx; + } } + transactions_loaded_ = true; + TransactionsModifiedUnlocked(&min_running_notifier); } { @@ -1481,7 +1492,7 @@ class TransactionParticipant::Impl void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { metric_transactions_running_->set_value(transactions_.size()); - if (auto res = loader_.Completed(); !res.ok() || !(*res)) { + if (!transactions_loaded_) { return; } @@ -2169,6 +2180,8 @@ class TransactionParticipant::Impl std::shared_ptr mem_tracker_ GUARDED_BY(mutex_); + bool transactions_loaded_ GUARDED_BY(mutex_) = false; + bool pending_applied_notified_ = false; std::mutex pending_applies_mutex_; std::vector> pending_applies_