From 3d97247b571de8eba8ae13ee7ca61564e167f934 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Fri, 21 Aug 2020 00:10:13 +0000 Subject: [PATCH 1/8] tls: refactor to synchronize slot removal on workers At the expense of startup complexity, this removes the bookkeeper deletion model for slots, and blocks slot destruction until callbacks have been flushed on workers. This removes an entire class of bugs in which a user captures something in the lambda which get destroyed before the TLS operations run on the workers. Signed-off-by: Matt Klein --- include/envoy/thread_local/thread_local.h | 6 + .../common/thread_local/thread_local_impl.cc | 188 ++++++++---------- .../common/thread_local/thread_local_impl.h | 57 ++---- .../grpc/http_grpc_access_log_impl.cc | 2 +- .../grpc/http_grpc_access_log_impl.h | 2 +- source/server/server.cc | 1 + .../thread_local/thread_local_impl_test.cc | 42 ---- .../extensions/filters/common/lua/lua_test.cc | 7 +- test/mocks/thread_local/mocks.h | 1 + 9 files changed, 123 insertions(+), 183 deletions(-) diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h index 683617634a20..63ae8177b91e 100644 --- a/include/envoy/thread_local/thread_local.h +++ b/include/envoy/thread_local/thread_local.h @@ -121,6 +121,12 @@ class Instance : public SlotAllocator { */ virtual void registerThread(Event::Dispatcher& dispatcher, bool main_thread) PURE; + /** + * This should be called by the main thread before any worker threads start to run. This will + * ensure that threaded slot updates are properly synchronized with slot removal. + */ + virtual void startGlobalThreading() PURE; + /** * This should be called by the main thread before any worker threads start to exit. This will * block TLS removal during slot destruction, given that worker threads are about to call diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index d4d02f8b2f5f..e4356fb1d117 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -17,26 +17,77 @@ thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_; InstanceImpl::~InstanceImpl() { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(shutdown_); + ASSERT(state_ == State::Shutdown); thread_local_data_.data_.clear(); } SlotPtr InstanceImpl::allocateSlot() { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(!shutdown_); + ASSERT(state_ != State::Shutdown); if (free_slot_indexes_.empty()) { SlotImplPtr slot(new SlotImpl(*this, slots_.size())); - auto wrapper = std::make_unique(*this, std::move(slot)); - slots_.push_back(wrapper->slot_.get()); - return wrapper; + slots_.push_back(slot.get()); + return slot; } const uint32_t idx = free_slot_indexes_.front(); free_slot_indexes_.pop_front(); ASSERT(idx < slots_.size()); SlotImplPtr slot(new SlotImpl(*this, idx)); slots_[idx] = slot.get(); - return std::make_unique(*this, std::move(slot)); + return slot; +} + +InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint64_t index) + : parent_(parent), index_(index), still_alive_guard_(std::make_shared()), + active_callback_guard_( + new bool, + [&parent, still_alive_guard = std::weak_ptr(still_alive_guard_), this](bool* b) { + // If TLS is in the running state, signal that this slot is ready to be destroyed. This + // may happen on a worker when callbacks are run. We do not do this check during: + // a) initializing: there are error flows during initialization in which it is too + // complicated to destroy all pending callbacks before the slot is destroyed. Given + // that no workers are running there is nothing to synchronize anyway. + // b) shutting down: no slots will be destroyed between the time shutdown starts and all + // workers exit, so there is nothing to synchronize once all workers have exited. + // Additionally, there are initialization failure cases in which we move directly + // from initializing to shutting down, so for similar reasons to (a) we don't + // synchronize. + // c) when we can't lock the still alive guard. It's only possible for that to happen + // when a slot was destroyed before we entered the running state (during xDS init). + // This is guaranteed by the blocking in slot destruction. Thus, no further + // synchronization is needed. + if (parent.state_ == State::Running && still_alive_guard.lock()) { + absl::MutexLock lock(&shutdown_mutex_); + ready_to_destroy_ = true; + } + + delete b; + }) {} + +InstanceImpl::SlotImpl::~SlotImpl() { + // Reset active_callback_guard_ so the only active references will be held by pending callbacks. + active_callback_guard_.reset(); + // If running only, synchronize slot removal with all callbacks being drained from workers. + // See the constructor for why we only do this in the running state. + if (parent_.state_ == State::Running) { + absl::MutexLock lock(&shutdown_mutex_); + shutdown_mutex_.Await(absl::Condition(&ready_to_destroy_)); + } + + parent_.removeSlot(*this); +} + +Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) { + // Capture active_callback_guard_ so that we can synchronize slot destruction. See ~SlotImpl(). + // Also capture still_alive_guard_ so we can verify the slot is still alive as it might have + // been destroyed pre-running state. See SlotImpl(). + return [active_callback_guard = active_callback_guard_, + still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { + if (still_alive_guard.lock()) { + cb(); + } + }; } bool InstanceImpl::SlotImpl::currentThreadRegistered() { @@ -44,66 +95,43 @@ bool InstanceImpl::SlotImpl::currentThreadRegistered() { } void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { - parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }); + runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }); } void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { - parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb); -} - -ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { - ASSERT(currentThreadRegistered()); - return thread_local_data_.data_[index_]; + runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb); } -InstanceImpl::Bookkeeper::Bookkeeper(InstanceImpl& parent, SlotImplPtr&& slot) - : parent_(parent), slot_(std::move(slot)), - ref_count_(/*not used.*/ nullptr, - [slot = slot_.get(), &parent = this->parent_](uint32_t* /* not used */) { - // On destruction, post a cleanup callback on main thread, this could happen on - // any thread. - parent.scheduleCleanup(slot); - }) {} - -ThreadLocalObjectSharedPtr InstanceImpl::Bookkeeper::get() { return slot_->get(); } - -void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { - slot_->runOnAllThreads( - [cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) { - return cb(std::move(previous)); - }, - complete_cb); +void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) { + parent_.runOnAllThreads(wrapCallback(cb)); } -void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb) { - slot_->runOnAllThreads([cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) { - return cb(std::move(previous)); - }); +void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { + parent_.runOnAllThreads(wrapCallback(cb), main_callback); } -bool InstanceImpl::Bookkeeper::currentThreadRegistered() { - return slot_->currentThreadRegistered(); -} +void InstanceImpl::SlotImpl::set(InitializeCb cb) { + ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); + ASSERT(parent_.state_ != State::Shutdown); -void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb) { - // Use ref_count_ to bookkeep how many on-the-fly callback are out there. - slot_->runOnAllThreads([cb, ref_count = this->ref_count_]() { cb(); }); -} + for (Event::Dispatcher& dispatcher : parent_.registered_threads_) { + const uint32_t index = index_; + dispatcher.post(wrapCallback( + [index, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); })); + } -void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { - // Use ref_count_ to bookkeep how many on-the-fly callback are out there. - slot_->runOnAllThreads([cb, main_callback, ref_count = this->ref_count_]() { cb(); }, - main_callback); + // Handle main thread. + setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_)); } -void InstanceImpl::Bookkeeper::set(InitializeCb cb) { - slot_->set([cb, ref_count = this->ref_count_](Event::Dispatcher& dispatcher) - -> ThreadLocalObjectSharedPtr { return cb(dispatcher); }); +ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { + ASSERT(currentThreadRegistered()); + return thread_local_data_.data_[index_]; } void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(!shutdown_); + ASSERT(state_ != State::Shutdown); if (main_thread) { main_thread_dispatcher_ = &dispatcher; @@ -115,38 +143,6 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa } } -// Puts the slot into a deferred delete container, the slot will be destructed when its out-going -// callback reference count goes to 0. -void InstanceImpl::recycle(SlotImplPtr&& slot) { - ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(slot != nullptr); - auto* slot_addr = slot.get(); - deferred_deletes_.insert({slot_addr, std::move(slot)}); -} - -// Called by the Bookkeeper ref_count destructor, the SlotImpl in the deferred deletes map can be -// destructed now. -void InstanceImpl::scheduleCleanup(SlotImpl* slot) { - if (shutdown_) { - // If server is shutting down, do nothing here. - // The destruction of Bookkeeper has already transferred the SlotImpl to the deferred_deletes_ - // queue. No matter if this method is called from a Worker thread, the SlotImpl will be - // destructed on main thread when InstanceImpl destructs. - return; - } - if (std::this_thread::get_id() == main_thread_id_) { - // If called from main thread, save a callback. - ASSERT(deferred_deletes_.contains(slot)); - deferred_deletes_.erase(slot); - return; - } - main_thread_dispatcher_->post([slot, this]() { - ASSERT(deferred_deletes_.contains(slot)); - // The slot is guaranteed to be put into the deferred_deletes_ map by Bookkeeper destructor. - deferred_deletes_.erase(slot); - }); -} - void InstanceImpl::removeSlot(SlotImpl& slot) { ASSERT(std::this_thread::get_id() == main_thread_id_); @@ -154,7 +150,7 @@ void InstanceImpl::removeSlot(SlotImpl& slot) { // threads have already shut down and the dispatcher is no longer alive. There is also no reason // to do removal, because no allocations happen during shutdown and shutdownThread() will clean // things up on the other thread. - if (shutdown_) { + if (state_ == State::Shutdown) { return; } @@ -176,7 +172,7 @@ void InstanceImpl::removeSlot(SlotImpl& slot) { void InstanceImpl::runOnAllThreads(Event::PostCb cb) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(!shutdown_); + ASSERT(state_ != State::Shutdown); for (Event::Dispatcher& dispatcher : registered_threads_) { dispatcher.post(cb); @@ -188,7 +184,7 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb) { void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(!shutdown_); + ASSERT(state_ != State::Shutdown); // Handle main thread first so that when the last worker thread wins, we could just call the // all_threads_complete_cb method. Parallelism of main thread execution is being traded off // for programming simplicity here. @@ -205,19 +201,6 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_c } } -void InstanceImpl::SlotImpl::set(InitializeCb cb) { - ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); - ASSERT(!parent_.shutdown_); - - for (Event::Dispatcher& dispatcher : parent_.registered_threads_) { - const uint32_t index = index_; - dispatcher.post([index, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }); - } - - // Handle main thread. - setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_)); -} - void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) { if (thread_local_data_.data_.size() <= index) { thread_local_data_.data_.resize(index + 1); @@ -226,14 +209,19 @@ void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr obj thread_local_data_.data_[index] = object; } +void InstanceImpl::startGlobalThreading() { + ASSERT(state_ == State::Initializing); + state_ = State::Running; +} + void InstanceImpl::shutdownGlobalThreading() { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(!shutdown_); - shutdown_ = true; + ASSERT(state_ != State::Shutdown); + state_ = State::Shutdown; } void InstanceImpl::shutdownThread() { - ASSERT(shutdown_); + ASSERT(state_ == State::Shutdown); // Destruction of slots is done in *reverse* order. This is so that filters and higher layer // things that are built on top of the cluster manager, stats, etc. will be destroyed before diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 71153107fb3d..bdfe61f1187b 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -27,61 +27,51 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub // ThreadLocal::Instance SlotPtr allocateSlot() override; void registerThread(Event::Dispatcher& dispatcher, bool main_thread) override; + void startGlobalThreading() override; void shutdownGlobalThreading() override; void shutdownThread() override; Event::Dispatcher& dispatcher() override; private: + enum class State { + Initializing, // TLS is initializing and no worker threads are running yet. + Running, // TLS is running with worker threads. + Shutdown // Worker threads are about to shut down. + }; + struct SlotImpl : public Slot { - SlotImpl(InstanceImpl& parent, uint64_t index) : parent_(parent), index_(index) {} - ~SlotImpl() override { parent_.removeSlot(*this); } + SlotImpl(InstanceImpl& parent, uint64_t index); + ~SlotImpl() override; + Event::PostCb wrapCallback(Event::PostCb cb); // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override; bool currentThreadRegistered() override; void runOnAllThreads(const UpdateCb& cb) override; void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override; - void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } - void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override { - parent_.runOnAllThreads(cb, main_callback); - } + void runOnAllThreads(Event::PostCb cb) override; + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override; void set(InitializeCb cb) override; InstanceImpl& parent_; const uint64_t index_; + // The following is used to safely verify via weak_ptr that this slot is still alive. + std::shared_ptr still_alive_guard_; + // The following is used to reference count active callbacks. When TLS is in the running + // state destruction will block until all callbacks have been flushed. + std::shared_ptr active_callback_guard_; + // The following are used to synchronize blocking for flushing while in the running state. + bool ready_to_destroy_ ABSL_GUARDED_BY(shutdown_mutex_); + absl::Mutex shutdown_mutex_; }; using SlotImplPtr = std::unique_ptr; - // A Wrapper of SlotImpl which on destruction returns the SlotImpl to the deferred delete queue - // (detaches it). - struct Bookkeeper : public Slot { - Bookkeeper(InstanceImpl& parent, SlotImplPtr&& slot); - ~Bookkeeper() override { parent_.recycle(std::move(slot_)); } - - // ThreadLocal::Slot - ThreadLocalObjectSharedPtr get() override; - void runOnAllThreads(const UpdateCb& cb) override; - void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override; - bool currentThreadRegistered() override; - void runOnAllThreads(Event::PostCb cb) override; - void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override; - void set(InitializeCb cb) override; - - InstanceImpl& parent_; - SlotImplPtr slot_; - std::shared_ptr ref_count_; - }; - struct ThreadLocalData { Event::Dispatcher* dispatcher_{}; std::vector data_; }; - void recycle(SlotImplPtr&& slot); - // Cleanup the deferred deletes queue. - void scheduleCleanup(SlotImpl* slot); - void removeSlot(SlotImpl& slot); void runOnAllThreads(Event::PostCb cb); void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback); @@ -89,11 +79,6 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub static thread_local ThreadLocalData thread_local_data_; - // A indexed container for Slots that has to be deferred to delete due to out-going callbacks - // pointing to the Slot. To let the ref_count_ deleter find the SlotImpl by address, the container - // is defined as a map of SlotImpl address to the unique_ptr. - absl::flat_hash_map deferred_deletes_; - std::vector slots_; // A list of index of freed slots. std::list free_slot_indexes_; @@ -101,7 +86,7 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub std::list> registered_threads_; std::thread::id main_thread_id_; Event::Dispatcher* main_thread_dispatcher_{}; - std::atomic shutdown_{}; + std::atomic state_{State::Initializing}; // Test only. friend class ThreadLocalInstanceImplTest; diff --git a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc index 92bc5e38ee73..9b3ec8bc1f63 100644 --- a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc @@ -29,7 +29,7 @@ HttpGrpcAccessLog::HttpGrpcAccessLog( ThreadLocal::SlotAllocator& tls, GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache, Stats::Scope& scope) : Common::ImplBase(std::move(filter)), scope_(scope), config_(std::move(config)), - tls_slot_(tls.allocateSlot()), access_logger_cache_(std::move(access_logger_cache)) { + access_logger_cache_(std::move(access_logger_cache)), tls_slot_(tls.allocateSlot()) { for (const auto& header : config_.additional_request_headers_to_log()) { request_headers_to_log_.emplace_back(header); } diff --git a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h index fcae58bd5f10..a1b7d99a3e54 100644 --- a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h +++ b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h @@ -51,12 +51,12 @@ class HttpGrpcAccessLog : public Common::ImplBase { Stats::Scope& scope_; const envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig config_; - const ThreadLocal::SlotPtr tls_slot_; const GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache_; std::vector request_headers_to_log_; std::vector response_headers_to_log_; std::vector response_trailers_to_log_; std::vector filter_states_to_log_; + const ThreadLocal::SlotPtr tls_slot_; // Destruct first to flush callbacks. }; using HttpGrpcAccessLogPtr = std::unique_ptr; diff --git a/source/server/server.cc b/source/server/server.cc index 912665143c98..f954d6e15ce0 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -551,6 +551,7 @@ void InstanceImpl::onRuntimeReady() { } void InstanceImpl::startWorkers() { + thread_local_.startGlobalThreading(); listener_manager_->startWorkers(*guard_dog_); initialization_timer_->complete(); // Update server stats as soon as initialization is done. diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 6747fa2eae99..5d7314e49e89 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -45,7 +45,6 @@ class ThreadLocalInstanceImplTest : public testing::Test { object.reset(); return object_ref; } - int deferredDeletesMapSize() { return tls_.deferred_deletes_.size(); } int freeSlotIndexesListSize() { return tls_.free_slot_indexes_.size(); } InstanceImpl tls_; @@ -60,7 +59,6 @@ TEST_F(ThreadLocalInstanceImplTest, All) { EXPECT_CALL(thread_dispatcher_, post(_)); SlotPtr slot1 = tls_.allocateSlot(); slot1.reset(); - EXPECT_EQ(deferredDeletesMapSize(), 0); EXPECT_EQ(freeSlotIndexesListSize(), 1); // Create a new slot which should take the place of the old slot. ReturnPointee() is used to @@ -86,52 +84,12 @@ TEST_F(ThreadLocalInstanceImplTest, All) { slot3.reset(); slot4.reset(); EXPECT_EQ(freeSlotIndexesListSize(), 0); - EXPECT_EQ(deferredDeletesMapSize(), 2); EXPECT_CALL(object_ref4, onDestroy()); EXPECT_CALL(object_ref3, onDestroy()); tls_.shutdownThread(); } -TEST_F(ThreadLocalInstanceImplTest, DeferredRecycle) { - InSequence s; - - // Free a slot without ever calling set. - EXPECT_CALL(thread_dispatcher_, post(_)); - SlotPtr slot1 = tls_.allocateSlot(); - slot1.reset(); - // Slot destructed directly, as there is no out-going callbacks. - EXPECT_EQ(deferredDeletesMapSize(), 0); - EXPECT_EQ(freeSlotIndexesListSize(), 1); - - // Allocate a slot and set value, hold the posted callback and the slot will only be returned - // after the held callback is destructed. - { - SlotPtr slot2 = tls_.allocateSlot(); - EXPECT_EQ(freeSlotIndexesListSize(), 0); - { - Event::PostCb holder; - EXPECT_CALL(thread_dispatcher_, post(_)).WillOnce(Invoke([&](Event::PostCb cb) { - // Holds the posted callback. - holder = cb; - })); - slot2->set( - [](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return nullptr; }); - slot2.reset(); - // Not released yet, as holder has a copy of the ref_count_. - EXPECT_EQ(freeSlotIndexesListSize(), 0); - EXPECT_EQ(deferredDeletesMapSize(), 1); - // This post is called when the holder dies. - EXPECT_CALL(thread_dispatcher_, post(_)); - } - // Slot is deleted now that there holder destructs. - EXPECT_EQ(deferredDeletesMapSize(), 0); - EXPECT_EQ(freeSlotIndexesListSize(), 1); - } - - tls_.shutdownGlobalThreading(); -} - // Test that the config passed into the update callback is the previous version stored in the slot. TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) { InSequence s; diff --git a/test/extensions/filters/common/lua/lua_test.cc b/test/extensions/filters/common/lua/lua_test.cc index 27c9e35f8cfe..0823bee4d852 100644 --- a/test/extensions/filters/common/lua/lua_test.cc +++ b/test/extensions/filters/common/lua/lua_test.cc @@ -192,9 +192,7 @@ TEST_F(ThreadSafeTest, StateDestructedBeforeWorkerRun) { state_->registerType(); main_dispatcher_->run(Event::Dispatcher::RunType::Block); - - // Destroy state_. - state_.reset(nullptr); + tls_.startGlobalThreading(); // Start a new worker thread to execute the callback functions in the worker dispatcher. Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread([this]() { @@ -202,6 +200,9 @@ TEST_F(ThreadSafeTest, StateDestructedBeforeWorkerRun) { // Verify we have the expected dispatcher for the new worker thread. EXPECT_EQ(worker_dispatcher_.get(), &tls_.dispatcher()); }); + + // Destroy state_. This will block until the worker callback runs or is destroyed. + state_.reset(nullptr); thread->join(); tls_.shutdownGlobalThreading(); diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h index 9bbd26a64465..9e8162ae1a36 100644 --- a/test/mocks/thread_local/mocks.h +++ b/test/mocks/thread_local/mocks.h @@ -22,6 +22,7 @@ class MockInstance : public Instance { // Server::ThreadLocal MOCK_METHOD(SlotPtr, allocateSlot, ()); MOCK_METHOD(void, registerThread, (Event::Dispatcher & dispatcher, bool main_thread)); + MOCK_METHOD(void, startGlobalThreading, ()); MOCK_METHOD(void, shutdownGlobalThreading, ()); MOCK_METHOD(void, shutdownThread, ()); MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); From f86142951f0822c0bdf8676082c6148dc4e02158 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Wed, 2 Sep 2020 18:06:14 +0000 Subject: [PATCH 2/8] comments Signed-off-by: Matt Klein --- .../common/thread_local/thread_local_impl.cc | 156 ++++++++++-------- .../common/thread_local/thread_local_impl.h | 32 ++-- .../grpc/http_grpc_access_log_impl.cc | 2 +- .../grpc/http_grpc_access_log_impl.h | 2 +- .../thread_local/thread_local_impl_test.cc | 42 +++++ 5 files changed, 152 insertions(+), 82 deletions(-) diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index e4356fb1d117..19745b1c470b 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -26,107 +26,86 @@ SlotPtr InstanceImpl::allocateSlot() { ASSERT(state_ != State::Shutdown); if (free_slot_indexes_.empty()) { - SlotImplPtr slot(new SlotImpl(*this, slots_.size())); + SlotPtr slot(new SlotImpl(*this, slots_.size())); slots_.push_back(slot.get()); return slot; } - const uint32_t idx = free_slot_indexes_.front(); + const uint64_t idx = free_slot_indexes_.front(); free_slot_indexes_.pop_front(); ASSERT(idx < slots_.size()); - SlotImplPtr slot(new SlotImpl(*this, idx)); + SlotPtr slot(new SlotImpl(*this, idx)); slots_[idx] = slot.get(); return slot; } -InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint64_t index) - : parent_(parent), index_(index), still_alive_guard_(std::make_shared()), - active_callback_guard_( - new bool, - [&parent, still_alive_guard = std::weak_ptr(still_alive_guard_), this](bool* b) { - // If TLS is in the running state, signal that this slot is ready to be destroyed. This - // may happen on a worker when callbacks are run. We do not do this check during: - // a) initializing: there are error flows during initialization in which it is too - // complicated to destroy all pending callbacks before the slot is destroyed. Given - // that no workers are running there is nothing to synchronize anyway. - // b) shutting down: no slots will be destroyed between the time shutdown starts and all - // workers exit, so there is nothing to synchronize once all workers have exited. - // Additionally, there are initialization failure cases in which we move directly - // from initializing to shutting down, so for similar reasons to (a) we don't - // synchronize. - // c) when we can't lock the still alive guard. It's only possible for that to happen - // when a slot was destroyed before we entered the running state (during xDS init). - // This is guaranteed by the blocking in slot destruction. Thus, no further - // synchronization is needed. - if (parent.state_ == State::Running && still_alive_guard.lock()) { - absl::MutexLock lock(&shutdown_mutex_); - ready_to_destroy_ = true; - } - - delete b; - }) {} - -InstanceImpl::SlotImpl::~SlotImpl() { - // Reset active_callback_guard_ so the only active references will be held by pending callbacks. - active_callback_guard_.reset(); - // If running only, synchronize slot removal with all callbacks being drained from workers. - // See the constructor for why we only do this in the running state. - if (parent_.state_ == State::Running) { - absl::MutexLock lock(&shutdown_mutex_); - shutdown_mutex_.Await(absl::Condition(&ready_to_destroy_)); - } +/*bool InstanceImpl::SlotImpl::currentThreadRegistered() { + return thread_local_data_.data_.size() > index_; +} - parent_.removeSlot(*this); +void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { + parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }); +} + +void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { + parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb); } +ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { + ASSERT(currentThreadRegistered()); + return thread_local_data_.data_[index_]; +}*/ + +InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint64_t index) + : parent_(parent), index_(index), ref_count_(/*not used.*/ nullptr, + [index, &parent = parent_](uint32_t* /* not used */) { + // On destruction, post a cleanup callback on main thread, this could happen on + // any thread. + parent.scheduleCleanup(index); + }), + still_alive_guard_(std::make_shared(true)) {} + Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) { // Capture active_callback_guard_ so that we can synchronize slot destruction. See ~SlotImpl(). // Also capture still_alive_guard_ so we can verify the slot is still alive as it might have - // been destroyed pre-running state. See SlotImpl(). - return [active_callback_guard = active_callback_guard_, - still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { + // been destroyed pre-running state. See SlotImpl(). fixfix + return [still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { if (still_alive_guard.lock()) { cb(); } }; } -bool InstanceImpl::SlotImpl::currentThreadRegistered() { - return thread_local_data_.data_.size() > index_; +ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { + ASSERT(currentThreadRegistered()); + return thread_local_data_.data_[index_]; +} + +void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { + runOnAllThreads([cb]() { setThreadLocal(index_, cb(get())); }, complete_cb); } void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { - runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }); + runOnAllThreads([cb]() { setThreadLocal(index_, cb(get())); }); } -void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { - runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb); +bool InstanceImpl::SlotImpl::currentThreadRegistered() { + return slot_->currentThreadRegistered(); } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) { - parent_.runOnAllThreads(wrapCallback(cb)); + // Use ref_count_ to bookkeep how many on-the-fly callback are out there. + slot_->runOnAllThreads(wrapCallback([cb, ref_count = this->ref_count_]() { cb(); })); } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { - parent_.runOnAllThreads(wrapCallback(cb), main_callback); + // Use ref_count_ to bookkeep how many on-the-fly callback are out there. + slot_->runOnAllThreads( + wrapCallback([cb, main_callback, ref_count = this->ref_count_]() { cb(); }), main_callback); } void InstanceImpl::SlotImpl::set(InitializeCb cb) { - ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); - ASSERT(parent_.state_ != State::Shutdown); - - for (Event::Dispatcher& dispatcher : parent_.registered_threads_) { - const uint32_t index = index_; - dispatcher.post(wrapCallback( - [index, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); })); - } - - // Handle main thread. - setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_)); -} - -ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { - ASSERT(currentThreadRegistered()); - return thread_local_data_.data_[index_]; + slot_->set([cb, ref_count = this->ref_count_](Event::Dispatcher& dispatcher) + -> ThreadLocalObjectSharedPtr { return cb(dispatcher); }); } void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) { @@ -143,6 +122,38 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa } } +// Puts the slot into a deferred delete container, the slot will be destructed when its out-going +// callback reference count goes to 0. +void InstanceImpl::recycle(SlotImplPtr&& slot) { + ASSERT(std::this_thread::get_id() == main_thread_id_); + ASSERT(slot != nullptr); + auto* slot_addr = slot.get(); + deferred_deletes_.insert({slot_addr, std::move(slot)}); +} + +// Called by the SlotImpl ref_count destructor, the SlotImpl in the deferred deletes map can be +// destructed now. +void InstanceImpl::scheduleCleanup(SlotImpl* slot) { + if (state_ == State::Shutdown) { + // If server is shutting down, do nothing here. + // The destruction of SlotImpl has already transferred the SlotImpl to the deferred_deletes_ + // queue. No matter if this method is called from a Worker thread, the SlotImpl will be + // destructed on main thread when InstanceImpl destructs. + return; + } + if (std::this_thread::get_id() == main_thread_id_) { + // If called from main thread, save a callback. + ASSERT(deferred_deletes_.contains(slot)); + deferred_deletes_.erase(slot); + return; + } + main_thread_dispatcher_->post([slot, this]() { + ASSERT(deferred_deletes_.contains(slot)); + // The slot is guaranteed to be put into the deferred_deletes_ map by SlotImpl destructor. + deferred_deletes_.erase(slot); + }); +} + void InstanceImpl::removeSlot(SlotImpl& slot) { ASSERT(std::this_thread::get_id() == main_thread_id_); @@ -201,6 +212,19 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_c } } +void InstanceImpl::SlotImpl::set(InitializeCb cb) { + ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); + ASSERT(parent_.state_ != State::Shutdown); + + for (Event::Dispatcher& dispatcher : parent_.registered_threads_) { + const uint32_t index = index_; + dispatcher.post([index, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }); + } + + // Handle main thread. + setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_)); +} + void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) { if (thread_local_data_.data_.size() <= index) { thread_local_data_.data_.resize(index + 1); diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index bdfe61f1187b..80ac320682fb 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -11,7 +11,7 @@ #include "common/common/logger.h" #include "common/common/non_copyable.h" -#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" namespace Envoy { namespace ThreadLocal { @@ -39,49 +39,53 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub Shutdown // Worker threads are about to shut down. }; + // A Wrapper of SlotImpl which on destruction returns the SlotImpl to the deferred delete queue + // (detaches it). fixfix struct SlotImpl : public Slot { SlotImpl(InstanceImpl& parent, uint64_t index); - ~SlotImpl() override; + ~SlotImpl() override { parent_.recycle(index_); } Event::PostCb wrapCallback(Event::PostCb cb); // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override; - bool currentThreadRegistered() override; void runOnAllThreads(const UpdateCb& cb) override; void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override; + bool currentThreadRegistered() override; void runOnAllThreads(Event::PostCb cb) override; void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override; void set(InitializeCb cb) override; InstanceImpl& parent_; const uint64_t index_; + std::shared_ptr ref_count_; // The following is used to safely verify via weak_ptr that this slot is still alive. std::shared_ptr still_alive_guard_; - // The following is used to reference count active callbacks. When TLS is in the running - // state destruction will block until all callbacks have been flushed. - std::shared_ptr active_callback_guard_; - // The following are used to synchronize blocking for flushing while in the running state. - bool ready_to_destroy_ ABSL_GUARDED_BY(shutdown_mutex_); - absl::Mutex shutdown_mutex_; }; - using SlotImplPtr = std::unique_ptr; - struct ThreadLocalData { Event::Dispatcher* dispatcher_{}; std::vector data_; }; - void removeSlot(SlotImpl& slot); + void recycle(uint64_t slot); + // Cleanup the deferred deletes queue. + void scheduleCleanup(uint64_t slot); + + void removeSlot(uint64_t slot); void runOnAllThreads(Event::PostCb cb); void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback); static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object); static thread_local ThreadLocalData thread_local_data_; - std::vector slots_; + // A indexed container for Slots that has to be deferred to delete due to out-going callbacks + // pointing to the Slot. To let the ref_count_ deleter find the SlotImpl by address, the container + // is defined as a map of SlotImpl address to the unique_ptr. + absl::flat_hash_set deferred_deletes_; + + std::vector slots_; // A list of index of freed slots. - std::list free_slot_indexes_; + std::list free_slot_indexes_; std::list> registered_threads_; std::thread::id main_thread_id_; diff --git a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc index 9b3ec8bc1f63..92bc5e38ee73 100644 --- a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc @@ -29,7 +29,7 @@ HttpGrpcAccessLog::HttpGrpcAccessLog( ThreadLocal::SlotAllocator& tls, GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache, Stats::Scope& scope) : Common::ImplBase(std::move(filter)), scope_(scope), config_(std::move(config)), - access_logger_cache_(std::move(access_logger_cache)), tls_slot_(tls.allocateSlot()) { + tls_slot_(tls.allocateSlot()), access_logger_cache_(std::move(access_logger_cache)) { for (const auto& header : config_.additional_request_headers_to_log()) { request_headers_to_log_.emplace_back(header); } diff --git a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h index a1b7d99a3e54..fcae58bd5f10 100644 --- a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h +++ b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h @@ -51,12 +51,12 @@ class HttpGrpcAccessLog : public Common::ImplBase { Stats::Scope& scope_; const envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig config_; + const ThreadLocal::SlotPtr tls_slot_; const GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache_; std::vector request_headers_to_log_; std::vector response_headers_to_log_; std::vector response_trailers_to_log_; std::vector filter_states_to_log_; - const ThreadLocal::SlotPtr tls_slot_; // Destruct first to flush callbacks. }; using HttpGrpcAccessLogPtr = std::unique_ptr; diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 5d7314e49e89..6747fa2eae99 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -45,6 +45,7 @@ class ThreadLocalInstanceImplTest : public testing::Test { object.reset(); return object_ref; } + int deferredDeletesMapSize() { return tls_.deferred_deletes_.size(); } int freeSlotIndexesListSize() { return tls_.free_slot_indexes_.size(); } InstanceImpl tls_; @@ -59,6 +60,7 @@ TEST_F(ThreadLocalInstanceImplTest, All) { EXPECT_CALL(thread_dispatcher_, post(_)); SlotPtr slot1 = tls_.allocateSlot(); slot1.reset(); + EXPECT_EQ(deferredDeletesMapSize(), 0); EXPECT_EQ(freeSlotIndexesListSize(), 1); // Create a new slot which should take the place of the old slot. ReturnPointee() is used to @@ -84,12 +86,52 @@ TEST_F(ThreadLocalInstanceImplTest, All) { slot3.reset(); slot4.reset(); EXPECT_EQ(freeSlotIndexesListSize(), 0); + EXPECT_EQ(deferredDeletesMapSize(), 2); EXPECT_CALL(object_ref4, onDestroy()); EXPECT_CALL(object_ref3, onDestroy()); tls_.shutdownThread(); } +TEST_F(ThreadLocalInstanceImplTest, DeferredRecycle) { + InSequence s; + + // Free a slot without ever calling set. + EXPECT_CALL(thread_dispatcher_, post(_)); + SlotPtr slot1 = tls_.allocateSlot(); + slot1.reset(); + // Slot destructed directly, as there is no out-going callbacks. + EXPECT_EQ(deferredDeletesMapSize(), 0); + EXPECT_EQ(freeSlotIndexesListSize(), 1); + + // Allocate a slot and set value, hold the posted callback and the slot will only be returned + // after the held callback is destructed. + { + SlotPtr slot2 = tls_.allocateSlot(); + EXPECT_EQ(freeSlotIndexesListSize(), 0); + { + Event::PostCb holder; + EXPECT_CALL(thread_dispatcher_, post(_)).WillOnce(Invoke([&](Event::PostCb cb) { + // Holds the posted callback. + holder = cb; + })); + slot2->set( + [](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return nullptr; }); + slot2.reset(); + // Not released yet, as holder has a copy of the ref_count_. + EXPECT_EQ(freeSlotIndexesListSize(), 0); + EXPECT_EQ(deferredDeletesMapSize(), 1); + // This post is called when the holder dies. + EXPECT_CALL(thread_dispatcher_, post(_)); + } + // Slot is deleted now that there holder destructs. + EXPECT_EQ(deferredDeletesMapSize(), 0); + EXPECT_EQ(freeSlotIndexesListSize(), 1); + } + + tls_.shutdownGlobalThreading(); +} + // Test that the config passed into the update callback is the previous version stored in the slot. TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) { InSequence s; From bb1e4283ab0a083d123021e324e951b75b9b701d Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Sat, 26 Sep 2020 23:47:21 +0000 Subject: [PATCH 3/8] fix Signed-off-by: Matt Klein --- include/envoy/thread_local/thread_local.h | 6 - .../common/thread_local/thread_local_impl.cc | 146 +++++++----------- .../common/thread_local/thread_local_impl.h | 34 ++-- source/server/server.cc | 1 - .../extensions/filters/common/lua/lua_test.cc | 7 +- test/mocks/thread_local/mocks.h | 1 - 6 files changed, 80 insertions(+), 115 deletions(-) diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h index 63ae8177b91e..683617634a20 100644 --- a/include/envoy/thread_local/thread_local.h +++ b/include/envoy/thread_local/thread_local.h @@ -121,12 +121,6 @@ class Instance : public SlotAllocator { */ virtual void registerThread(Event::Dispatcher& dispatcher, bool main_thread) PURE; - /** - * This should be called by the main thread before any worker threads start to run. This will - * ensure that threaded slot updates are properly synchronized with slot removal. - */ - virtual void startGlobalThreading() PURE; - /** * This should be called by the main thread before any worker threads start to exit. This will * block TLS removal during slot destruction, given that worker threads are about to call diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index 19745b1c470b..2dda90aeb69c 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -17,13 +17,13 @@ thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_; InstanceImpl::~InstanceImpl() { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(state_ == State::Shutdown); + ASSERT(shutdown_); thread_local_data_.data_.clear(); } SlotPtr InstanceImpl::allocateSlot() { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(state_ != State::Shutdown); + ASSERT(!shutdown_); if (free_slot_indexes_.empty()) { SlotPtr slot(new SlotImpl(*this, slots_.size())); @@ -38,25 +38,9 @@ SlotPtr InstanceImpl::allocateSlot() { return slot; } -/*bool InstanceImpl::SlotImpl::currentThreadRegistered() { - return thread_local_data_.data_.size() > index_; -} - -void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { - parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }); -} - -void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { - parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb); -} - -ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { - ASSERT(currentThreadRegistered()); - return thread_local_data_.data_[index_]; -}*/ - -InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint64_t index) - : parent_(parent), index_(index), ref_count_(/*not used.*/ nullptr, +InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index) + : parent_(parent), index_(index), + ref_count_(/*not used.*/ nullptr, [index, &parent = parent_](uint32_t* /* not used */) { // On destruction, post a cleanup callback on main thread, this could happen on // any thread. @@ -65,52 +49,65 @@ InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint64_t index) still_alive_guard_(std::make_shared(true)) {} Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) { - // Capture active_callback_guard_ so that we can synchronize slot destruction. See ~SlotImpl(). - // Also capture still_alive_guard_ so we can verify the slot is still alive as it might have - // been destroyed pre-running state. See SlotImpl(). fixfix - return [still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { + // See the header file and comments for still_alive_guard_ and ref_count_ for the purpose + // of these captures. + return [still_alive_guard = std::weak_ptr(still_alive_guard_), ref_count = ref_count_, + cb] { if (still_alive_guard.lock()) { cb(); } }; } -ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { - ASSERT(currentThreadRegistered()); - return thread_local_data_.data_[index_]; +bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) { + return thread_local_data_.data_.size() > index; } -void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { - runOnAllThreads([cb]() { setThreadLocal(index_, cb(get())); }, complete_cb); +bool InstanceImpl::SlotImpl::currentThreadRegistered() { + return currentThreadRegisteredWorker(index_); } -void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { - runOnAllThreads([cb]() { setThreadLocal(index_, cb(get())); }); +ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) { + ASSERT(currentThreadRegisteredWorker(index)); + return thread_local_data_.data_[index]; } -bool InstanceImpl::SlotImpl::currentThreadRegistered() { - return slot_->currentThreadRegistered(); +ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); } + +void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { + runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); }, + complete_cb); +} + +void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { + runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); }); } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) { - // Use ref_count_ to bookkeep how many on-the-fly callback are out there. - slot_->runOnAllThreads(wrapCallback([cb, ref_count = this->ref_count_]() { cb(); })); + parent_.runOnAllThreads(wrapCallback([cb]() { cb(); })); } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { - // Use ref_count_ to bookkeep how many on-the-fly callback are out there. - slot_->runOnAllThreads( + parent_.runOnAllThreads( wrapCallback([cb, main_callback, ref_count = this->ref_count_]() { cb(); }), main_callback); } void InstanceImpl::SlotImpl::set(InitializeCb cb) { - slot_->set([cb, ref_count = this->ref_count_](Event::Dispatcher& dispatcher) - -> ThreadLocalObjectSharedPtr { return cb(dispatcher); }); + ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); + ASSERT(!parent_.shutdown_); + + for (Event::Dispatcher& dispatcher : parent_.registered_threads_) { + dispatcher.post(wrapCallback( + [index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); })); + } + + // Handle main thread. + setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_)); } void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(state_ != State::Shutdown); + ASSERT(!shutdown_); if (main_thread) { main_thread_dispatcher_ = &dispatcher; @@ -124,17 +121,15 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa // Puts the slot into a deferred delete container, the slot will be destructed when its out-going // callback reference count goes to 0. -void InstanceImpl::recycle(SlotImplPtr&& slot) { +void InstanceImpl::recycle(uint32_t slot) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(slot != nullptr); - auto* slot_addr = slot.get(); - deferred_deletes_.insert({slot_addr, std::move(slot)}); + deferred_deletes_.insert(slot); } // Called by the SlotImpl ref_count destructor, the SlotImpl in the deferred deletes map can be // destructed now. -void InstanceImpl::scheduleCleanup(SlotImpl* slot) { - if (state_ == State::Shutdown) { +void InstanceImpl::scheduleCleanup(uint32_t slot) { + if (shutdown_) { // If server is shutting down, do nothing here. // The destruction of SlotImpl has already transferred the SlotImpl to the deferred_deletes_ // queue. No matter if this method is called from a Worker thread, the SlotImpl will be @@ -143,47 +138,44 @@ void InstanceImpl::scheduleCleanup(SlotImpl* slot) { } if (std::this_thread::get_id() == main_thread_id_) { // If called from main thread, save a callback. - ASSERT(deferred_deletes_.contains(slot)); - deferred_deletes_.erase(slot); + removeSlot(slot); return; } - main_thread_dispatcher_->post([slot, this]() { - ASSERT(deferred_deletes_.contains(slot)); - // The slot is guaranteed to be put into the deferred_deletes_ map by SlotImpl destructor. - deferred_deletes_.erase(slot); - }); + main_thread_dispatcher_->post([slot, this]() { removeSlot(slot); }); } -void InstanceImpl::removeSlot(SlotImpl& slot) { +void InstanceImpl::removeSlot(uint32_t slot) { ASSERT(std::this_thread::get_id() == main_thread_id_); + ASSERT(deferred_deletes_.contains(slot)); + // The slot is guaranteed to be put into the deferred_deletes_ map by SlotImpl destructor. + deferred_deletes_.erase(slot); // When shutting down, we do not post slot removals to other threads. This is because the other // threads have already shut down and the dispatcher is no longer alive. There is also no reason // to do removal, because no allocations happen during shutdown and shutdownThread() will clean // things up on the other thread. - if (state_ == State::Shutdown) { + if (shutdown_) { return; } - const uint64_t index = slot.index_; - slots_[index] = nullptr; - ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), index) == + slots_[slot] = nullptr; + ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), slot) == free_slot_indexes_.end(), fmt::format("slot index {} already in free slot set!", index)); - free_slot_indexes_.push_back(index); - runOnAllThreads([index]() -> void { + free_slot_indexes_.push_back(slot); + runOnAllThreads([slot]() -> void { // This runs on each thread and clears the slot, making it available for a new allocations. // This is safe even if a new allocation comes in, because everything happens with post() and // will be sequenced after this removal. - if (index < thread_local_data_.data_.size()) { - thread_local_data_.data_[index] = nullptr; + if (slot < thread_local_data_.data_.size()) { + thread_local_data_.data_[slot] = nullptr; } }); } void InstanceImpl::runOnAllThreads(Event::PostCb cb) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(state_ != State::Shutdown); + ASSERT(!shutdown_); for (Event::Dispatcher& dispatcher : registered_threads_) { dispatcher.post(cb); @@ -195,7 +187,7 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb) { void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(state_ != State::Shutdown); + ASSERT(!shutdown_); // Handle main thread first so that when the last worker thread wins, we could just call the // all_threads_complete_cb method. Parallelism of main thread execution is being traded off // for programming simplicity here. @@ -212,19 +204,6 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_c } } -void InstanceImpl::SlotImpl::set(InitializeCb cb) { - ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); - ASSERT(parent_.state_ != State::Shutdown); - - for (Event::Dispatcher& dispatcher : parent_.registered_threads_) { - const uint32_t index = index_; - dispatcher.post([index, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }); - } - - // Handle main thread. - setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_)); -} - void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) { if (thread_local_data_.data_.size() <= index) { thread_local_data_.data_.resize(index + 1); @@ -233,19 +212,14 @@ void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr obj thread_local_data_.data_[index] = object; } -void InstanceImpl::startGlobalThreading() { - ASSERT(state_ == State::Initializing); - state_ = State::Running; -} - void InstanceImpl::shutdownGlobalThreading() { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(state_ != State::Shutdown); - state_ = State::Shutdown; + ASSERT(!shutdown_); + shutdown_ = true; } void InstanceImpl::shutdownThread() { - ASSERT(state_ == State::Shutdown); + ASSERT(shutdown_); // Destruction of slots is done in *reverse* order. This is so that filters and higher layer // things that are built on top of the cluster manager, stats, etc. will be destroyed before diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 80ac320682fb..445cc173bc2f 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -27,24 +27,20 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub // ThreadLocal::Instance SlotPtr allocateSlot() override; void registerThread(Event::Dispatcher& dispatcher, bool main_thread) override; - void startGlobalThreading() override; void shutdownGlobalThreading() override; void shutdownThread() override; Event::Dispatcher& dispatcher() override; private: - enum class State { - Initializing, // TLS is initializing and no worker threads are running yet. - Running, // TLS is running with worker threads. - Shutdown // Worker threads are about to shut down. - }; - - // A Wrapper of SlotImpl which on destruction returns the SlotImpl to the deferred delete queue - // (detaches it). fixfix + // On destruction returns the slot index to the deferred delete queue (detaches it). This allows + // a slot to be destructed on the main thread while controlling the lifetime of the underlying + // slot as callbacks drain from workers. struct SlotImpl : public Slot { - SlotImpl(InstanceImpl& parent, uint64_t index); + SlotImpl(InstanceImpl& parent, uint32_t index); ~SlotImpl() override { parent_.recycle(index_); } Event::PostCb wrapCallback(Event::PostCb cb); + static bool currentThreadRegisteredWorker(uint32_t index); + static ThreadLocalObjectSharedPtr getWorker(uint32_t index); // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override; @@ -56,9 +52,13 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub void set(InitializeCb cb) override; InstanceImpl& parent_; - const uint64_t index_; + const uint32_t index_; + // The following is used to make sure that the slot index is not recycled until there are no + // more pending callbacks against the slot. This prevents crashes for well behaved std::shared_ptr ref_count_; - // The following is used to safely verify via weak_ptr that this slot is still alive. + // The following is used to safely verify via weak_ptr that this slot is still alive. This + // does not prevent all races if a callback does not capture appropriately, but it does fix + // the common case of a slot destroyed immediately before anything is posted to a worker. std::shared_ptr still_alive_guard_; }; @@ -67,11 +67,11 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub std::vector data_; }; - void recycle(uint64_t slot); + void recycle(uint32_t slot); // Cleanup the deferred deletes queue. - void scheduleCleanup(uint64_t slot); + void scheduleCleanup(uint32_t slot); - void removeSlot(uint64_t slot); + void removeSlot(uint32_t slot); void runOnAllThreads(Event::PostCb cb); void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback); static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object); @@ -85,12 +85,12 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub std::vector slots_; // A list of index of freed slots. - std::list free_slot_indexes_; + std::list free_slot_indexes_; std::list> registered_threads_; std::thread::id main_thread_id_; Event::Dispatcher* main_thread_dispatcher_{}; - std::atomic state_{State::Initializing}; + std::atomic shutdown_{}; // Test only. friend class ThreadLocalInstanceImplTest; diff --git a/source/server/server.cc b/source/server/server.cc index 6e5add33f3e4..89adc58ec53b 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -565,7 +565,6 @@ void InstanceImpl::onRuntimeReady() { } void InstanceImpl::startWorkers() { - thread_local_.startGlobalThreading(); listener_manager_->startWorkers(*worker_guard_dog_); initialization_timer_->complete(); // Update server stats as soon as initialization is done. diff --git a/test/extensions/filters/common/lua/lua_test.cc b/test/extensions/filters/common/lua/lua_test.cc index 0823bee4d852..27c9e35f8cfe 100644 --- a/test/extensions/filters/common/lua/lua_test.cc +++ b/test/extensions/filters/common/lua/lua_test.cc @@ -192,7 +192,9 @@ TEST_F(ThreadSafeTest, StateDestructedBeforeWorkerRun) { state_->registerType(); main_dispatcher_->run(Event::Dispatcher::RunType::Block); - tls_.startGlobalThreading(); + + // Destroy state_. + state_.reset(nullptr); // Start a new worker thread to execute the callback functions in the worker dispatcher. Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread([this]() { @@ -200,9 +202,6 @@ TEST_F(ThreadSafeTest, StateDestructedBeforeWorkerRun) { // Verify we have the expected dispatcher for the new worker thread. EXPECT_EQ(worker_dispatcher_.get(), &tls_.dispatcher()); }); - - // Destroy state_. This will block until the worker callback runs or is destroyed. - state_.reset(nullptr); thread->join(); tls_.shutdownGlobalThreading(); diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h index 9e8162ae1a36..9bbd26a64465 100644 --- a/test/mocks/thread_local/mocks.h +++ b/test/mocks/thread_local/mocks.h @@ -22,7 +22,6 @@ class MockInstance : public Instance { // Server::ThreadLocal MOCK_METHOD(SlotPtr, allocateSlot, ()); MOCK_METHOD(void, registerThread, (Event::Dispatcher & dispatcher, bool main_thread)); - MOCK_METHOD(void, startGlobalThreading, ()); MOCK_METHOD(void, shutdownGlobalThreading, ()); MOCK_METHOD(void, shutdownThread, ()); MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); From 6bc5a3220d2252fc49d1094f6a778f27c433f46b Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Sun, 27 Sep 2020 01:35:54 +0000 Subject: [PATCH 4/8] remove ref_count_ Signed-off-by: Matt Klein --- .../common/thread_local/thread_local_impl.cc | 46 ++----------------- .../common/thread_local/thread_local_impl.h | 19 ++------ .../thread_local/thread_local_impl_test.cc | 42 ----------------- 3 files changed, 8 insertions(+), 99 deletions(-) diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index 2dda90aeb69c..b6b9514caa17 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -39,20 +39,12 @@ SlotPtr InstanceImpl::allocateSlot() { } InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index) - : parent_(parent), index_(index), - ref_count_(/*not used.*/ nullptr, - [index, &parent = parent_](uint32_t* /* not used */) { - // On destruction, post a cleanup callback on main thread, this could happen on - // any thread. - parent.scheduleCleanup(index); - }), - still_alive_guard_(std::make_shared(true)) {} + : parent_(parent), index_(index), still_alive_guard_(std::make_shared(true)) {} Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) { // See the header file and comments for still_alive_guard_ and ref_count_ for the purpose // of these captures. - return [still_alive_guard = std::weak_ptr(still_alive_guard_), ref_count = ref_count_, - cb] { + return [still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { if (still_alive_guard.lock()) { cb(); } @@ -88,8 +80,7 @@ void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) { } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { - parent_.runOnAllThreads( - wrapCallback([cb, main_callback, ref_count = this->ref_count_]() { cb(); }), main_callback); + parent_.runOnAllThreads(wrapCallback([cb, main_callback]() { cb(); }), main_callback); } void InstanceImpl::SlotImpl::set(InitializeCb cb) { @@ -119,36 +110,8 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa } } -// Puts the slot into a deferred delete container, the slot will be destructed when its out-going -// callback reference count goes to 0. -void InstanceImpl::recycle(uint32_t slot) { - ASSERT(std::this_thread::get_id() == main_thread_id_); - deferred_deletes_.insert(slot); -} - -// Called by the SlotImpl ref_count destructor, the SlotImpl in the deferred deletes map can be -// destructed now. -void InstanceImpl::scheduleCleanup(uint32_t slot) { - if (shutdown_) { - // If server is shutting down, do nothing here. - // The destruction of SlotImpl has already transferred the SlotImpl to the deferred_deletes_ - // queue. No matter if this method is called from a Worker thread, the SlotImpl will be - // destructed on main thread when InstanceImpl destructs. - return; - } - if (std::this_thread::get_id() == main_thread_id_) { - // If called from main thread, save a callback. - removeSlot(slot); - return; - } - main_thread_dispatcher_->post([slot, this]() { removeSlot(slot); }); -} - void InstanceImpl::removeSlot(uint32_t slot) { ASSERT(std::this_thread::get_id() == main_thread_id_); - ASSERT(deferred_deletes_.contains(slot)); - // The slot is guaranteed to be put into the deferred_deletes_ map by SlotImpl destructor. - deferred_deletes_.erase(slot); // When shutting down, we do not post slot removals to other threads. This is because the other // threads have already shut down and the dispatcher is no longer alive. There is also no reason @@ -166,7 +129,8 @@ void InstanceImpl::removeSlot(uint32_t slot) { runOnAllThreads([slot]() -> void { // This runs on each thread and clears the slot, making it available for a new allocations. // This is safe even if a new allocation comes in, because everything happens with post() and - // will be sequenced after this removal. + // will be sequenced after this removal. It is also safe if there are callbacks pending on + // other threads because they will run first. if (slot < thread_local_data_.data_.size()) { thread_local_data_.data_[slot] = nullptr; } diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 445cc173bc2f..52fcbd58ec14 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -11,8 +11,6 @@ #include "common/common/logger.h" #include "common/common/non_copyable.h" -#include "absl/container/flat_hash_set.h" - namespace Envoy { namespace ThreadLocal { @@ -37,7 +35,7 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub // slot as callbacks drain from workers. struct SlotImpl : public Slot { SlotImpl(InstanceImpl& parent, uint32_t index); - ~SlotImpl() override { parent_.recycle(index_); } + ~SlotImpl() override { parent_.removeSlot(index_); } Event::PostCb wrapCallback(Event::PostCb cb); static bool currentThreadRegisteredWorker(uint32_t index); static ThreadLocalObjectSharedPtr getWorker(uint32_t index); @@ -53,12 +51,11 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub InstanceImpl& parent_; const uint32_t index_; - // The following is used to make sure that the slot index is not recycled until there are no - // more pending callbacks against the slot. This prevents crashes for well behaved - std::shared_ptr ref_count_; // The following is used to safely verify via weak_ptr that this slot is still alive. This // does not prevent all races if a callback does not capture appropriately, but it does fix // the common case of a slot destroyed immediately before anything is posted to a worker. + // TODO(mattklein123): Add clang-tidy analysis rule to check that "this" is not captured by + // a TLS function call. std::shared_ptr still_alive_guard_; }; @@ -67,10 +64,6 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub std::vector data_; }; - void recycle(uint32_t slot); - // Cleanup the deferred deletes queue. - void scheduleCleanup(uint32_t slot); - void removeSlot(uint32_t slot); void runOnAllThreads(Event::PostCb cb); void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback); @@ -78,15 +71,9 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub static thread_local ThreadLocalData thread_local_data_; - // A indexed container for Slots that has to be deferred to delete due to out-going callbacks - // pointing to the Slot. To let the ref_count_ deleter find the SlotImpl by address, the container - // is defined as a map of SlotImpl address to the unique_ptr. - absl::flat_hash_set deferred_deletes_; - std::vector slots_; // A list of index of freed slots. std::list free_slot_indexes_; - std::list> registered_threads_; std::thread::id main_thread_id_; Event::Dispatcher* main_thread_dispatcher_{}; diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 6747fa2eae99..5d7314e49e89 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -45,7 +45,6 @@ class ThreadLocalInstanceImplTest : public testing::Test { object.reset(); return object_ref; } - int deferredDeletesMapSize() { return tls_.deferred_deletes_.size(); } int freeSlotIndexesListSize() { return tls_.free_slot_indexes_.size(); } InstanceImpl tls_; @@ -60,7 +59,6 @@ TEST_F(ThreadLocalInstanceImplTest, All) { EXPECT_CALL(thread_dispatcher_, post(_)); SlotPtr slot1 = tls_.allocateSlot(); slot1.reset(); - EXPECT_EQ(deferredDeletesMapSize(), 0); EXPECT_EQ(freeSlotIndexesListSize(), 1); // Create a new slot which should take the place of the old slot. ReturnPointee() is used to @@ -86,52 +84,12 @@ TEST_F(ThreadLocalInstanceImplTest, All) { slot3.reset(); slot4.reset(); EXPECT_EQ(freeSlotIndexesListSize(), 0); - EXPECT_EQ(deferredDeletesMapSize(), 2); EXPECT_CALL(object_ref4, onDestroy()); EXPECT_CALL(object_ref3, onDestroy()); tls_.shutdownThread(); } -TEST_F(ThreadLocalInstanceImplTest, DeferredRecycle) { - InSequence s; - - // Free a slot without ever calling set. - EXPECT_CALL(thread_dispatcher_, post(_)); - SlotPtr slot1 = tls_.allocateSlot(); - slot1.reset(); - // Slot destructed directly, as there is no out-going callbacks. - EXPECT_EQ(deferredDeletesMapSize(), 0); - EXPECT_EQ(freeSlotIndexesListSize(), 1); - - // Allocate a slot and set value, hold the posted callback and the slot will only be returned - // after the held callback is destructed. - { - SlotPtr slot2 = tls_.allocateSlot(); - EXPECT_EQ(freeSlotIndexesListSize(), 0); - { - Event::PostCb holder; - EXPECT_CALL(thread_dispatcher_, post(_)).WillOnce(Invoke([&](Event::PostCb cb) { - // Holds the posted callback. - holder = cb; - })); - slot2->set( - [](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return nullptr; }); - slot2.reset(); - // Not released yet, as holder has a copy of the ref_count_. - EXPECT_EQ(freeSlotIndexesListSize(), 0); - EXPECT_EQ(deferredDeletesMapSize(), 1); - // This post is called when the holder dies. - EXPECT_CALL(thread_dispatcher_, post(_)); - } - // Slot is deleted now that there holder destructs. - EXPECT_EQ(deferredDeletesMapSize(), 0); - EXPECT_EQ(freeSlotIndexesListSize(), 1); - } - - tls_.shutdownGlobalThreading(); -} - // Test that the config passed into the update callback is the previous version stored in the slot. TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) { InSequence s; From 8fc966a7778bc02de72d28a2cdfde1bde939928a Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Sun, 27 Sep 2020 02:28:03 +0000 Subject: [PATCH 5/8] fix Signed-off-by: Matt Klein --- source/common/thread_local/thread_local_impl.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index b6b9514caa17..8cc0be2401d1 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -42,8 +42,7 @@ InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index) : parent_(parent), index_(index), still_alive_guard_(std::make_shared(true)) {} Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) { - // See the header file and comments for still_alive_guard_ and ref_count_ for the purpose - // of these captures. + // See the header file comments for still_alive_guard_ for the purpose of this capture. return [still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { if (still_alive_guard.lock()) { cb(); From 6c6673c89bce98361c7b130f5d6c8cc29c00422a Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Mon, 28 Sep 2020 02:39:15 +0000 Subject: [PATCH 6/8] fix Signed-off-by: Matt Klein --- source/common/thread_local/thread_local_impl.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index 8cc0be2401d1..76259eb312be 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -30,7 +30,7 @@ SlotPtr InstanceImpl::allocateSlot() { slots_.push_back(slot.get()); return slot; } - const uint64_t idx = free_slot_indexes_.front(); + const uint32_t idx = free_slot_indexes_.front(); free_slot_indexes_.pop_front(); ASSERT(idx < slots_.size()); SlotPtr slot(new SlotImpl(*this, idx)); @@ -44,7 +44,7 @@ InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index) Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) { // See the header file comments for still_alive_guard_ for the purpose of this capture. return [still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { - if (still_alive_guard.lock()) { + if (!still_alive_guard.expired()) { cb(); } }; @@ -75,11 +75,11 @@ void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) { - parent_.runOnAllThreads(wrapCallback([cb]() { cb(); })); + parent_.runOnAllThreads(wrapCallback(cb)); } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { - parent_.runOnAllThreads(wrapCallback([cb, main_callback]() { cb(); }), main_callback); + parent_.runOnAllThreads(wrapCallback(cb), main_callback); } void InstanceImpl::SlotImpl::set(InitializeCb cb) { @@ -123,7 +123,7 @@ void InstanceImpl::removeSlot(uint32_t slot) { slots_[slot] = nullptr; ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), slot) == free_slot_indexes_.end(), - fmt::format("slot index {} already in free slot set!", index)); + fmt::format("slot index {} already in free slot set!", slot)); free_slot_indexes_.push_back(slot); runOnAllThreads([slot]() -> void { // This runs on each thread and clears the slot, making it available for a new allocations. From 73a641c5627a5af34ce1b2aca6dff58822b34470 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Mon, 28 Sep 2020 16:26:43 +0000 Subject: [PATCH 7/8] comments Signed-off-by: Matt Klein --- .../common/thread_local/thread_local_impl.cc | 18 +++-- .../common/thread_local/thread_local_impl.h | 11 ++- .../thread_local/thread_local_impl_test.cc | 79 +++++++++++++++++-- 3 files changed, 94 insertions(+), 14 deletions(-) diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index 76259eb312be..ed8bf0283100 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -26,14 +26,14 @@ SlotPtr InstanceImpl::allocateSlot() { ASSERT(!shutdown_); if (free_slot_indexes_.empty()) { - SlotPtr slot(new SlotImpl(*this, slots_.size())); + SlotPtr slot = std::make_unique(*this, slots_.size()); slots_.push_back(slot.get()); return slot; } const uint32_t idx = free_slot_indexes_.front(); free_slot_indexes_.pop_front(); ASSERT(idx < slots_.size()); - SlotPtr slot(new SlotImpl(*this, idx)); + SlotPtr slot = std::make_unique(*this, idx); slots_[idx] = slot.get(); return slot; } @@ -41,8 +41,9 @@ SlotPtr InstanceImpl::allocateSlot() { InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index) : parent_(parent), index_(index), still_alive_guard_(std::make_shared(true)) {} -Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) { - // See the header file comments for still_alive_guard_ for the purpose of this capture. +Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb&& cb) { + // See the header file comments for still_alive_guard_ for the purpose of this capture and the + // expired check below. return [still_alive_guard = std::weak_ptr(still_alive_guard_), cb] { if (!still_alive_guard.expired()) { cb(); @@ -66,20 +67,24 @@ ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) { ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); } void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { + // See the header file comments for still_alive_guard_ for why we capture index_. Note + // that wrapCallback() is performed by the other variant of runOnAllThreads(). runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); }, complete_cb); } void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { + // See the header file comments for still_alive_guard_ for why we capture index_. Note + // that wrapCallback() is performed by the other variant of runOnAllThreads(). runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); }); } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) { - parent_.runOnAllThreads(wrapCallback(cb)); + parent_.runOnAllThreads(wrapCallback(std::move(cb))); } void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { - parent_.runOnAllThreads(wrapCallback(cb), main_callback); + parent_.runOnAllThreads(wrapCallback(std::move(cb)), main_callback); } void InstanceImpl::SlotImpl::set(InitializeCb cb) { @@ -87,6 +92,7 @@ void InstanceImpl::SlotImpl::set(InitializeCb cb) { ASSERT(!parent_.shutdown_); for (Event::Dispatcher& dispatcher : parent_.registered_threads_) { + // See the header file comments for still_alive_guard_ for why we capture index_. dispatcher.post(wrapCallback( [index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); })); } diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 52fcbd58ec14..f5c893109022 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -36,7 +36,7 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub struct SlotImpl : public Slot { SlotImpl(InstanceImpl& parent, uint32_t index); ~SlotImpl() override { parent_.removeSlot(index_); } - Event::PostCb wrapCallback(Event::PostCb cb); + Event::PostCb wrapCallback(Event::PostCb&& cb); static bool currentThreadRegisteredWorker(uint32_t index); static ThreadLocalObjectSharedPtr getWorker(uint32_t index); @@ -54,8 +54,15 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub // The following is used to safely verify via weak_ptr that this slot is still alive. This // does not prevent all races if a callback does not capture appropriately, but it does fix // the common case of a slot destroyed immediately before anything is posted to a worker. + // NOTE: The general safety model of a slot is that it is destroyed immediately on the main + // thread. This means that *all* captures must not reference the slot object directly. + // this is why index_ is captured manually in callbacks that require it. + // NOTE: When the slot is destroyed, the index is immediately recycled. This is safe because + // any new posts for a recycled index must come after any previous callbacks for the + // previous owner of the index. // TODO(mattklein123): Add clang-tidy analysis rule to check that "this" is not captured by - // a TLS function call. + // a TLS function call. This check will not prevent all bad captures, but it will at least + // make the programmer more aware of potential issues. std::shared_ptr still_alive_guard_; }; diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 5d7314e49e89..76745d23099b 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -90,6 +90,78 @@ TEST_F(ThreadLocalInstanceImplTest, All) { tls_.shutdownThread(); } +struct ThreadStatus { + uint64_t thread_local_calls_{0}; + bool all_threads_complete_ = false; +}; + +TEST_F(ThreadLocalInstanceImplTest, CallbackNotInvokedAfterDeletion) { + InSequence s; + + // Allocate a slot and invoke all callback variants. Hold all callbacks and destroy the slot. + // Make sure that recycling happens appropriately. + SlotPtr slot = tls_.allocateSlot(); + + std::list holder; + EXPECT_CALL(thread_dispatcher_, post(_)).Times(6).WillRepeatedly(Invoke([&](Event::PostCb cb) { + // Holds the posted callback. + holder.push_back(cb); + })); + + uint32_t total_callbacks = 0; + slot->set([&total_callbacks](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { + // Callbacks happen on the main thread but not the workers, so track the total. + total_callbacks++; + return nullptr; + }); + slot->runOnAllThreads([&total_callbacks](ThreadLocal::ThreadLocalObjectSharedPtr) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + // Callbacks happen on the main thread but not the workers, so track the total. + total_callbacks++; + return nullptr; + }); + slot->runOnAllThreads([&total_callbacks]() { + // Callbacks happen on the main thread but not the workers, so track the total. + total_callbacks++; + }); + ThreadStatus thread_status; + slot->runOnAllThreads([&thread_status]() -> void { ++thread_status.thread_local_calls_; }, + [&thread_status]() -> void { + // Callbacks happen on the main thread but not the workers. + EXPECT_EQ(thread_status.thread_local_calls_, 1); + thread_status.all_threads_complete_ = true; + }); + EXPECT_FALSE(thread_status.all_threads_complete_); + ThreadStatus thread_status2; + slot->runOnAllThreads( + [&thread_status2]( + ThreadLocal::ThreadLocalObjectSharedPtr) -> ThreadLocal::ThreadLocalObjectSharedPtr { + ++thread_status2.thread_local_calls_; + return nullptr; + }, + [&thread_status2]() -> void { + // Callbacks happen on the main thread but not the workers. + EXPECT_EQ(thread_status2.thread_local_calls_, 1); + thread_status2.all_threads_complete_ = true; + }); + EXPECT_FALSE(thread_status2.all_threads_complete_); + + EXPECT_EQ(3, total_callbacks); + slot.reset(); + EXPECT_EQ(freeSlotIndexesListSize(), 1); + + EXPECT_CALL(main_dispatcher_, post(_)).Times(2); + while (!holder.empty()) { + holder.front()(); + holder.pop_front(); + } + EXPECT_EQ(3, total_callbacks); + EXPECT_TRUE(thread_status.all_threads_complete_); + EXPECT_TRUE(thread_status2.all_threads_complete_); + + tls_.shutdownGlobalThreading(); +} + // Test that the config passed into the update callback is the previous version stored in the slot. TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) { InSequence s; @@ -135,17 +207,12 @@ TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { EXPECT_CALL(main_dispatcher_, post(_)); // Ensure that the thread local call back and all_thread_complete call back are called. - struct { - uint64_t thread_local_calls_{0}; - bool all_threads_complete_ = false; - } thread_status; - + ThreadStatus thread_status; tlsptr->runOnAllThreads([&thread_status]() -> void { ++thread_status.thread_local_calls_; }, [&thread_status]() -> void { EXPECT_EQ(thread_status.thread_local_calls_, 2); thread_status.all_threads_complete_ = true; }); - EXPECT_TRUE(thread_status.all_threads_complete_); tls_.shutdownGlobalThreading(); From 80f94227b55d276b4a239ce6b05e28e1870d24fe Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Mon, 28 Sep 2020 20:17:30 +0000 Subject: [PATCH 8/8] fix apis Signed-off-by: Matt Klein --- include/envoy/thread_local/thread_local.h | 26 +++---- source/common/stats/thread_local_store.cc | 19 +++-- .../common/thread_local/thread_local_impl.cc | 22 ++---- .../common/thread_local/thread_local_impl.h | 2 - .../common/upstream/cluster_manager_impl.cc | 75 +++++++++++-------- source/common/upstream/cluster_manager_impl.h | 16 ++-- .../extensions/clusters/aggregate/cluster.cc | 6 +- .../dynamic_forward_proxy/dns_cache_impl.cc | 6 +- .../common/tap/extension_config_base.cc | 13 +++- source/server/overload_manager_impl.cc | 13 ++-- test/common/stats/thread_local_store_test.cc | 7 +- .../thread_local/thread_local_impl_test.cc | 52 ++++++------- test/mocks/thread_local/mocks.h | 4 - 13 files changed, 136 insertions(+), 125 deletions(-) diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h index 683617634a20..5c4374aff8ea 100644 --- a/include/envoy/thread_local/thread_local.h +++ b/include/envoy/thread_local/thread_local.h @@ -16,6 +16,14 @@ namespace ThreadLocal { class ThreadLocalObject { public: virtual ~ThreadLocalObject() = default; + + /** + * Return the object casted to a concrete type. See getTyped() below for comments on the casts. + */ + template T& asType() { + ASSERT(dynamic_cast(this) != nullptr); + return *static_cast(this); + } }; using ThreadLocalObjectSharedPtr = std::shared_ptr; @@ -54,27 +62,15 @@ class Slot { return *static_cast(get().get()); } - /** - * Run a callback on all registered threads. - * @param cb supplies the callback to run. - */ - virtual void runOnAllThreads(Event::PostCb cb) PURE; - - /** - * Run a callback on all registered threads with a barrier. A shutdown initiated during the - * running of the PostCBs may prevent all_threads_complete_cb from being called. - * @param cb supplies the callback to run on each thread. - * @param all_threads_complete_cb supplies the callback to run on main thread after cb has - * been run on all registered threads. - */ - virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE; - /** * Set thread local data on all threads previously registered via registerThread(). * @param initializeCb supplies the functor that will be called *on each thread*. The functor * returns the thread local object which is then stored. The storage is via * a shared_ptr. Thus, this is a flexible mechanism that can be used to share * the same data across all threads or to share different data on each thread. + * + * NOTE: The initialize callback is not supposed to capture the Slot, or its owner. As the owner + * may be destructed in main thread before the update_cb gets called in a worker thread. */ using InitializeCb = std::function; virtual void set(InitializeCb cb) PURE; diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 54d0c78eba9b..4bd4ec6a9d6a 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -206,11 +206,13 @@ void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { ASSERT(!merge_in_progress_); merge_in_progress_ = true; tls_->runOnAllThreads( - [this]() -> void { - for (const auto& id_hist : tls_->getTyped().tls_histogram_cache_) { + [](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + for (const auto& id_hist : object->asType().tls_histogram_cache_) { const TlsHistogramSharedPtr& tls_hist = id_hist.second; tls_hist->beginMerge(); } + return object; }, [this, merge_complete_cb]() -> void { mergeInternal(merge_complete_cb); }); } else { @@ -304,7 +306,11 @@ void ThreadLocalStoreImpl::clearScopeFromCaches(uint64_t scope_id, if (!shutting_down_) { // Perform a cache flush on all threads. tls_->runOnAllThreads( - [this, scope_id]() { tls_->getTyped().eraseScope(scope_id); }, + [scope_id](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().eraseScope(scope_id); + return object; + }, [central_cache]() { /* Holds onto central_cache until all tls caches are clear */ }); } } @@ -320,8 +326,11 @@ void ThreadLocalStoreImpl::clearHistogramFromCaches(uint64_t histogram_id) { // https://gist.github.com/jmarantz/838cb6de7e74c0970ea6b63eded0139a // contains a patch that will implement batching together to clear multiple // histograms. - tls_->runOnAllThreads( - [this, histogram_id]() { tls_->getTyped().eraseHistogram(histogram_id); }); + tls_->runOnAllThreads([histogram_id](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().eraseHistogram(histogram_id); + return object; + }); } } diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index ed8bf0283100..7ed9eeca7942 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -67,24 +67,16 @@ ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) { ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); } void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) { - // See the header file comments for still_alive_guard_ for why we capture index_. Note - // that wrapCallback() is performed by the other variant of runOnAllThreads(). - runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); }, - complete_cb); + // See the header file comments for still_alive_guard_ for why we capture index_. + parent_.runOnAllThreads( + wrapCallback([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); }), + complete_cb); } void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) { - // See the header file comments for still_alive_guard_ for why we capture index_. Note - // that wrapCallback() is performed by the other variant of runOnAllThreads(). - runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); }); -} - -void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) { - parent_.runOnAllThreads(wrapCallback(std::move(cb))); -} - -void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { - parent_.runOnAllThreads(wrapCallback(std::move(cb)), main_callback); + // See the header file comments for still_alive_guard_ for why we capture index_. + parent_.runOnAllThreads( + wrapCallback([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); })); } void InstanceImpl::SlotImpl::set(InitializeCb cb) { diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index f5c893109022..2b83a2aebf47 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -45,8 +45,6 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub void runOnAllThreads(const UpdateCb& cb) override; void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override; bool currentThreadRegistered() override; - void runOnAllThreads(Event::PostCb cb) override; - void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override; void set(InitializeCb cb) override; InstanceImpl& parent_; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index c5df7eb10f98..5f814b498028 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -648,10 +648,12 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl } void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) { - tls_->runOnAllThreads([this, new_cluster = cluster.cluster_->info(), - thread_aware_lb_factory = cluster.loadBalancerFactory()]() -> void { + tls_->runOnAllThreads([new_cluster = cluster.cluster_->info(), + thread_aware_lb_factory = cluster.loadBalancerFactory()]( + ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { ThreadLocalClusterManagerImpl& cluster_manager = - tls_->getTyped(); + object->asType(); if (cluster_manager.thread_local_clusters_.count(new_cluster->name()) > 0) { ENVOY_LOG(debug, "updating TLS cluster {}", new_cluster->name()); @@ -665,6 +667,8 @@ void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) for (auto& cb : cluster_manager.update_callbacks_) { cb->onClusterAddOrUpdate(*thread_local_cluster); } + + return object; }); } @@ -678,9 +682,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { active_clusters_.erase(existing_active_cluster); ENVOY_LOG(info, "removing cluster {}", cluster_name); - tls_->runOnAllThreads([this, cluster_name]() -> void { + tls_->runOnAllThreads([cluster_name](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { ThreadLocalClusterManagerImpl& cluster_manager = - tls_->getTyped(); + object->asType(); ASSERT(cluster_manager.thread_local_clusters_.count(cluster_name) == 1); ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name); @@ -688,6 +693,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { cb->onClusterRemoval(cluster_name); } cluster_manager.thread_local_clusters_.erase(cluster_name); + return object; }); } @@ -902,9 +908,12 @@ ClusterManagerImpl::tcpConnPoolForCluster(const std::string& cluster, ResourcePr void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster, const HostVector& hosts_removed) { - tls_->runOnAllThreads([this, name = cluster.info()->name(), hosts_removed]() { - ThreadLocalClusterManagerImpl::removeHosts(name, hosts_removed, *tls_); - }); + tls_->runOnAllThreads( + [name = cluster.info()->name(), hosts_removed](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().removeHosts(name, hosts_removed); + return object; + }); } void ClusterManagerImpl::postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority, @@ -912,19 +921,25 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(const Cluster& cluster, ui const HostVector& hosts_removed) { const auto& host_set = cluster.prioritySet().hostSetsPerPriority()[priority]; - tls_->runOnAllThreads([this, name = cluster.info()->name(), priority, + tls_->runOnAllThreads([name = cluster.info()->name(), priority, update_params = HostSetImpl::updateHostsParams(*host_set), locality_weights = host_set->localityWeights(), hosts_added, hosts_removed, - overprovisioning_factor = host_set->overprovisioningFactor()]() { - ThreadLocalClusterManagerImpl::updateClusterMembership( - name, priority, update_params, locality_weights, hosts_added, hosts_removed, *tls_, + overprovisioning_factor = host_set->overprovisioningFactor()]( + ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().updateClusterMembership( + name, priority, update_params, locality_weights, hosts_added, hosts_removed, overprovisioning_factor); + return object; }); } void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) { - tls_->runOnAllThreads( - [this, host] { ThreadLocalClusterManagerImpl::onHostHealthFailure(host, *tls_); }); + tls_->runOnAllThreads([host](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().onHostHealthFailure(host); + return object; + }); } Host::CreateConnectionData ClusterManagerImpl::tcpConnForCluster(const std::string& cluster, @@ -1160,13 +1175,10 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeTcpConn( } } -void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(const std::string& name, - const HostVector& hosts_removed, - ThreadLocal::Slot& tls) { - ThreadLocalClusterManagerImpl& config = tls.getTyped(); - - ASSERT(config.thread_local_clusters_.find(name) != config.thread_local_clusters_.end()); - const auto& cluster_entry = config.thread_local_clusters_[name]; +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts( + const std::string& name, const HostVector& hosts_removed) { + ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); + const auto& cluster_entry = thread_local_clusters_[name]; ENVOY_LOG(debug, "removing hosts for TLS cluster {} removed {}", name, hosts_removed.size()); // We need to go through and purge any connection pools for hosts that got deleted. @@ -1178,11 +1190,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(const std::s void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, - const HostVector& hosts_removed, ThreadLocal::Slot& tls, uint64_t overprovisioning_factor) { - ThreadLocalClusterManagerImpl& config = tls.getTyped(); - - ASSERT(config.thread_local_clusters_.find(name) != config.thread_local_clusters_.end()); - const auto& cluster_entry = config.thread_local_clusters_[name]; + const HostVector& hosts_removed, uint64_t overprovisioning_factor) { + ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); + const auto& cluster_entry = thread_local_clusters_[name]; ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, hosts_added.size(), hosts_removed.size()); cluster_entry->priority_set_.updateHosts(priority, std::move(update_hosts_params), @@ -1197,7 +1207,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( - const HostSharedPtr& host, ThreadLocal::Slot& tls) { + const HostSharedPtr& host) { // Drain all HTTP connection pool connections in the case of a host health failure. If outlier/ // health is due to ECMP flow hashing issues for example, a new set of connections might do @@ -1205,9 +1215,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( // TODO(mattklein123): This function is currently very specific, but in the future when we do // more granular host set changes, we should be able to capture single host changes and make them // more targeted. - ThreadLocalClusterManagerImpl& config = tls.getTyped(); { - const auto container = config.getHttpConnPoolsContainer(host); + const auto container = getHttpConnPoolsContainer(host); if (container != nullptr) { container->pools_->drainConnections(); } @@ -1217,8 +1226,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( // connections being closed, it only prevents new connections through the pool. The // CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE can be used to make the pool close any // active connections. - const auto& container = config.host_tcp_conn_pool_map_.find(host); - if (container != config.host_tcp_conn_pool_map_.end()) { + const auto& container = host_tcp_conn_pool_map_.find(host); + if (container != host_tcp_conn_pool_map_.end()) { for (const auto& pair : container->second.pools_) { const Tcp::ConnectionPool::InstancePtr& pool = pair.second; if (host->cluster().features() & @@ -1247,8 +1256,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( // in the configuration documentation in cluster setting // "close_connections_on_host_health_failure". Update the docs if this if this changes. while (true) { - const auto& it = config.host_tcp_conn_map_.find(host); - if (it == config.host_tcp_conn_map_.end()) { + const auto& it = host_tcp_conn_map_.find(host); + if (it == host_tcp_conn_map_.end()) { break; } TcpConnectionsMap& container = it->second; diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 1196bd13db72..b235be21d99c 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -368,15 +368,13 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable& skip_predicate) { // Post the priority set to worker threads. - tls_->runOnAllThreads([this, skip_predicate, cluster_name = this->info()->name()]() { + // TODO(mattklein123): Remove "this" capture. + tls_->runOnAllThreads([this, skip_predicate, cluster_name = this->info()->name()]( + ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { PriorityContextPtr priority_context = linearizePrioritySet(skip_predicate); Upstream::ThreadLocalCluster* cluster = cluster_manager_.get(cluster_name); ASSERT(cluster != nullptr); dynamic_cast(cluster->loadBalancer()) .refresh(std::move(priority_context)); + return object; }); } diff --git a/source/extensions/common/dynamic_forward_proxy/dns_cache_impl.cc b/source/extensions/common/dynamic_forward_proxy/dns_cache_impl.cc index b2e2d5defce1..20c8a62e7bcf 100644 --- a/source/extensions/common/dynamic_forward_proxy/dns_cache_impl.cc +++ b/source/extensions/common/dynamic_forward_proxy/dns_cache_impl.cc @@ -257,8 +257,10 @@ void DnsCacheImpl::updateTlsHostsMap() { } } - tls_slot_->runOnAllThreads([this, new_host_map]() { - tls_slot_->getTyped().updateHostMap(new_host_map); + tls_slot_->runOnAllThreads([new_host_map](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().updateHostMap(new_host_map); + return object; }); } diff --git a/source/extensions/common/tap/extension_config_base.cc b/source/extensions/common/tap/extension_config_base.cc index fda84e29fbeb..6578c02fc37b 100644 --- a/source/extensions/common/tap/extension_config_base.cc +++ b/source/extensions/common/tap/extension_config_base.cc @@ -63,15 +63,22 @@ const absl::string_view ExtensionConfigBase::adminId() { } void ExtensionConfigBase::clearTapConfig() { - tls_slot_->runOnAllThreads([this] { tls_slot_->getTyped().config_ = nullptr; }); + tls_slot_->runOnAllThreads([](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().config_ = nullptr; + return object; + }); } void ExtensionConfigBase::installNewTap(envoy::config::tap::v3::TapConfig&& proto_config, Sink* admin_streamer) { TapConfigSharedPtr new_config = config_factory_->createConfigFromProto(std::move(proto_config), admin_streamer); - tls_slot_->runOnAllThreads( - [this, new_config] { tls_slot_->getTyped().config_ = new_config; }); + tls_slot_->runOnAllThreads([new_config](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + object->asType().config_ = new_config; + return object; + }); } void ExtensionConfigBase::newTapConfig(envoy::config::tap::v3::TapConfig&& proto_config, diff --git a/source/server/overload_manager_impl.cc b/source/server/overload_manager_impl.cc index c0010f819be1..3249e3808b8d 100644 --- a/source/server/overload_manager_impl.cc +++ b/source/server/overload_manager_impl.cc @@ -323,11 +323,14 @@ void OverloadManagerImpl::flushResourceUpdates() { auto shared_updates = std::make_shared>(); std::swap(*shared_updates, state_updates_to_flush_); - tls_->runOnAllThreads([this, updates = std::move(shared_updates)] { - for (const auto& [action, state] : *updates) { - tls_->getTyped().setState(action, state); - } - }); + tls_->runOnAllThreads( + [updates = std::move(shared_updates)](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + for (const auto& [action, state] : *updates) { + object->asType().setState(action, state); + } + return object; + }); } for (const auto& [cb, state] : callbacks_to_flush_) { diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 135c6b424097..5944f3bc5c5a 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -55,10 +55,11 @@ class ThreadLocalStoreTestingPeer { const std::function& num_tls_hist_cb) { auto num_tls_histograms = std::make_shared>(0); thread_local_store_impl.tls_->runOnAllThreads( - [&thread_local_store_impl, num_tls_histograms]() { - auto& tls_cache = - thread_local_store_impl.tls_->getTyped(); + [num_tls_histograms](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + auto& tls_cache = object->asType(); *num_tls_histograms += tls_cache.tls_histogram_cache_.size(); + return object; }, [num_tls_hist_cb, num_tls_histograms]() { num_tls_hist_cb(*num_tls_histograms); }); } diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 76745d23099b..a625d57002a7 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -103,7 +103,7 @@ TEST_F(ThreadLocalInstanceImplTest, CallbackNotInvokedAfterDeletion) { SlotPtr slot = tls_.allocateSlot(); std::list holder; - EXPECT_CALL(thread_dispatcher_, post(_)).Times(6).WillRepeatedly(Invoke([&](Event::PostCb cb) { + EXPECT_CALL(thread_dispatcher_, post(_)).Times(4).WillRepeatedly(Invoke([&](Event::PostCb cb) { // Holds the posted callback. holder.push_back(cb); })); @@ -120,44 +120,31 @@ TEST_F(ThreadLocalInstanceImplTest, CallbackNotInvokedAfterDeletion) { total_callbacks++; return nullptr; }); - slot->runOnAllThreads([&total_callbacks]() { - // Callbacks happen on the main thread but not the workers, so track the total. - total_callbacks++; - }); ThreadStatus thread_status; - slot->runOnAllThreads([&thread_status]() -> void { ++thread_status.thread_local_calls_; }, - [&thread_status]() -> void { - // Callbacks happen on the main thread but not the workers. - EXPECT_EQ(thread_status.thread_local_calls_, 1); - thread_status.all_threads_complete_ = true; - }); - EXPECT_FALSE(thread_status.all_threads_complete_); - ThreadStatus thread_status2; slot->runOnAllThreads( - [&thread_status2]( + [&thread_status]( ThreadLocal::ThreadLocalObjectSharedPtr) -> ThreadLocal::ThreadLocalObjectSharedPtr { - ++thread_status2.thread_local_calls_; + ++thread_status.thread_local_calls_; return nullptr; }, - [&thread_status2]() -> void { + [&thread_status]() -> void { // Callbacks happen on the main thread but not the workers. - EXPECT_EQ(thread_status2.thread_local_calls_, 1); - thread_status2.all_threads_complete_ = true; + EXPECT_EQ(thread_status.thread_local_calls_, 1); + thread_status.all_threads_complete_ = true; }); - EXPECT_FALSE(thread_status2.all_threads_complete_); + EXPECT_FALSE(thread_status.all_threads_complete_); - EXPECT_EQ(3, total_callbacks); + EXPECT_EQ(2, total_callbacks); slot.reset(); EXPECT_EQ(freeSlotIndexesListSize(), 1); - EXPECT_CALL(main_dispatcher_, post(_)).Times(2); + EXPECT_CALL(main_dispatcher_, post(_)); while (!holder.empty()) { holder.front()(); holder.pop_front(); } - EXPECT_EQ(3, total_callbacks); + EXPECT_EQ(2, total_callbacks); EXPECT_TRUE(thread_status.all_threads_complete_); - EXPECT_TRUE(thread_status2.all_threads_complete_); tls_.shutdownGlobalThreading(); } @@ -202,20 +189,29 @@ TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) { // Validate ThreadLocal::runOnAllThreads behavior with all_thread_complete call back. TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { SlotPtr tlsptr = tls_.allocateSlot(); + TestThreadLocalObject& object_ref = setObject(*tlsptr); EXPECT_CALL(thread_dispatcher_, post(_)); EXPECT_CALL(main_dispatcher_, post(_)); // Ensure that the thread local call back and all_thread_complete call back are called. ThreadStatus thread_status; - tlsptr->runOnAllThreads([&thread_status]() -> void { ++thread_status.thread_local_calls_; }, - [&thread_status]() -> void { - EXPECT_EQ(thread_status.thread_local_calls_, 2); - thread_status.all_threads_complete_ = true; - }); + tlsptr->runOnAllThreads( + [&thread_status](ThreadLocal::ThreadLocalObjectSharedPtr object) + -> ThreadLocal::ThreadLocalObjectSharedPtr { + ++thread_status.thread_local_calls_; + return object; + }, + [&thread_status]() -> void { + EXPECT_EQ(thread_status.thread_local_calls_, 2); + thread_status.all_threads_complete_ = true; + }); EXPECT_TRUE(thread_status.all_threads_complete_); tls_.shutdownGlobalThreading(); + tlsptr.reset(); + EXPECT_EQ(freeSlotIndexesListSize(), 0); + EXPECT_CALL(object_ref, onDestroy()); tls_.shutdownThread(); } diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h index 9bbd26a64465..dc6518c5068a 100644 --- a/test/mocks/thread_local/mocks.h +++ b/test/mocks/thread_local/mocks.h @@ -60,10 +60,6 @@ class MockInstance : public Instance { // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override { return parent_.data_[index_]; } bool currentThreadRegistered() override { return parent_.registered_; } - void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } - void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override { - parent_.runOnAllThreads(cb, main_callback); - } void runOnAllThreads(const UpdateCb& cb) override { parent_.runOnAllThreads([cb, this]() { parent_.data_[index_] = cb(parent_.data_[index_]); }); }