From 8b23a4eef1a835bd144ab134f31b5e75189801fd Mon Sep 17 00:00:00 2001 From: Eric Sheng Date: Wed, 5 Jun 2024 14:05:41 -0700 Subject: [PATCH] [#21741] docdb: Filter intents by min running hybrid time during tablet bootstrap Summary: When CDC streams are lagging, there may be a large number of intent SST files whose contents have all been applied already, but must be maintained for CDC purposes. We work around the performance implications of having a large number of these files by filtering out SST files by min running hybrid time (D33131 / 97536b4a021633f374a90b03a7595e3a1945059b), but this approach does not work as is for bootstrap, since min running hybrid time is not currently determined until bootstrap has finished. This change adds the saving of min running hybrid time periodically with retryable requests state, and then loads this min running hybrid time into the transaction participant early in bootstrap, to allow the SST file filter used in D33131 / 97536b4a021633f374a90b03a7595e3a1945059b to be used at bootstrap time as well. To avoid reintroducing the issue introduced by D34389 / 2458c086b0e25488cca4b0ecd73f7c2d1d7ff6e4, this diff also removes the requirement that min running hybrid time must not be set before bootstrap, by moving the requirement to `transactions_loaded_`. **Upgrade/Rollback safety:** This change is not guarded by a gflag or autoflag. If the newly added min running hybrid time field is missing (upgrade), we do not apply a filter (the current behavior), and the presence of the optional protobuf field when downgrading is entirely ignored (the old behavior is to unconditionally not apply a filter). There are no correctness issues involved with either applying or not applying the filter, as it is entirely a performance optimization. Jira: DB-10615 Test Plan: Jenkins Reviewers: yyan, qhu Reviewed By: yyan, qhu Subscribers: rthallam, ybase Differential Revision: https://phorge.dev.yugabyte.com/D35639 --- src/yb/consensus/consensus.proto | 1 + src/yb/tablet/tablet_bootstrap.cc | 35 +++++++++----- .../tablet/tablet_bootstrap_state_manager.cc | 27 +++++++++++ .../tablet/tablet_bootstrap_state_manager.h | 25 ++++++++++ src/yb/tablet/tablet_peer.cc | 13 ++++++ src/yb/tablet/tablet_peer.h | 2 + src/yb/tablet/transaction_loader.cc | 13 ++++-- src/yb/tablet/transaction_loader.h | 1 + src/yb/tablet/transaction_participant.cc | 46 ++++++++++++++++--- src/yb/tablet/transaction_participant.h | 4 ++ 10 files changed, 146 insertions(+), 21 deletions(-) diff --git a/src/yb/consensus/consensus.proto b/src/yb/consensus/consensus.proto index db1995b10fbb..5db17252fda8 100644 --- a/src/yb/consensus/consensus.proto +++ b/src/yb/consensus/consensus.proto @@ -613,6 +613,7 @@ message ClientReplicatedRetryableRequestRangesPB { message TabletBootstrapStatePB { optional OpIdPB last_op_id = 1; repeated ClientReplicatedRetryableRequestRangesPB client_requests = 2; + optional fixed64 min_running_ht = 3; } // A Raft implementation. diff --git a/src/yb/tablet/tablet_bootstrap.cc b/src/yb/tablet/tablet_bootstrap.cc index 1ec4aa02bd8c..ba5de137b9bc 100644 --- a/src/yb/tablet/tablet_bootstrap.cc +++ b/src/yb/tablet/tablet_bootstrap.cc @@ -502,7 +502,21 @@ class TabletBootstrap { VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString(); } - const bool has_blocks = VERIFY_RESULT(OpenTablet()); + std::optional bootstrap_state_pb = std::nullopt; + HybridTime min_running_ht = HybridTime::kInvalid; + if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && data_.bootstrap_state_manager) { + auto result = data_.bootstrap_state_manager->LoadFromDisk(); + if (result.ok()) { + bootstrap_state_pb = std::move(*result); + + const auto& bootstrap_state = data_.bootstrap_state_manager->bootstrap_state(); + min_running_ht = bootstrap_state.GetMinRunningHybridTime(); + } else if (!result.status().IsNotFound()) { + return result.status(); + } + } + + const bool has_blocks = VERIFY_RESULT(OpenTablet(min_running_ht)); if (data_.retryable_requests) { const auto retryable_request_timeout_secs = meta_->IsSysCatalog() @@ -513,15 +527,8 @@ class TabletBootstrap { } // Load retryable requests after metrics entity has been instantiated. - if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && - data_.bootstrap_retryable_requests && data_.retryable_requests && - data_.bootstrap_state_manager) { - auto result = data_.bootstrap_state_manager->LoadFromDisk(); - if (result.ok()) { - data_.retryable_requests->FromPB(*result); - } else if (!result.status().IsNotFound()) { - return result.status(); - } + if (bootstrap_state_pb && data_.bootstrap_retryable_requests && data_.retryable_requests) { + data_.retryable_requests->FromPB(*bootstrap_state_pb); } if (FLAGS_TEST_dump_docdb_before_tablet_bootstrap) { @@ -611,7 +618,7 @@ class TabletBootstrap { } // Sets result to true if there was any data on disk for this tablet. - Result OpenTablet() { + Result OpenTablet(HybridTime min_running_ht) { CleanupSnapshots(); // Use operator new instead of make_shared for creating the shared_ptr. That way, we would have // the shared_ptr's control block hold a raw pointer to the Tablet object as opposed to the @@ -622,6 +629,12 @@ class TabletBootstrap { // reference count drops to 0. With make_shared, there's a risk of a leaked weak_ptr holding up // the object's memory even after all shared_ptrs go out of scope. std::shared_ptr tablet(new Tablet(data_.tablet_init_data)); + + auto participant = tablet->transaction_participant(); + if (participant) { + participant->SetMinRunningHybridTimeLowerBound(min_running_ht); + } + // Doing nothing for now except opening a tablet locally. LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") { RETURN_NOT_OK(tablet->Open()); diff --git a/src/yb/tablet/tablet_bootstrap_state_manager.cc b/src/yb/tablet/tablet_bootstrap_state_manager.cc index 7da17ce738df..ef0573e868a1 100644 --- a/src/yb/tablet/tablet_bootstrap_state_manager.cc +++ b/src/yb/tablet/tablet_bootstrap_state_manager.cc @@ -26,6 +26,29 @@ namespace yb::tablet { +TabletBootstrapState::TabletBootstrapState(const TabletBootstrapState& rhs): + min_running_ht_(rhs.min_running_ht_.load()) {} + +TabletBootstrapState::TabletBootstrapState(TabletBootstrapState&& rhs): + min_running_ht_(rhs.min_running_ht_.load()) {} + +void TabletBootstrapState::operator=(TabletBootstrapState&& rhs) { + min_running_ht_.store(rhs.min_running_ht_.load()); +} + +void TabletBootstrapState::CopyFrom(const TabletBootstrapState& rhs) { + min_running_ht_.store(rhs.min_running_ht_.load()); +} + +void TabletBootstrapState::ToPB(consensus::TabletBootstrapStatePB* pb) const { + pb->set_min_running_ht(min_running_ht_.load().ToUint64()); +} + +void TabletBootstrapState::FromPB(const consensus::TabletBootstrapStatePB& pb) { + min_running_ht_.store( + pb.has_min_running_ht() ? HybridTime(pb.min_running_ht()) : HybridTime::kInvalid); +} + TabletBootstrapStateManager::TabletBootstrapStateManager() { } TabletBootstrapStateManager::TabletBootstrapStateManager( @@ -55,8 +78,11 @@ Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_co return Status::OK(); } + TabletBootstrapState bootstrap_state(bootstrap_state_); + consensus::TabletBootstrapStatePB pb; retryable_requests->ToPB(&pb); + bootstrap_state.ToPB(&pb); auto path = NewFilePath(); LOG(INFO) << "Saving bootstrap state up to " << pb.last_op_id() << " to " << path; @@ -88,6 +114,7 @@ Result TabletBootstrapStateManager::LoadFromD RETURN_NOT_OK_PREPEND( pb_util::ReadPBContainerFromPath(fs_manager()->env(), path, &pb), Format("Could not load bootstrap state from $0", path)); + bootstrap_state_.FromPB(pb); LOG(INFO) << Format("Loaded tablet ($0) bootstrap state " "(max_replicated_op_id_=$1) from $2", tablet_id_, pb.last_op_id(), path); diff --git a/src/yb/tablet/tablet_bootstrap_state_manager.h b/src/yb/tablet/tablet_bootstrap_state_manager.h index 4950a963d6e8..dd7c60dd2be6 100644 --- a/src/yb/tablet/tablet_bootstrap_state_manager.h +++ b/src/yb/tablet/tablet_bootstrap_state_manager.h @@ -30,6 +30,28 @@ namespace yb::tablet { +class TabletBootstrapState { + public: + TabletBootstrapState() = default; + ~TabletBootstrapState() = default; + + TabletBootstrapState(const TabletBootstrapState& rhs); + + TabletBootstrapState(TabletBootstrapState&& rhs); + void operator=(TabletBootstrapState&& rhs); + + void CopyFrom(const TabletBootstrapState& rhs); + + void SetMinRunningHybridTime(HybridTime min_running_ht) { min_running_ht_.store(min_running_ht); } + HybridTime GetMinRunningHybridTime() const { return min_running_ht_.load(); } + + void ToPB(consensus::TabletBootstrapStatePB* pb) const; + void FromPB(const consensus::TabletBootstrapStatePB& pb); + + private: + std::atomic min_running_ht_{HybridTime::kInvalid}; +}; + class TabletBootstrapStateManager { public: TabletBootstrapStateManager(); @@ -40,6 +62,8 @@ class TabletBootstrapStateManager { Status Init(); FsManager* fs_manager() const { return fs_manager_; } + const TabletBootstrapState& bootstrap_state() const { return bootstrap_state_; } + TabletBootstrapState& bootstrap_state() { return bootstrap_state_; } static std::string FilePath(const std::string& path) { return JoinPathSegments(path, FileName()); @@ -84,6 +108,7 @@ class TabletBootstrapStateManager { bool has_file_on_disk_ = false; FsManager* fs_manager_ = nullptr; std::string dir_; + TabletBootstrapState bootstrap_state_; static constexpr char kSuffixNew[] = ".NEW"; static constexpr char kTabletBootstrapStateFileName[] = "retryable_requests"; diff --git a/src/yb/tablet/tablet_peer.cc b/src/yb/tablet/tablet_peer.cc index 578ca0f54e5e..28942c1baa18 100644 --- a/src/yb/tablet/tablet_peer.cc +++ b/src/yb/tablet/tablet_peer.cc @@ -329,6 +329,12 @@ Status TabletPeer::InitTabletPeer( prepare_thread_ = std::make_unique(consensus_.get(), tablet_prepare_pool); + auto txn_participant = tablet_->transaction_participant(); + if (txn_participant) { + txn_participant->SetMinRunningHybridTimeUpdateCallback( + std::bind_front(&TabletPeer::MinRunningHybridTimeUpdated, this)); + } + // "Publish" the tablet object right before releasing the lock. tablet_obj_state_.store(TabletObjectState::kAvailable, std::memory_order_release); } @@ -1816,6 +1822,13 @@ TabletBootstrapFlushState TabletPeer::TEST_TabletBootstrapStateFlusherState() co : TabletBootstrapFlushState::kFlushIdle; } +void TabletPeer::MinRunningHybridTimeUpdated(HybridTime min_running_ht) { + if (min_running_ht && min_running_ht != HybridTime::kMax) { + VLOG_WITH_PREFIX(2) << "Min running hybrid time updated: " << min_running_ht; + bootstrap_state_manager_->bootstrap_state().SetMinRunningHybridTime(min_running_ht); + } +} + Preparer* TabletPeer::DEBUG_GetPreparer() { return prepare_thread_.get(); } bool TabletPeer::HasSufficientDiskSpaceForWrite() { diff --git a/src/yb/tablet/tablet_peer.h b/src/yb/tablet/tablet_peer.h index 4756ba413dcf..2c6fbb1e54db 100644 --- a/src/yb/tablet/tablet_peer.h +++ b/src/yb/tablet/tablet_peer.h @@ -600,6 +600,8 @@ class TabletPeer : public std::enable_shared_from_this, bool FlushBootstrapStateEnabled() const; + void MinRunningHybridTimeUpdated(HybridTime min_running_ht); + MetricRegistry* metric_registry_; bool IsLeader() override { diff --git a/src/yb/tablet/transaction_loader.cc b/src/yb/tablet/transaction_loader.cc index c332ce837a5b..cee93c9ddf07 100644 --- a/src/yb/tablet/transaction_loader.cc +++ b/src/yb/tablet/transaction_loader.cc @@ -17,6 +17,7 @@ #include "yb/dockv/intent.h" #include "yb/docdb/bounded_rocksdb_iterator.h" +#include "yb/docdb/doc_ql_filefilter.h" #include "yb/docdb/docdb_rocksdb_util.h" #include "yb/docdb/iter_util.h" @@ -51,11 +52,12 @@ namespace tablet { namespace { -docdb::BoundedRocksDbIterator CreateFullScanIterator(rocksdb::DB* db) { +docdb::BoundedRocksDbIterator CreateFullScanIterator( + rocksdb::DB* db, std::shared_ptr filter) { return docdb::BoundedRocksDbIterator(docdb::CreateRocksDBIterator( db, &docdb::KeyBounds::kNoBounds, docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, - /* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId)); + /* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId, filter)); } } // namespace @@ -75,8 +77,11 @@ class TransactionLoader::Executor { if (!scoped_pending_operation_.ok()) { return false; } - regular_iterator_ = CreateFullScanIterator(db.regular); - intents_iterator_ = CreateFullScanIterator(db.intents); + auto min_running_ht = context().MinRunningHybridTime(); + VLOG_WITH_PREFIX(1) << "TransactionLoader min_running_ht: " << min_running_ht; + regular_iterator_ = CreateFullScanIterator(db.regular, nullptr /* filter */); + intents_iterator_ = CreateFullScanIterator(db.intents, + docdb::CreateIntentHybridTimeFileFilter(min_running_ht)); loader_.state_.store(TransactionLoaderState::kLoading, std::memory_order_release); CHECK_OK(yb::Thread::Create( "transaction_loader", "loader", &Executor::Execute, this, &loader_.load_thread_)) diff --git a/src/yb/tablet/transaction_loader.h b/src/yb/tablet/transaction_loader.h index d48fe1547413..e317cbca9c8e 100644 --- a/src/yb/tablet/transaction_loader.h +++ b/src/yb/tablet/transaction_loader.h @@ -58,6 +58,7 @@ class TransactionLoaderContext { OneWayBitmap&& replicated_batches, const ApplyStateWithCommitHt* pending_apply) = 0; virtual void LoadFinished(Status load_status) = 0; + virtual HybridTime MinRunningHybridTime() = 0; }; YB_DEFINE_ENUM(TransactionLoaderState, (kNotStarted)(kLoading)(kCompleted)(kFailed)); diff --git a/src/yb/tablet/transaction_participant.cc b/src/yb/tablet/transaction_participant.cc index d5a31b3371b3..a1e6079794ae 100644 --- a/src/yb/tablet/transaction_participant.cc +++ b/src/yb/tablet/transaction_participant.cc @@ -1073,9 +1073,21 @@ class TransactionParticipant::Impl return &participant_context_; } - HybridTime MinRunningHybridTime() { + void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) { + if (lower_bound == HybridTime::kMax || lower_bound == HybridTime::kInvalid) { + return; + } + HybridTime current_ht = min_running_ht_.load(std::memory_order_acquire); + while ((!current_ht || current_ht < lower_bound) + && !min_running_ht_.compare_exchange_weak(current_ht, lower_bound)) {} + VLOG_WITH_PREFIX(1) << "Updated min running hybrid time to at least " << lower_bound + << ", was " << current_ht; + } + + HybridTime MinRunningHybridTime() override { auto result = min_running_ht_.load(std::memory_order_acquire); - if (result == HybridTime::kMax || result == HybridTime::kInvalid) { + if (result == HybridTime::kMax || result == HybridTime::kInvalid + || !transactions_loaded_.load()) { return result; } auto now = CoarseMonoClock::now(); @@ -1238,6 +1250,11 @@ class TransactionParticipant::Impl return transactions_.size(); } + void SetMinRunningHybridTimeUpdateCallback(std::function callback) { + std::lock_guard lock(mutex_); + min_running_ht_callback_ = std::move(callback); + } + OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) { std::lock_guard lock(mutex_); auto it = transactions_.find(id); @@ -1467,7 +1484,7 @@ class TransactionParticipant::Impl ++idx; } } - transactions_loaded_ = true; + transactions_loaded_.store(true); TransactionsModifiedUnlocked(&min_running_notifier); } @@ -1488,6 +1505,13 @@ class TransactionParticipant::Impl } } + void SetMinRunningHybridTime(HybridTime min_running_ht) REQUIRES(mutex_) { + min_running_ht_.store(min_running_ht, std::memory_order_release); + if (min_running_ht_callback_) { + min_running_ht_callback_(min_running_ht); + } + } + void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { metric_transactions_running_->set_value(transactions_.size()); if (!transactions_loaded_) { @@ -1495,14 +1519,14 @@ class TransactionParticipant::Impl } if (transactions_.empty()) { - min_running_ht_.store(HybridTime::kMax, std::memory_order_release); + SetMinRunningHybridTime(HybridTime::kMax); CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier); return; } auto& first_txn = **transactions_.get().begin(); if (first_txn.start_ht() != min_running_ht_.load(std::memory_order_relaxed)) { - min_running_ht_.store(first_txn.start_ht(), std::memory_order_release); + SetMinRunningHybridTime(first_txn.start_ht()); next_check_min_running_.store( CoarseMonoClock::now() + 1ms * FLAGS_transaction_min_running_check_delay_ms, std::memory_order_release); @@ -2157,6 +2181,7 @@ class TransactionParticipant::Impl std::atomic next_check_min_running_{CoarseTimePoint()}; HybridTime waiting_for_min_running_ht_ = HybridTime::kMax; std::atomic shutdown_done_{false}; + std::function min_running_ht_callback_ GUARDED_BY(mutex_); LRUCache cleanup_cache_{FLAGS_transactions_cleanup_cache_size}; @@ -2178,7 +2203,7 @@ class TransactionParticipant::Impl std::shared_ptr mem_tracker_ GUARDED_BY(mutex_); - bool transactions_loaded_ GUARDED_BY(mutex_) = false; + std::atomic transactions_loaded_{false}; bool pending_applied_notified_ = false; std::mutex pending_applies_mutex_; @@ -2303,6 +2328,10 @@ TransactionParticipantContext* TransactionParticipant::context() const { return impl_->participant_context(); } +void TransactionParticipant::SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) { + impl_->SetMinRunningHybridTimeLowerBound(lower_bound); +} + HybridTime TransactionParticipant::MinRunningHybridTime() const { return impl_->MinRunningHybridTime(); } @@ -2412,5 +2441,10 @@ void TransactionParticipant::RecordConflictResolutionScanLatency(MonoDelta laten impl_->RecordConflictResolutionScanLatency(latency); } +void TransactionParticipant::SetMinRunningHybridTimeUpdateCallback( + std::function callback) { + impl_->SetMinRunningHybridTimeUpdateCallback(std::move(callback)); +} + } // namespace tablet } // namespace yb diff --git a/src/yb/tablet/transaction_participant.h b/src/yb/tablet/transaction_participant.h index c0e66179b8c0..5079ae79702c 100644 --- a/src/yb/tablet/transaction_participant.h +++ b/src/yb/tablet/transaction_participant.h @@ -185,6 +185,8 @@ class TransactionParticipant : public TransactionStatusManager { TransactionParticipantContext* context() const; + void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound); + HybridTime MinRunningHybridTime() const override; Result WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) override; @@ -238,6 +240,8 @@ class TransactionParticipant : public TransactionStatusManager { size_t GetNumRunningTransactions() const; + void SetMinRunningHybridTimeUpdateCallback(std::function callback); + struct CountIntentsResult { size_t num_intents; size_t num_transactions;