Skip to content

Commit

Permalink
[yugabyte#19090] Fix a memory leak in tracked outbound calls in Reactor
Browse files Browse the repository at this point in the history
Summary:
Fix a memory leak in tracked_outbound_calls_ in Reactor by cleaning up that data structure more aggressively:
- When traversing tracked calls in the order of increasing next check time, even if it is already greater than the current time, stay in the loop and continue removing entries where the callback has been called or the weak pointer has expired.
- When a callback is invoked, notify the reactor through a multi-producer single-consumer queue that is accessed from the OutboundCall via a weak pointer.

Also remove the FinalizeTrackedCall function that was likely useless because it was called concurrently with the callback being handled by the thread pool.

In addition, we are switching OutboundCall allocation away from using make_shared to prevent long-lived weak OutboundCall pointers from consuming memory.

Test Plan:
Jenkins

Local sysbench testing:

```
./yb_build.sh release --no-tests --skip-java
bin/yb-ctl wipe_restart --tserver_flag="memory_limit_hard_bytes=$(( 4 * 1024 * 1024 * 1024 ))"
bin/yb-ctl add_node
bin/yb-ctl add_node
```

Verify we have 3 nodes:

```
bin/yb-ctl status
```

Run sysbench with a table size of 1000000 using a modified version of the run_sysbench.sh script ( https://gist.githubusercontent.com/mbautin/6f0d441b28479232bd2af9b434bed242/raw )

```
./run_sysbench.sh --tablesize 1000000
```

Wait until the run completes. Check the logs for errors (there should be none):

```
find ~/yugabyte-data -name "yb-tserver.INFO" -exec grep -E "Stuck|large number" {} \;
```

Reviewers: sergei

Reviewed By: sergei

Subscribers: yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D28531
  • Loading branch information
mbautin committed Sep 17, 2023
1 parent 97ef8af commit 0648341
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 74 deletions.
3 changes: 0 additions & 3 deletions src/yb/rpc/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ void ActiveCallExpired(
auto erase = false;
if (!call->IsFinished()) {
call->SetTimedOut();
reactor->FinalizeTrackedCall(call);
if (handle != kUnknownCallHandle) {
erase = stream->Cancelled(handle);
}
Expand Down Expand Up @@ -216,7 +215,6 @@ void Connection::Shutdown(const Status& provided_status) {
if (v.call) {
if (!v.call->IsFinished()) {
v.call->SetFailed(status);
reactor_->FinalizeTrackedCall(v.call);
}
v.call->SetActiveCallState(ActiveCallState::kErasedOnConnectionShutdown);
}
Expand Down Expand Up @@ -685,7 +683,6 @@ void Connection::ForceCallExpiration(const OutboundCallPtr& call) {
ActiveCallExpired(active_calls_, it, reactor_, stream_.get());
} else if (!call->IsFinished()) {
call->SetTimedOut();
reactor_->FinalizeTrackedCall(call);
}
}

Expand Down
41 changes: 40 additions & 1 deletion src/yb/rpc/outbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,32 @@ void InvokeCallbackTask::Done(const Status& status) {
call_ = nullptr;
}

///
/// CompletedCallQueue
///

// Records that the call with the given id has completed by pushing a newly allocated entry onto
// completed_calls_. Once stopping_ has been set, the notification is dropped instead.
// NOTE(review): the entry is handed to the queue as a raw pointer; Pop() wraps popped entries in
// a unique_ptr, so ownership presumably passes to the consumer -- confirm against MPSCQueue.
void CompletedCallQueue::AddCompletedCall(int32_t call_id) {
  const bool shutting_down = stopping_.load(std::memory_order_acquire);
  if (shutting_down) {
    return;
  }
  completed_calls_.Push(new CompletedCallEntry(call_id));
}

std::optional<int32_t> CompletedCallQueue::Pop() {
auto entry = std::unique_ptr<CompletedCallEntry>(completed_calls_.Pop());
if (!entry) {
return std::nullopt;
}
auto call_id = entry->call_id;
return call_id;
}

// Marks the queue as stopping, so that AddCompletedCall() stops enqueuing, and then drains
// whatever is still queued.
void CompletedCallQueue::Shutdown() {
  // Sequential consistency is deliberate here: the drain below must not be reordered before the
  // store to stopping_, which memory_order_release alone would have permitted.
  stopping_.store(true, std::memory_order_seq_cst);
  completed_calls_.Drain();
}

///
/// OutboundCall
///
Expand Down Expand Up @@ -457,11 +483,24 @@ void OutboundCall::InvokeCallbackSync(std::optional<CoarseTimePoint> now_optiona
// Could be destroyed during callback. So reset it.
controller_ = nullptr;
response_ = nullptr;

auto completed_call_queue = completed_call_queue_.lock();
if (completed_call_queue) {
completed_call_queue->AddCompletedCall(call_id_);
}
}

// Associates this call with the given connection by storing it in connection_weak_.
// If connection_weak_.Set() reports failure, the failure is only logged; nothing is returned to
// the caller.
// NOTE(review): the two log statements below look like the removed (ERROR) and the added
// (WARNING) side of the same diff hunk -- confirm that only one of them is meant to remain.
void OutboundCall::SetConnection(const ConnectionPtr& connection) {
if (!connection_weak_.Set(connection)) {
LOG_WITH_PREFIX(ERROR) << "Failed to set connection to " << AsString(connection);
LOG(WARNING) << "Failed to set connection to " << AsString(connection) << " on "
<< DebugString();
}
}

// Stores the given completed-call queue in completed_call_queue_. If Set() reports failure
// (presumably because the write-once pointer was already set -- confirm against
// WriteOnceWeakPtr), a warning that includes this call's debug string is logged.
void OutboundCall::SetCompletedCallQueue(
    const std::shared_ptr<CompletedCallQueue>& completed_call_queue) {
  const bool was_set = completed_call_queue_.Set(completed_call_queue);
  if (was_set) {
    return;
  }
  LOG(WARNING) << "Failed to set completed call queue on " << DebugString();
}

Expand Down
27 changes: 27 additions & 0 deletions src/yb/rpc/outbound_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,31 @@ class InvokeCallbackTask : public rpc::ThreadPoolTask {
OutboundCallPtr call_;
};

// A queue of ids of calls that have completed. Producers add ids through AddCompletedCall(); the
// reactor thread consumes them through Pop() so that it can stop tracking those calls.
class CompletedCallQueue {
 public:
  CompletedCallQueue() = default;
  virtual ~CompletedCallQueue() = default;

  // Called when a callback finishes running for a particular call. The id is not enqueued once
  // Shutdown() has marked the queue as stopping.
  void AddCompletedCall(int32_t call_id);

  // Returns the id of one completed call, or std::nullopt if no entry is available.
  std::optional<int32_t> Pop() ON_REACTOR_THREAD;

  // Marks the queue as stopping and drains the entries that are still queued.
  void Shutdown();

 private:
  struct CompletedCallEntry : MPSCQueueEntry<CompletedCallEntry> {
    // The parameter is int32_t so that it matches both the call_id field and the public API,
    // which use int32_t throughout.
    explicit CompletedCallEntry(int32_t id) : call_id(id) {}
    int32_t call_id;
  };

  // We use this queue to notify the reactor thread that calls have completed so we would stop
  // tracking them.
  MPSCQueue<CompletedCallEntry> completed_calls_;

  // Set by Shutdown(); checked by AddCompletedCall() before enqueuing.
  std::atomic<bool> stopping_{false};
};

// Tracks the state of this OutboundCall in relation to the active_calls_ structure in Connection.
// Needed for debugging of stuck OutboundCalls where the callback never gets called.
YB_DEFINE_ENUM(ActiveCallState,
Expand Down Expand Up @@ -327,6 +352,7 @@ class OutboundCall : public RpcCall {
}

void SetConnection(const ConnectionPtr& connection);
void SetCompletedCallQueue(const std::shared_ptr<CompletedCallQueue>& completed_call_queue);

void SetInvalidStateTransition(RpcCallState old_state, RpcCallState new_state);

Expand Down Expand Up @@ -532,6 +558,7 @@ class OutboundCall : public RpcCall {

// This is used in Reactor-based timeout enforcement and for logging.
std::atomic<CoarseTimePoint> expires_at_{CoarseTimePoint::max()};
WriteOnceWeakPtr<CompletedCallQueue> completed_call_queue_;

// ----------------------------------------------------------------------------------------------
// Fields with custom synchronization
Expand Down
6 changes: 4 additions & 2 deletions src/yb/rpc/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,12 @@ void Proxy::AsyncRemoteCall(
const RemoteMethod* method, std::shared_ptr<const OutboundMethodMetrics> method_metrics,
AnyMessageConstPtr req, AnyMessagePtr resp, RpcController* controller,
ResponseCallback callback, const bool force_run_callback_on_reactor) {
controller->call_ = std::make_shared<OutboundCall>(
// Do not use make_shared to allow for long-lived weak OutboundCall pointers without wasting
// memory.
controller->call_ = std::shared_ptr<OutboundCall>(new OutboundCall(
*method, outbound_call_metrics_, std::move(method_metrics), resp, controller,
context_->rpc_metrics(), std::move(callback),
GetCallbackThreadPool(force_run_callback_on_reactor, controller->invoke_callback_mode()));
GetCallbackThreadPool(force_run_callback_on_reactor, controller->invoke_callback_mode())));
if (!PrepareCall(req, controller)) {
return;
}
Expand Down
78 changes: 40 additions & 38 deletions src/yb/rpc/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ DEFINE_RUNTIME_int32(stuck_outbound_call_default_timeout_sec, 120,
"Default timeout for reporting purposes for the Reactor-based stuck OutboundCall tracking and "
"expiration mechanism. That mechanism itself is controlled by the "
"reactor_based_outbound_call_expiration_delay_ms flag. Note that this flag does not force a "
"call to be timed out, it just specifies the interval after which the call is reported.");
"call to be timed out, it just specifies the interval after which the call is logged.");
TAG_FLAG(stuck_outbound_call_default_timeout_sec, advanced);

DEFINE_RUNTIME_int32(stuck_outbound_call_check_interval_sec, 60,
DEFINE_RUNTIME_int32(stuck_outbound_call_check_interval_sec, 30,
"Check and report each stuck outbound call at most once per this number of seconds.");
TAG_FLAG(stuck_outbound_call_check_interval_sec, advanced);

DEFINE_RUNTIME_int32(reactor_based_outbound_call_expiration_delay_ms, 0,
DEFINE_RUNTIME_int32(reactor_based_outbound_call_expiration_delay_ms, 1000,
"Expire OutboundCalls using Reactor-level logic with this delay after the timeout, as an "
"additional layer of protection against stuck outbound calls. This safety mechanism is "
"disabled if this flag is set to 0.");
Expand Down Expand Up @@ -258,6 +258,7 @@ Reactor::Reactor(Messenger* messenger,
MakeFunctorReactorTask(std::bind(&Reactor::ProcessOutboundQueue, this),
SOURCE_LOCATION())),
num_connections_to_server_(bld.num_connections_to_server()),
completed_call_queue_(std::make_shared<CompletedCallQueue>()),
cur_time_(CoarseMonoClock::Now()) {
static std::once_flag libev_once;
std::call_once(libev_once, DoInitLibEv);
Expand Down Expand Up @@ -391,9 +392,10 @@ void Reactor::ShutdownInternal() {
}
for (auto& call : processing_outbound_queue_) {
call->Transferred(aborted, /* conn= */ nullptr);
FinalizeTrackedCall(call);
}
processing_outbound_queue_.clear();

completed_call_queue_->Shutdown();
}

Status Reactor::GetMetrics(ReactorMetrics *metrics) {
Expand Down Expand Up @@ -556,15 +558,14 @@ ConnectionPtr Reactor::AssignOutboundCall(const OutboundCallPtr& call) {
}

call->SetConnection(conn);
call->SetCompletedCallQueue(completed_call_queue_);
conn->QueueOutboundCall(call);

if (ShouldTrackOutboundCalls()) {
auto expires_at = call->expires_at();
tracked_outbound_calls_.insert(TrackedOutboundCall {
.call_raw = call.get(),
.call_id = call->call_id(),
.call_weak = call,
// For calls with a timeout, we will check on them shortly after the timeout. Otherwise, we
// will check after a most likely conservatively large "default timeout".
.next_check_time = expires_at == CoarseTimePoint::max()
? call->start_time() + FLAGS_stuck_outbound_call_default_timeout_sec * 1s
: expires_at + FLAGS_reactor_based_outbound_call_expiration_delay_ms * 1ms
Expand Down Expand Up @@ -648,52 +649,63 @@ void Reactor::ScanForStuckOutboundCalls(CoarseTimePoint now) {
if (!ShouldTrackOutboundCalls()) {
return;
}
auto& index = tracked_outbound_calls_.get<NextCheckTimeTag>();

while (!index.empty() && index.begin()->next_check_time <= now) {
auto& top = *index.begin();
auto call = top.call_weak.lock();
auto& index_by_call_id = tracked_outbound_calls_.get<CallIdTag>();
while (auto call_id_opt = completed_call_queue_->Pop()) {
index_by_call_id.erase(*call_id_opt);
}

auto& index_by_next_check_time = tracked_outbound_calls_.get<NextCheckTimeTag>();
while (!index_by_next_check_time.empty()) {
auto& entry = *index_by_next_check_time.begin();
// It is useful to check the next entry even if its scheduled next check time is later than
// now, to erase entries corresponding to completed calls as soon as possible. This alone,
// even without the completed call queue, mostly solves #19090.
auto call = entry.call_weak.lock();
if (!call || call->callback_invoked()) {
index.erase(index.begin());
index_by_next_check_time.erase(index_by_next_check_time.begin());
continue;
}

auto expires_at = call->expires_at();
const auto expiration_enforcement_time = ExpirationEnforcementTime(expires_at);
const bool expired = now >= expiration_enforcement_time;

bool forced_expiration = false;
if (expired && !call->callback_triggered()) {
// Normally, timeout should be enforced at the connection level. Here, we will catch failures
// to do that.
forced_expiration = true;
if (entry.next_check_time > now) {
break;
}

// Normally, timeout should be enforced at the connection level. Here, we will catch failures
// to do that.
const bool forcing_timeout =
!call->callback_triggered() && now >= ExpirationEnforcementTime(call->expires_at());

auto call_str = call->DebugString();

auto conn = call->connection();

LOG_WITH_PREFIX(WARNING) << "Stuck OutboundCall: " << call_str
<< (forced_expiration ? " (forcing a timeout)" : "");
<< (forcing_timeout ? " (forcing a timeout)" : "");
IncrementCounter(messenger_.rpc_metrics()->outbound_calls_stuck);

if (forced_expiration) {
if (forcing_timeout) {
// Only do this after we've logged the call, so that the log would capture the call state
// before the forced timeout.
if (conn) {
// This calls SetTimeOut so we don't need to do it directly.
// This calls SetTimedOut so we don't need to do it directly.
conn->ForceCallExpiration(call);
} else {
LOG_WITH_PREFIX(WARNING) << "Connection is not set for a call that is being forcefuly "
<< "expired: " << call->DebugString();
LOG_WITH_PREFIX(WARNING) << "Connection is not set for a call that is being forcefully "
<< "expired: " << call_str;
call->SetTimedOut();
}
}

index.modify(index.begin(), [now](auto& tracked_call) {
index_by_next_check_time.modify(index_by_next_check_time.begin(), [now](auto& tracked_call) {
tracked_call.next_check_time = now + FLAGS_stuck_outbound_call_check_interval_sec * 1s;
});
}

if (tracked_outbound_calls_.size() >= 1000) {
YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
<< "tracked_outbound_calls_ has a large number of entries: "
<< tracked_outbound_calls_.size();
}
}

bool Reactor::IsCurrentThread() const {
Expand Down Expand Up @@ -961,12 +973,6 @@ void Reactor::RegisterInboundSocket(
<< ": " << scheduling_status;
}

// Erases the given call from tracked_outbound_calls_, keyed by its raw pointer, but only when
// outbound call tracking is enabled and the call's callback has already been invoked.
void Reactor::FinalizeTrackedCall(const OutboundCallPtr &call) {
  // Keep the original evaluation order: callback_invoked() is only consulted when tracking is on.
  if (!ShouldTrackOutboundCalls() || !call->callback_invoked()) {
    return;
  }
  tracked_outbound_calls_.erase(call.get());
}

Status Reactor::ScheduleReactorTask(ReactorTaskPtr task, bool even_if_not_running) {
// task should never be null, so not using an SCHECK here.
CHECK_NOTNULL(task);
Expand Down Expand Up @@ -1007,9 +1013,5 @@ Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_loca
return task->Wait();
}

// Initialize with the current time so that it would be usable even before any reactor thread
// starts running.
std::atomic<CoarseTimePoint> Reactor::global_cur_time_{CoarseMonoClock::Now()};

} // namespace rpc
} // namespace yb
Loading

0 comments on commit 0648341

Please sign in to comment.