diff --git a/src/yb/consensus/consensus.proto b/src/yb/consensus/consensus.proto index db1995b10fbb..5db17252fda8 100644 --- a/src/yb/consensus/consensus.proto +++ b/src/yb/consensus/consensus.proto @@ -613,6 +613,7 @@ message ClientReplicatedRetryableRequestRangesPB { message TabletBootstrapStatePB { optional OpIdPB last_op_id = 1; repeated ClientReplicatedRetryableRequestRangesPB client_requests = 2; + optional fixed64 min_running_ht = 3; } // A Raft implementation. diff --git a/src/yb/tablet/tablet_bootstrap.cc b/src/yb/tablet/tablet_bootstrap.cc index 1ec4aa02bd8c..ba5de137b9bc 100644 --- a/src/yb/tablet/tablet_bootstrap.cc +++ b/src/yb/tablet/tablet_bootstrap.cc @@ -502,7 +502,21 @@ class TabletBootstrap { VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString(); } - const bool has_blocks = VERIFY_RESULT(OpenTablet()); + std::optional bootstrap_state_pb = std::nullopt; + HybridTime min_running_ht = HybridTime::kInvalid; + if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && data_.bootstrap_state_manager) { + auto result = data_.bootstrap_state_manager->LoadFromDisk(); + if (result.ok()) { + bootstrap_state_pb = std::move(*result); + + const auto& bootstrap_state = data_.bootstrap_state_manager->bootstrap_state(); + min_running_ht = bootstrap_state.GetMinRunningHybridTime(); + } else if (!result.status().IsNotFound()) { + return result.status(); + } + } + + const bool has_blocks = VERIFY_RESULT(OpenTablet(min_running_ht)); if (data_.retryable_requests) { const auto retryable_request_timeout_secs = meta_->IsSysCatalog() @@ -513,15 +527,8 @@ class TabletBootstrap { } // Load retryable requests after metrics entity has been instantiated. - if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && - data_.bootstrap_retryable_requests && data_.retryable_requests && - data_.bootstrap_state_manager) { - auto result = data_.bootstrap_state_manager->LoadFromDisk(); - if (result.ok()) { - data_.retryable_requests->FromPB(*result); - } else if (!result.status().IsNotFound()) { - return result.status(); - } + if (bootstrap_state_pb && data_.bootstrap_retryable_requests && data_.retryable_requests) { + data_.retryable_requests->FromPB(*bootstrap_state_pb); } if (FLAGS_TEST_dump_docdb_before_tablet_bootstrap) { @@ -611,7 +618,7 @@ class TabletBootstrap { } // Sets result to true if there was any data on disk for this tablet. - Result OpenTablet() { + Result OpenTablet(HybridTime min_running_ht) { CleanupSnapshots(); // Use operator new instead of make_shared for creating the shared_ptr. That way, we would have // the shared_ptr's control block hold a raw pointer to the Tablet object as opposed to the @@ -622,6 +629,12 @@ class TabletBootstrap { // reference count drops to 0. With make_shared, there's a risk of a leaked weak_ptr holding up // the object's memory even after all shared_ptrs go out of scope. std::shared_ptr tablet(new Tablet(data_.tablet_init_data)); + + auto participant = tablet->transaction_participant(); + if (participant) { + participant->SetMinRunningHybridTimeLowerBound(min_running_ht); + } + // Doing nothing for now except opening a tablet locally. LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") { RETURN_NOT_OK(tablet->Open()); diff --git a/src/yb/tablet/tablet_bootstrap_state_manager.cc b/src/yb/tablet/tablet_bootstrap_state_manager.cc index 7da17ce738df..ef0573e868a1 100644 --- a/src/yb/tablet/tablet_bootstrap_state_manager.cc +++ b/src/yb/tablet/tablet_bootstrap_state_manager.cc @@ -26,6 +26,29 @@ namespace yb::tablet { +TabletBootstrapState::TabletBootstrapState(const TabletBootstrapState& rhs): + min_running_ht_(rhs.min_running_ht_.load()) {} + +TabletBootstrapState::TabletBootstrapState(TabletBootstrapState&& rhs): + min_running_ht_(rhs.min_running_ht_.load()) {} + +void TabletBootstrapState::operator=(TabletBootstrapState&& rhs) { + min_running_ht_.store(rhs.min_running_ht_.load()); +} + +void TabletBootstrapState::CopyFrom(const TabletBootstrapState& rhs) { + min_running_ht_.store(rhs.min_running_ht_.load()); +} + +void TabletBootstrapState::ToPB(consensus::TabletBootstrapStatePB* pb) const { + pb->set_min_running_ht(min_running_ht_.load().ToUint64()); +} + +void TabletBootstrapState::FromPB(const consensus::TabletBootstrapStatePB& pb) { + min_running_ht_.store( + pb.has_min_running_ht() ? HybridTime(pb.min_running_ht()) : HybridTime::kInvalid); +} + TabletBootstrapStateManager::TabletBootstrapStateManager() { } TabletBootstrapStateManager::TabletBootstrapStateManager( @@ -55,8 +78,11 @@ Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_co return Status::OK(); } + TabletBootstrapState bootstrap_state(bootstrap_state_); + consensus::TabletBootstrapStatePB pb; retryable_requests->ToPB(&pb); + bootstrap_state.ToPB(&pb); auto path = NewFilePath(); LOG(INFO) << "Saving bootstrap state up to " << pb.last_op_id() << " to " << path; @@ -88,6 +114,7 @@ Result TabletBootstrapStateManager::LoadFromD RETURN_NOT_OK_PREPEND( pb_util::ReadPBContainerFromPath(fs_manager()->env(), path, &pb), Format("Could not load bootstrap state from $0", path)); + bootstrap_state_.FromPB(pb); LOG(INFO) << Format("Loaded tablet ($0) bootstrap state " "(max_replicated_op_id_=$1) from $2", tablet_id_, pb.last_op_id(), path); diff --git a/src/yb/tablet/tablet_bootstrap_state_manager.h b/src/yb/tablet/tablet_bootstrap_state_manager.h index 4950a963d6e8..dd7c60dd2be6 100644 --- a/src/yb/tablet/tablet_bootstrap_state_manager.h +++ b/src/yb/tablet/tablet_bootstrap_state_manager.h @@ -30,6 +30,28 @@ namespace yb::tablet { +class TabletBootstrapState { + public: + TabletBootstrapState() = default; + ~TabletBootstrapState() = default; + + TabletBootstrapState(const TabletBootstrapState& rhs); + + TabletBootstrapState(TabletBootstrapState&& rhs); + void operator=(TabletBootstrapState&& rhs); + + void CopyFrom(const TabletBootstrapState& rhs); + + void SetMinRunningHybridTime(HybridTime min_running_ht) { min_running_ht_.store(min_running_ht); } + HybridTime GetMinRunningHybridTime() const { return min_running_ht_.load(); } + + void ToPB(consensus::TabletBootstrapStatePB* pb) const; + void FromPB(const consensus::TabletBootstrapStatePB& pb); + + private: + std::atomic min_running_ht_{HybridTime::kInvalid}; +}; + class TabletBootstrapStateManager { public: TabletBootstrapStateManager(); @@ -40,6 +62,8 @@ class TabletBootstrapStateManager { Status Init(); FsManager* fs_manager() const { return fs_manager_; } + const TabletBootstrapState& bootstrap_state() const { return bootstrap_state_; } + TabletBootstrapState& bootstrap_state() { return bootstrap_state_; } static std::string FilePath(const std::string& path) { return JoinPathSegments(path, FileName()); @@ -84,6 +108,7 @@ class TabletBootstrapStateManager { bool has_file_on_disk_ = false; FsManager* fs_manager_ = nullptr; std::string dir_; + TabletBootstrapState bootstrap_state_; static constexpr char kSuffixNew[] = ".NEW"; static constexpr char kTabletBootstrapStateFileName[] = "retryable_requests"; diff --git a/src/yb/tablet/tablet_peer.cc b/src/yb/tablet/tablet_peer.cc index 578ca0f54e5e..28942c1baa18 100644 --- a/src/yb/tablet/tablet_peer.cc +++ b/src/yb/tablet/tablet_peer.cc @@ -329,6 +329,12 @@ Status TabletPeer::InitTabletPeer( prepare_thread_ = std::make_unique(consensus_.get(), tablet_prepare_pool); + auto txn_participant = tablet_->transaction_participant(); + if (txn_participant) { + txn_participant->SetMinRunningHybridTimeUpdateCallback( + std::bind_front(&TabletPeer::MinRunningHybridTimeUpdated, this)); + } + // "Publish" the tablet object right before releasing the lock. tablet_obj_state_.store(TabletObjectState::kAvailable, std::memory_order_release); } @@ -1816,6 +1822,13 @@ TabletBootstrapFlushState TabletPeer::TEST_TabletBootstrapStateFlusherState() co : TabletBootstrapFlushState::kFlushIdle; } +void TabletPeer::MinRunningHybridTimeUpdated(HybridTime min_running_ht) { + if (min_running_ht && min_running_ht != HybridTime::kMax) { + VLOG_WITH_PREFIX(2) << "Min running hybrid time updated: " << min_running_ht; + bootstrap_state_manager_->bootstrap_state().SetMinRunningHybridTime(min_running_ht); + } +} + Preparer* TabletPeer::DEBUG_GetPreparer() { return prepare_thread_.get(); } bool TabletPeer::HasSufficientDiskSpaceForWrite() { diff --git a/src/yb/tablet/tablet_peer.h b/src/yb/tablet/tablet_peer.h index 4756ba413dcf..2c6fbb1e54db 100644 --- a/src/yb/tablet/tablet_peer.h +++ b/src/yb/tablet/tablet_peer.h @@ -600,6 +600,8 @@ class TabletPeer : public std::enable_shared_from_this, bool FlushBootstrapStateEnabled() const; + void MinRunningHybridTimeUpdated(HybridTime min_running_ht); + MetricRegistry* metric_registry_; bool IsLeader() override { diff --git a/src/yb/tablet/transaction_loader.cc b/src/yb/tablet/transaction_loader.cc index c332ce837a5b..cee93c9ddf07 100644 --- a/src/yb/tablet/transaction_loader.cc +++ b/src/yb/tablet/transaction_loader.cc @@ -17,6 +17,7 @@ #include "yb/dockv/intent.h" #include "yb/docdb/bounded_rocksdb_iterator.h" +#include "yb/docdb/doc_ql_filefilter.h" #include "yb/docdb/docdb_rocksdb_util.h" #include "yb/docdb/iter_util.h" @@ -51,11 +52,12 @@ namespace tablet { namespace { -docdb::BoundedRocksDbIterator CreateFullScanIterator(rocksdb::DB* db) { +docdb::BoundedRocksDbIterator CreateFullScanIterator( + rocksdb::DB* db, std::shared_ptr filter) { return docdb::BoundedRocksDbIterator(docdb::CreateRocksDBIterator( db, &docdb::KeyBounds::kNoBounds, docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, - /* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId)); + /* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId, filter)); } } // namespace @@ -75,8 +77,11 @@ class TransactionLoader::Executor { if (!scoped_pending_operation_.ok()) { return false; } - regular_iterator_ = CreateFullScanIterator(db.regular); - intents_iterator_ = CreateFullScanIterator(db.intents); + auto min_running_ht = context().MinRunningHybridTime(); + VLOG_WITH_PREFIX(1) << "TransactionLoader min_running_ht: " << min_running_ht; + regular_iterator_ = CreateFullScanIterator(db.regular, nullptr /* filter */); + intents_iterator_ = CreateFullScanIterator(db.intents, + docdb::CreateIntentHybridTimeFileFilter(min_running_ht)); loader_.state_.store(TransactionLoaderState::kLoading, std::memory_order_release); CHECK_OK(yb::Thread::Create( "transaction_loader", "loader", &Executor::Execute, this, &loader_.load_thread_)) diff --git a/src/yb/tablet/transaction_loader.h b/src/yb/tablet/transaction_loader.h index d48fe1547413..e317cbca9c8e 100644 --- a/src/yb/tablet/transaction_loader.h +++ b/src/yb/tablet/transaction_loader.h @@ -58,6 +58,7 @@ class TransactionLoaderContext { OneWayBitmap&& replicated_batches, const ApplyStateWithCommitHt* pending_apply) = 0; virtual void LoadFinished(Status load_status) = 0; + virtual HybridTime MinRunningHybridTime() = 0; }; YB_DEFINE_ENUM(TransactionLoaderState, (kNotStarted)(kLoading)(kCompleted)(kFailed)); diff --git a/src/yb/tablet/transaction_participant.cc b/src/yb/tablet/transaction_participant.cc index d5a31b3371b3..a1e6079794ae 100644 --- a/src/yb/tablet/transaction_participant.cc +++ b/src/yb/tablet/transaction_participant.cc @@ -1073,9 +1073,21 @@ class TransactionParticipant::Impl return &participant_context_; } - HybridTime MinRunningHybridTime() { + void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) { + if (lower_bound == HybridTime::kMax || lower_bound == HybridTime::kInvalid) { + return; + } + HybridTime current_ht = min_running_ht_.load(std::memory_order_acquire); + while ((!current_ht || current_ht < lower_bound) + && !min_running_ht_.compare_exchange_weak(current_ht, lower_bound)) {} + VLOG_WITH_PREFIX(1) << "Updated min running hybrid time to at least " << lower_bound + << ", was " << current_ht; + } + + HybridTime MinRunningHybridTime() override { auto result = min_running_ht_.load(std::memory_order_acquire); - if (result == HybridTime::kMax || result == HybridTime::kInvalid) { + if (result == HybridTime::kMax || result == HybridTime::kInvalid + || !transactions_loaded_.load()) { return result; } auto now = CoarseMonoClock::now(); @@ -1238,6 +1250,11 @@ class TransactionParticipant::Impl return transactions_.size(); } + void SetMinRunningHybridTimeUpdateCallback(std::function callback) { + std::lock_guard lock(mutex_); + min_running_ht_callback_ = std::move(callback); + } + OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) { std::lock_guard lock(mutex_); auto it = transactions_.find(id); @@ -1467,7 +1484,7 @@ class TransactionParticipant::Impl ++idx; } } - transactions_loaded_ = true; + transactions_loaded_.store(true); TransactionsModifiedUnlocked(&min_running_notifier); } @@ -1488,6 +1505,13 @@ class TransactionParticipant::Impl } } + void SetMinRunningHybridTime(HybridTime min_running_ht) REQUIRES(mutex_) { + min_running_ht_.store(min_running_ht, std::memory_order_release); + if (min_running_ht_callback_) { + min_running_ht_callback_(min_running_ht); + } + } + void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { metric_transactions_running_->set_value(transactions_.size()); if (!transactions_loaded_) { @@ -1495,14 +1519,14 @@ class TransactionParticipant::Impl } if (transactions_.empty()) { - min_running_ht_.store(HybridTime::kMax, std::memory_order_release); + SetMinRunningHybridTime(HybridTime::kMax); CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier); return; } auto& first_txn = **transactions_.get().begin(); if (first_txn.start_ht() != min_running_ht_.load(std::memory_order_relaxed)) { - min_running_ht_.store(first_txn.start_ht(), std::memory_order_release); + SetMinRunningHybridTime(first_txn.start_ht()); next_check_min_running_.store( CoarseMonoClock::now() + 1ms * FLAGS_transaction_min_running_check_delay_ms, std::memory_order_release); @@ -2157,6 +2181,7 @@ class TransactionParticipant::Impl std::atomic next_check_min_running_{CoarseTimePoint()}; HybridTime waiting_for_min_running_ht_ = HybridTime::kMax; std::atomic shutdown_done_{false}; + std::function min_running_ht_callback_ GUARDED_BY(mutex_); LRUCache cleanup_cache_{FLAGS_transactions_cleanup_cache_size}; @@ -2178,7 +2203,7 @@ class TransactionParticipant::Impl std::shared_ptr mem_tracker_ GUARDED_BY(mutex_); - bool transactions_loaded_ GUARDED_BY(mutex_) = false; + std::atomic transactions_loaded_{false}; bool pending_applied_notified_ = false; std::mutex pending_applies_mutex_; @@ -2303,6 +2328,10 @@ TransactionParticipantContext* TransactionParticipant::context() const { return impl_->participant_context(); } +void TransactionParticipant::SetMinRunningHybridTimeLowerBound(HybridTime lower_bound) { + impl_->SetMinRunningHybridTimeLowerBound(lower_bound); +} + HybridTime TransactionParticipant::MinRunningHybridTime() const { return impl_->MinRunningHybridTime(); } @@ -2412,5 +2441,10 @@ void TransactionParticipant::RecordConflictResolutionScanLatency(MonoDelta laten impl_->RecordConflictResolutionScanLatency(latency); } +void TransactionParticipant::SetMinRunningHybridTimeUpdateCallback( + std::function callback) { + impl_->SetMinRunningHybridTimeUpdateCallback(std::move(callback)); +} + } // namespace tablet } // namespace yb diff --git a/src/yb/tablet/transaction_participant.h b/src/yb/tablet/transaction_participant.h index c0e66179b8c0..5079ae79702c 100644 --- a/src/yb/tablet/transaction_participant.h +++ b/src/yb/tablet/transaction_participant.h @@ -185,6 +185,8 @@ class TransactionParticipant : public TransactionStatusManager { TransactionParticipantContext* context() const; + void SetMinRunningHybridTimeLowerBound(HybridTime lower_bound); + HybridTime MinRunningHybridTime() const override; Result WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) override; @@ -238,6 +240,8 @@ class TransactionParticipant : public TransactionStatusManager { size_t GetNumRunningTransactions() const; + void SetMinRunningHybridTimeUpdateCallback(std::function callback); + struct CountIntentsResult { size_t num_intents; size_t num_transactions;