Skip to content

Commit

Permalink
fix: assign threadlocal data structures during connection migration
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Nov 30, 2023
1 parent d15bcf8 commit a96d4db
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 11 deletions.
39 changes: 28 additions & 11 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
DCHECK(dispatch_q_.empty());

service_->OnClose(cc_.get());

stats_->read_buf_capacity -= io_buf_.Capacity();

// Update num_replicas if this was a replica connection.
if (cc_->replica_conn) {
--stats_->num_replicas;
}
ReduceStatsOnClose();

// We wait for dispatch_fb to finish writing the previous replies before replying to the last
// offending request.
Expand Down Expand Up @@ -657,8 +651,6 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec
<< " " << ec.message();
}

--stats_->num_conns;
}

void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) {
Expand Down Expand Up @@ -826,10 +818,25 @@ void Connection::HandleMigrateRequest() {
// handles. We can't check above, as the queue might have contained a subscribe request.
if (cc_->subscriptions == 0) {
migration_request_ = nullptr;

ReduceStatsOnClose();

auto update_tl_vars = [this] __attribute__((noinline)) {
asm volatile("");
queue_backpressure_ = &tl_queue_backpressure_;

stats_ = service_->GetThreadLocalConnectionStats();
++stats_->num_conns;
stats_->read_buf_capacity += io_buf_.Capacity();
if (cc_->replica_conn) {
++stats_->num_replicas;
}
};
this->Migrate(dest);

// We're now running in `dest` thread
queue_backpressure_ = &tl_queue_backpressure_;
// We're now running in `dest` thread. We use non-inline lambda to make sure that
// thread local variables have not been loaded into a register before we migrated.
update_tl_vars();
}

DCHECK(dispatch_q_.empty());
Expand Down Expand Up @@ -1333,4 +1340,14 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
};
}

void Connection::ReduceStatsOnClose() {
stats_->read_buf_capacity -= io_buf_.Capacity();

// Update num_replicas if this was a replica connection.
if (cc_->replica_conn) {
--stats_->num_replicas;
}
--stats_->num_conns;
}

} // namespace facade
2 changes: 2 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ class Connection : public util::Connection {
std::unique_ptr<ConnectionContext> cc_; // Null for http connections

private:
void ReduceStatsOnClose();

std::deque<MessageHandle> dispatch_q_; // dispatch queue
dfly::EventCount evc_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)
Expand Down

0 comments on commit a96d4db

Please sign in to comment.