tls: simplify implementation and fix one class of crashing bug #12833

Merged · 16 commits · Sep 30, 2020

Changes from 11 commits
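
For orientation before the diff: the class of crash this PR removes appears, from the changes below, to be a use-after-free in which a callback posted to another thread captured `this` (or other slot-owned state) and then ran after the slot had been destroyed. The new SlotImpl carries a `shared_ptr<bool>` liveness guard and routes posted work through wrapCallback(), which captures only a `weak_ptr` to that guard and turns the callback into a no-op once the slot is gone. Below is a minimal standalone sketch of that idiom; all names are illustrative, not Envoy's, and a plain deque stands in for the dispatcher's post queue.

#include <deque>
#include <functional>
#include <iostream>
#include <memory>

// Stand-in for Event::Dispatcher: callbacks are queued now and drained later.
std::deque<std::function<void()>> posted;

struct MiniSlot {
  // Guard whose lifetime tracks the slot; weak_ptr copies detect destruction.
  std::shared_ptr<bool> still_alive_guard = std::make_shared<bool>(true);

  std::function<void()> wrapCallback(std::function<void()> cb) {
    // Capture a weak_ptr to the guard, never `this`: if the slot dies before
    // the queue drains, expired() is true and the callback becomes a no-op.
    return [guard = std::weak_ptr<bool>(still_alive_guard), cb] {
      if (!guard.expired()) {
        cb();
      }
    };
  }
};

int main() {
  auto slot = std::make_unique<MiniSlot>();
  // Unsafe pre-PR shape (would dangle): posted.push_back([&slot] { ... });
  posted.push_back(slot->wrapCallback([] { std::cout << "ran\n"; }));
  slot.reset(); // Slot destroyed while a callback is still queued.
  for (auto& cb : posted) {
    cb(); // Prints nothing: the guard expired, so the callback no-ops.
  }
}

The old Bookkeeper/ref-count design tried to keep the slot alive until in-flight callbacks drained; the new design instead lets the slot die and defuses stale callbacks, which removes the deferred-delete bookkeeping entirely.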
source/common/thread_local/thread_local_impl.cc (53 additions, 104 deletions)
@@ -26,79 +26,73 @@ SlotPtr InstanceImpl::allocateSlot() {
   ASSERT(!shutdown_);

   if (free_slot_indexes_.empty()) {
-    SlotImplPtr slot(new SlotImpl(*this, slots_.size()));
-    auto wrapper = std::make_unique<Bookkeeper>(*this, std::move(slot));
-    slots_.push_back(wrapper->slot_.get());
-    return wrapper;
+    SlotPtr slot(new SlotImpl(*this, slots_.size()));
+    slots_.push_back(slot.get());
+    return slot;
   }
   const uint32_t idx = free_slot_indexes_.front();
   free_slot_indexes_.pop_front();
   ASSERT(idx < slots_.size());
-  SlotImplPtr slot(new SlotImpl(*this, idx));
+  SlotPtr slot(new SlotImpl(*this, idx));
   slots_[idx] = slot.get();
-  return std::make_unique<Bookkeeper>(*this, std::move(slot));
+  return slot;
 }

-bool InstanceImpl::SlotImpl::currentThreadRegistered() {
-  return thread_local_data_.data_.size() > index_;
-}
+InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index)
+    : parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {}

-void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
-  parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); });
+Event::PostCb InstanceImpl::SlotImpl::wrapCallback(Event::PostCb cb) {
+  // See the header file comments for still_alive_guard_ for the purpose of this capture.
+  return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb] {
+    if (!still_alive_guard.expired()) {
+      cb();
+    }
+  };
 }

-void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
-  parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb);
+bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) {
+  return thread_local_data_.data_.size() > index;
 }

-ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() {
-  ASSERT(currentThreadRegistered());
-  return thread_local_data_.data_[index_];
+bool InstanceImpl::SlotImpl::currentThreadRegistered() {
+  return currentThreadRegisteredWorker(index_);
 }

-InstanceImpl::Bookkeeper::Bookkeeper(InstanceImpl& parent, SlotImplPtr&& slot)
-    : parent_(parent), slot_(std::move(slot)),
-      ref_count_(/*not used.*/ nullptr,
-                 [slot = slot_.get(), &parent = this->parent_](uint32_t* /* not used */) {
-                   // On destruction, post a cleanup callback on main thread, this could happen on
-                   // any thread.
-                   parent.scheduleCleanup(slot);
-                 }) {}
-
-ThreadLocalObjectSharedPtr InstanceImpl::Bookkeeper::get() { return slot_->get(); }
-
-void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
-  slot_->runOnAllThreads(
-      [cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) {
-        return cb(std::move(previous));
-      },
-      complete_cb);
+ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {
+  ASSERT(currentThreadRegisteredWorker(index));
+  return thread_local_data_.data_[index];
 }

-void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb) {
-  slot_->runOnAllThreads([cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) {
-    return cb(std::move(previous));
-  });
+ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }
+
+void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
+  runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); },
+                  complete_cb);
 }

-bool InstanceImpl::Bookkeeper::currentThreadRegistered() {
-  return slot_->currentThreadRegistered();
+void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
+  runOnAllThreads([cb, index = index_]() { setThreadLocal(index, cb(getWorker(index))); });
 }

-void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb) {
-  // Use ref_count_ to bookkeep how many on-the-fly callback are out there.
-  slot_->runOnAllThreads([cb, ref_count = this->ref_count_]() { cb(); });
+void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb) {
+  parent_.runOnAllThreads(wrapCallback(cb));
 }

-void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) {
-  // Use ref_count_ to bookkeep how many on-the-fly callback are out there.
-  slot_->runOnAllThreads([cb, main_callback, ref_count = this->ref_count_]() { cb(); },
-                         main_callback);
+void InstanceImpl::SlotImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) {
+  parent_.runOnAllThreads(wrapCallback(cb), main_callback);
 }

-void InstanceImpl::Bookkeeper::set(InitializeCb cb) {
-  slot_->set([cb, ref_count = this->ref_count_](Event::Dispatcher& dispatcher)
-                 -> ThreadLocalObjectSharedPtr { return cb(dispatcher); });
+void InstanceImpl::SlotImpl::set(InitializeCb cb) {
+  ASSERT(std::this_thread::get_id() == parent_.main_thread_id_);
+  ASSERT(!parent_.shutdown_);
+
+  for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
+    dispatcher.post(wrapCallback(
+        [index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }));
+  }
+
+  // Handle main thread.
+  setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
 }

 void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
@@ -115,39 +109,7 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
   }
 }

-// Puts the slot into a deferred delete container, the slot will be destructed when its out-going
-// callback reference count goes to 0.
-void InstanceImpl::recycle(SlotImplPtr&& slot) {
-  ASSERT(std::this_thread::get_id() == main_thread_id_);
-  ASSERT(slot != nullptr);
-  auto* slot_addr = slot.get();
-  deferred_deletes_.insert({slot_addr, std::move(slot)});
-}
-
-// Called by the Bookkeeper ref_count destructor, the SlotImpl in the deferred deletes map can be
-// destructed now.
-void InstanceImpl::scheduleCleanup(SlotImpl* slot) {
-  if (shutdown_) {
-    // If server is shutting down, do nothing here.
-    // The destruction of Bookkeeper has already transferred the SlotImpl to the deferred_deletes_
-    // queue. No matter if this method is called from a Worker thread, the SlotImpl will be
-    // destructed on main thread when InstanceImpl destructs.
-    return;
-  }
-  if (std::this_thread::get_id() == main_thread_id_) {
-    // If called from main thread, save a callback.
-    ASSERT(deferred_deletes_.contains(slot));
-    deferred_deletes_.erase(slot);
-    return;
-  }
-  main_thread_dispatcher_->post([slot, this]() {
-    ASSERT(deferred_deletes_.contains(slot));
-    // The slot is guaranteed to be put into the deferred_deletes_ map by Bookkeeper destructor.
-    deferred_deletes_.erase(slot);
-  });
-}
-
-void InstanceImpl::removeSlot(SlotImpl& slot) {
+void InstanceImpl::removeSlot(uint32_t slot) {
   ASSERT(std::this_thread::get_id() == main_thread_id_);

   // When shutting down, we do not post slot removals to other threads. This is because the other
@@ -158,18 +120,18 @@ void InstanceImpl::removeSlot(SlotImpl& slot) {
     return;
   }

-  const uint64_t index = slot.index_;
-  slots_[index] = nullptr;
-  ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), index) ==
+  slots_[slot] = nullptr;
+  ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), slot) ==
             free_slot_indexes_.end(),
-         fmt::format("slot index {} already in free slot set!", index));
-  free_slot_indexes_.push_back(index);
-  runOnAllThreads([index]() -> void {
+         fmt::format("slot index {} already in free slot set!", slot));
+  free_slot_indexes_.push_back(slot);
+  runOnAllThreads([slot]() -> void {
     // This runs on each thread and clears the slot, making it available for a new allocations.
     // This is safe even if a new allocation comes in, because everything happens with post() and
-    // will be sequenced after this removal.
-    if (index < thread_local_data_.data_.size()) {
-      thread_local_data_.data_[index] = nullptr;
+    // will be sequenced after this removal. It is also safe if there are callbacks pending on
+    // other threads because they will run first.
+    if (slot < thread_local_data_.data_.size()) {
+      thread_local_data_.data_[slot] = nullptr;
     }
   });
 }
@@ -205,19 +167,6 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) {
   }
 }

-void InstanceImpl::SlotImpl::set(InitializeCb cb) {
-  ASSERT(std::this_thread::get_id() == parent_.main_thread_id_);
-  ASSERT(!parent_.shutdown_);
-
-  for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
-    const uint32_t index = index_;
-    dispatcher.post([index, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); });
-  }
-
-  // Handle main thread.
-  setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
-}
-
 void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
   if (thread_local_data_.data_.size() <= index) {
     thread_local_data_.data_.resize(index + 1);
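
A note on the allocateSlot()/removeSlot() pair above: slots are now identified by a bare uint32_t index into slots_, and freed indexes are recycled FIFO through free_slot_indexes_. A rough single-threaded sketch of that bookkeeping follows (hypothetical names; the real removeSlot() additionally posts a per-worker clear of thread_local_data_.data_[index], which is safe because any set() on a reused index is itself a post() and is therefore sequenced afterwards).

#include <cassert>
#include <cstdint>
#include <iostream>
#include <list>
#include <vector>

struct SlotTable {
  std::vector<bool> slots;          // Stand-in for std::vector<Slot*>.
  std::list<uint32_t> free_indexes; // Mirrors free_slot_indexes_.

  uint32_t allocate() {
    if (free_indexes.empty()) {
      slots.push_back(true); // Grow: new slot goes at the end.
      return static_cast<uint32_t>(slots.size() - 1);
    }
    const uint32_t idx = free_indexes.front(); // Reuse the oldest freed index.
    free_indexes.pop_front();
    assert(idx < slots.size());
    slots[idx] = true;
    return idx;
  }

  void remove(uint32_t idx) {
    slots[idx] = false;
    free_indexes.push_back(idx);
    // The real implementation also posts a callback to every worker that
    // nulls out that thread's entry for idx before any reuse can touch it.
  }
};

int main() {
  SlotTable t;
  const uint32_t a = t.allocate(); // index 0
  const uint32_t b = t.allocate(); // index 1
  t.remove(a);
  const uint32_t reused = t.allocate();
  std::cout << (reused == a ? "recycled index 0\n" : "unexpected\n");
  (void)b;
}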
source/common/thread_local/thread_local_impl.h (17 additions, 41 deletions)
@@ -11,8 +11,6 @@
 #include "common/common/logger.h"
 #include "common/common/non_copyable.h"

-#include "absl/container/flat_hash_map.h"
-
 namespace Envoy {
 namespace ThreadLocal {

@@ -32,32 +30,15 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, public Instance {
   Event::Dispatcher& dispatcher() override;

 private:
+  // On destruction returns the slot index to the deferred delete queue (detaches it). This allows
+  // a slot to be destructed on the main thread while controlling the lifetime of the underlying
+  // slot as callbacks drain from workers.
   struct SlotImpl : public Slot {
-    SlotImpl(InstanceImpl& parent, uint64_t index) : parent_(parent), index_(index) {}
-    ~SlotImpl() override { parent_.removeSlot(*this); }
-
-    // ThreadLocal::Slot
-    ThreadLocalObjectSharedPtr get() override;
-    bool currentThreadRegistered() override;
-    void runOnAllThreads(const UpdateCb& cb) override;
-    void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override;
-    void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); }
-    void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
-      parent_.runOnAllThreads(cb, main_callback);
-    }
-    void set(InitializeCb cb) override;
-
-    InstanceImpl& parent_;
-    const uint64_t index_;
-  };
-
-  using SlotImplPtr = std::unique_ptr<SlotImpl>;
-
-  // A Wrapper of SlotImpl which on destruction returns the SlotImpl to the deferred delete queue
-  // (detaches it).
-  struct Bookkeeper : public Slot {
-    Bookkeeper(InstanceImpl& parent, SlotImplPtr&& slot);
-    ~Bookkeeper() override { parent_.recycle(std::move(slot_)); }
+    SlotImpl(InstanceImpl& parent, uint32_t index);
+    ~SlotImpl() override { parent_.removeSlot(index_); }
+    Event::PostCb wrapCallback(Event::PostCb cb);
+    static bool currentThreadRegisteredWorker(uint32_t index);
+    static ThreadLocalObjectSharedPtr getWorker(uint32_t index);

     // ThreadLocal::Slot
     ThreadLocalObjectSharedPtr get() override;
@@ -69,35 +50,30 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, public Instance {
     void set(InitializeCb cb) override;

     InstanceImpl& parent_;
-    SlotImplPtr slot_;
-    std::shared_ptr<uint32_t> ref_count_;
+    const uint32_t index_;
+    // The following is used to safely verify via weak_ptr that this slot is still alive. This
+    // does not prevent all races if a callback does not capture appropriately, but it does fix
+    // the common case of a slot destroyed immediately before anything is posted to a worker.
+    // TODO(mattklein123): Add clang-tidy analysis rule to check that "this" is not captured by
+    // a TLS function call.
+    std::shared_ptr<bool> still_alive_guard_;
   };

   struct ThreadLocalData {
     Event::Dispatcher* dispatcher_{};
     std::vector<ThreadLocalObjectSharedPtr> data_;
   };

-  void recycle(SlotImplPtr&& slot);
-  // Cleanup the deferred deletes queue.
-  void scheduleCleanup(SlotImpl* slot);
-
-  void removeSlot(SlotImpl& slot);
+  void removeSlot(uint32_t slot);
   void runOnAllThreads(Event::PostCb cb);
   void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
   static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);

   static thread_local ThreadLocalData thread_local_data_;

-  // A indexed container for Slots that has to be deferred to delete due to out-going callbacks
-  // pointing to the Slot. To let the ref_count_ deleter find the SlotImpl by address, the container
-  // is defined as a map of SlotImpl address to the unique_ptr<SlotImpl>.
-  absl::flat_hash_map<SlotImpl*, SlotImplPtr> deferred_deletes_;
-
-  std::vector<SlotImpl*> slots_;
+  std::vector<Slot*> slots_;
   // A list of index of freed slots.
   std::list<uint32_t> free_slot_indexes_;

   std::list<std::reference_wrapper<Event::Dispatcher>> registered_threads_;
   std::thread::id main_thread_id_;
   Event::Dispatcher* main_thread_dispatcher_{};
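
The header keeps one `static thread_local ThreadLocalData thread_local_data_`, so each thread owns a single vector in which element i is slot i's object for that thread, and setThreadLocal() grows the vector on demand. A small standalone sketch of that layout (assumed simplification: std::string stands in for ThreadLocalObjectSharedPtr):

#include <cstdint>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

// One vector per thread; index i holds slot i's object for that thread.
thread_local std::vector<std::string> tls_data;

// Mirrors InstanceImpl::setThreadLocal: grow on demand, then assign.
void setThreadLocal(uint32_t index, std::string object) {
  if (tls_data.size() <= index) {
    tls_data.resize(index + 1);
  }
  tls_data[index] = std::move(object);
}

int main() {
  setThreadLocal(2, "main-thread value");
  std::thread worker([] {
    // This thread's tls_data starts empty: the resize here is independent
    // of the main thread's vector.
    setThreadLocal(2, "worker value");
    std::cout << tls_data[2] << "\n"; // worker value
  });
  worker.join();
  std::cout << tls_data[2] << "\n"; // main-thread value
}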
test/common/thread_local/thread_local_impl_test.cc (0 additions, 42 deletions)
@@ -45,7 +45,6 @@ class ThreadLocalInstanceImplTest : public testing::Test {
     object.reset();
     return object_ref;
   }
-  int deferredDeletesMapSize() { return tls_.deferred_deletes_.size(); }
   int freeSlotIndexesListSize() { return tls_.free_slot_indexes_.size(); }
   InstanceImpl tls_;

@@ -60,7 +59,6 @@ TEST_F(ThreadLocalInstanceImplTest, All) {
   EXPECT_CALL(thread_dispatcher_, post(_));
   SlotPtr slot1 = tls_.allocateSlot();
   slot1.reset();
-  EXPECT_EQ(deferredDeletesMapSize(), 0);
   EXPECT_EQ(freeSlotIndexesListSize(), 1);

   // Create a new slot which should take the place of the old slot. ReturnPointee() is used to
@@ -86,52 +84,12 @@ TEST_F(ThreadLocalInstanceImplTest, All) {
   slot3.reset();
   slot4.reset();
   EXPECT_EQ(freeSlotIndexesListSize(), 0);
-  EXPECT_EQ(deferredDeletesMapSize(), 2);

   EXPECT_CALL(object_ref4, onDestroy());
   EXPECT_CALL(object_ref3, onDestroy());
   tls_.shutdownThread();
 }

-TEST_F(ThreadLocalInstanceImplTest, DeferredRecycle) {
-  InSequence s;
-
-  // Free a slot without ever calling set.
-  EXPECT_CALL(thread_dispatcher_, post(_));
-  SlotPtr slot1 = tls_.allocateSlot();
-  slot1.reset();
-  // Slot destructed directly, as there is no out-going callbacks.
-  EXPECT_EQ(deferredDeletesMapSize(), 0);
-  EXPECT_EQ(freeSlotIndexesListSize(), 1);
-
-  // Allocate a slot and set value, hold the posted callback and the slot will only be returned
-  // after the held callback is destructed.
-  {
-    SlotPtr slot2 = tls_.allocateSlot();
-    EXPECT_EQ(freeSlotIndexesListSize(), 0);
-    {
-      Event::PostCb holder;
-      EXPECT_CALL(thread_dispatcher_, post(_)).WillOnce(Invoke([&](Event::PostCb cb) {
-        // Holds the posted callback.
-        holder = cb;
-      }));
-      slot2->set(
-          [](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return nullptr; });
-      slot2.reset();
-      // Not released yet, as holder has a copy of the ref_count_.
-      EXPECT_EQ(freeSlotIndexesListSize(), 0);
-      EXPECT_EQ(deferredDeletesMapSize(), 1);
-      // This post is called when the holder dies.
-      EXPECT_CALL(thread_dispatcher_, post(_));
-    }
-    // Slot is deleted now that there holder destructs.
-    EXPECT_EQ(deferredDeletesMapSize(), 0);
-    EXPECT_EQ(freeSlotIndexesListSize(), 1);
-  }
-
-  tls_.shutdownGlobalThreading();
-}
-
 // Test that the config passed into the update callback is the previous version stored in the slot.
 TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) {
   InSequence s;