[yugabyte#18685] Track stuck OutboundCalls and enforce timeouts at Reactor level

Summary:
Track all outbound calls submitted to a Reactor in a multi-index container in that reactor. Remove them from the container as soon as we learn the callback has been called. Report stuck calls with detailed debug information. A call is considered stuck if it is a certain time past its expiration (specified by reactor_based_outbound_call_expiration_delay_ms -- set it to 0 to disable this feature). If the call does not have a timeout, the default value stuck_outbound_call_default_timeout_sec is used for reporting only, but not for timeout enforcement.

OutboundCall now keeps track of its expiration deadline, as well as a new field active_call_state_, which indicates whether the call has been added to the active_calls_ structure of the corresponding connection, whether it has been removed from it, and, if removed, the reason for the removal.
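
The log lines quoted in the test plan below show active_call_state values such as kNotAdded and kErasedOnResponse; a rough sketch of what such an enum might look like (the other enumerators are assumptions):

```cpp
// Sketch of a possible active-call-state enum; only kNotAdded and kErasedOnResponse are
// confirmed by the log output in the test plan, the rest are assumed.
enum class ActiveCallState {
  kNotAdded,          // Never added to the connection's active_calls_.
  kAdded,             // Currently present in active_calls_.
  kErasedOnResponse,  // Removed because a response arrived.
  kErasedOnTimeout,   // Removed because the call timed out.
  kErasedOnShutdown,  // Removed because the connection was shut down.
};
```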

If the OutboundCall destructor finds that the call's state is not final, or that the callback has not been called, we log this even in release builds.

In OutboundCall::SetState, if the current state is already a finished state, treat the attempted transition the same as any other invalid state transition. Logically, there should never be a race between, e.g., processing a call response and a call timeout, because both events happen on the reactor thread.
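
A minimal, self-contained sketch of that transition check (the enum, the IsFinished helper, and the free-function form are assumptions; the real OutboundCall uses its own state type and logging):

```cpp
// Sketch only; RpcCallState and IsFinished are hypothetical stand-ins.
enum class RpcCallState { READY, SENDING, SENT, TIMED_OUT, FINISHED_ERROR, FINISHED_SUCCESS };

bool IsFinished(RpcCallState s) {
  return s == RpcCallState::TIMED_OUT ||
         s == RpcCallState::FINISHED_ERROR ||
         s == RpcCallState::FINISHED_SUCCESS;
}

// A call that has already finished must not transition again. Since both response processing
// and timeout enforcement run on the reactor thread, such a transition indicates a bug rather
// than a benign race, and is rejected like any other invalid transition.
bool SetState(RpcCallState& current, RpcCallState next) {
  if (IsFinished(current)) {
    return false;  // Would be logged (e.g. DFATAL in debug builds) in the real code.
  }
  current = next;
  return true;
}
```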

Fix error handling in DoQueueOutboundData: if there is an error sending the call, we destroy the connection.

Introduce a typedef CallHandle and a special value kUnknownCallHandle, instead of just using the maximum value of size_t when the call handle is unknown or not being used. Change the error handling logic in Connection::DoQueueOutboundData to avoid returning kUnknownCallHandle on errors, and make sure the callback is invoked on the call when such errors occur (the connection was already being shut down in those cases).
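
A sketch of the typedef and sentinel (exact declarations assumed; the connection.h hunk below confirms that the handle field now uses CallHandle and that DoQueueOutboundData returns a Result):

```cpp
#include <cstddef>
#include <limits>

// Sketch: CallHandle identifies a queued call within the outbound stream.
using CallHandle = std::size_t;

// Used when the call handle is unknown or not applicable, instead of sprinkling
// std::numeric_limits<size_t>::max() throughout the code. With DoQueueOutboundData returning
// a Result, errors (such as a connection that is shutting down) are reported as statuses
// rather than through this sentinel.
constexpr CallHandle kUnknownCallHandle = std::numeric_limits<CallHandle>::max();
```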

Update the YB_STRUCT_TO_STRING macro implementation to always add parentheses around an explicitly specified field value.
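
The connection.h hunk below shows the (field, value) form of YB_STRUCT_TO_STRING in use. The following simplified macros (not the real YB_STRUCT_TO_STRING) illustrate why wrapping an explicitly supplied value expression in parentheses matters:

```cpp
#include <iostream>
#include <string>

// Hypothetical simplified macros, only to demonstrate the parenthesization issue.
#define FIELD_TIMES_TWO_UNSAFE(expr) std::to_string(expr * 2)    // precedence can leak in
#define FIELD_TIMES_TWO_SAFE(expr)   std::to_string((expr) * 2)  // value expression isolated

int main() {
  int a = 1, b = 2;
  std::cout << FIELD_TIMES_TWO_UNSAFE(a + b) << "\n";  // expands to a + b * 2, prints 5
  std::cout << FIELD_TIMES_TWO_SAFE(a + b) << "\n";    // expands to (a + b) * 2, prints 6
  return 0;
}
```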

We introduce multiple ways to simulate stuck outbound calls for testing; a sketch of how a probability flag can gate such a fault follows the list.
- TEST_simulated_sent_stuck_call_probability specifies the probability of pretending, in `Connection::QueueOutboundCall`, that a call has been sent to the remote server, while actually just transitioning it to the SENT state without sending anything. This is similar to the situation that we have observed.
- TEST_simulated_failure_to_send_call_probability specifies the probability of a network error in `stream_->Send()` called from `Connection::DoQueueOutboundData`. This causes the connection to be closed. Prior to this revision, this kind of error would cause `Connection::QueueOutboundCall` to attempt to schedule a timer on a connection that had already been shut down.
- TEST_outbound_call_skip_callback_probability specifies the probability of skipping the callback invocation for a particular remote RPC call. We do not do this for local calls.
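
A minimal sketch of how such a probability flag can gate an injected fault; ShouldSimulate is a hypothetical helper rather than the actual YugabyteDB utility, and the flag usage in the trailing comment is illustrative only:

```cpp
#include <random>

// Hypothetical helper: returns true with the given probability. A probability of 0 (the
// default for the TEST_* flags above) disables fault injection entirely.
bool ShouldSimulate(double probability) {
  if (probability <= 0.0) {
    return false;
  }
  thread_local std::mt19937_64 rng{std::random_device{}()};
  return std::uniform_real_distribution<double>(0.0, 1.0)(rng) < probability;
}

// Example use (sketch) inside Connection::QueueOutboundCall:
//
//   if (ShouldSimulate(FLAGS_TEST_simulated_sent_stuck_call_probability)) {
//     call->SetSent();  // Pretend the call went out on the wire, but never send it.
//     return;           // The call is now stuck in the SENT state, as in the first test mode.
//   }
```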

Also replace a DFATAL log with a WARNING in InboundCall::QueueResponse to avoid crashing tests during shutdown. A failure to queue a response on the server side should, in the worst case, just result in a timeout on the client side.

Test Plan:
Jenkins

Manual testing details below.

First test mode
===============

```
bin/yb-ctl wipe_restart  --tserver_flags="TEST_simulated_sent_stuck_call_probability=0.0001,reactor_based_outbound_call_expiration_delay_ms=1000" --rf=3

java -jar yb-sample-apps.jar  --workload CassandraBatchKeyValue --nodes 127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042 --num_threads_read 16 --num_threads_write 16 --num_reads 1000000000000 --num_writes 1000000000000
```

Look for output like this in the logs:

```
W0907 06:27:34.604096 54296 connection.cc:302] Connection (0x0000147cbdb4de18) client 127.0.0.1:51949 => 127.0.0.2:9100: Simulating a call stuck in SENT state: RPC call 0x0000147cba7f4de0: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 5 protocol: 0x00007f276aefeb60 -> tcpc }, id: 7840, state: SENT, transfer_state: PENDING, start_time: 3491.468s (0.001s ago), sent_time: -inf, trigger_callback_time: -inf, invoke_callback_time: -inf, expiration_time: +inf, now: 3491.469s, connection: Connection (0x0000147cbdb4de18) client 127.0.0.1:51949 => 127.0.0.2:9100, active_call_state: kNotAdded
...
W0907 06:27:38.667785 54296 reactor.cc:676] TabletServer_R010: Stuck OutboundCall: RPC call 0x0000147cba7f4de0: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 5 protocol: 0x00007f276aefeb60 -> tcpc }, id: 7840, state: SENT, transfer_state: PENDING, start_time: 3491.468s (4.064s ago), sent_time: -inf, trigger_callback_time: -inf, invoke_callback_time: -inf, expiration_time: 3494.469s (1.063s ago), now: 3495.532s, connection: Connection (0x0000147cbdb4de18) client 127.0.0.1:51949 => 127.0.0.2:9100, active_call_state: kNotAdded (forcing a timeout)
W0907 06:27:38.667878 54602 consensus_peers.cc:603] T 49b1706cfba3483b8da3a286fd2115b1 P 02949377cfb94b308697b79fd5581fcf -> Peer fb5b7429056348329f3af97271a630ac ([host: "127.0.0.2" port: 9100], []): Couldn't send request.  Status: Timed out (yb/rpc/outbound_call.cc:603): UpdateConsensus RPC (request call id 7840) to 127.0.0.2:9100 timed out after 3.000s. Retrying in the next heartbeat period. Already tried 1 times. State: 2
```

Ensure that all stuck outbound calls are eventually detected. After stopping the workload, the count printed by the command below should gradually decrease to zero within a couple of minutes:

```
log_path="$HOME/yugabyte-data/node-1/disk-1/yb-data/tserver/logs/yb-tserver.INFO"; grep "Simulating a call stuck in SENT state" "$log_path" | egrep -Eo 'id: [0-9]+' | awk '{print $NF}' | sort -n | while read c; do if ! grep "id: $c," "$log_path" | grep -q "Stuck OutboundCall"; then echo "Undetected stuck OutboundCall: id=$c"; fi; done | wc -l
```

Also, these stuck calls should stop appearing in the log because they are forcibly timed out. If they were not, they would keep being logged every minute.

Second test mode
================

```
bin/yb-ctl wipe_restart  --tserver_flags="TEST_simulated_failure_to_send_call_probability=0.00001,reactor_based_outbound_call_expiration_delay_ms=1000" --rf=3
```

Same workload as above.

Look for "Simulated failure to send outbound data" in the tablet server log:

```
W0907 06:35:47.248174 58709 connection.cc:391] Simulated network failure: Network error (yb/rpc/connection.cc:390): Simulated failure to send outbound data for 0x0000068b3935f500 -> Call yb.tserver.TabletServerService.Write 127.0.0.2:47425 => 127.0.0.1:9100 (request call id 6511747)
...
W0907 06:35:47.248219 58709 inbound_call.cc:95] 0x0000068b3935f500 -> Call yb.tserver.TabletServerService.Write 127.0.0.2:47425 => 127.0.0.1:9100 (request call id 6511747): Connection torn down before Call yb.tserver.TabletServerService.Write 127.0.0.2:47425 => 127.0.0.1:9100 (request call id 6511747) could send its response: Network error (yb/rpc/connection.cc:390): Simulated failure to send outbound data for 0x0000068b3935f500 -> Call yb.tserver.TabletServerService.Write 127.0.0.2:47425 => 127.0.0.1:9100 (request call id 6511747)
```

There should be no stuck outbound calls in the log, because these simulated failures simply result in the connection being closed. Depending on the value of TEST_simulated_failure_to_send_call_probability, the workload may succeed or fail. With the value 0.00001, it manages to make progress in my experience.

Third test mode
===============

```
bin/yb-ctl wipe_restart  --tserver_flags="TEST_outbound_call_skip_callback_probability=0.0001,reactor_based_outbound_call_expiration_delay_ms=1000" --rf=3
```

Same workload as above.

Look for "Skipping OutboundCall callback as a test" and "Stuck OutboundCall" in the log.

Output:
```
W0907 06:38:05.760689 60982 outbound_call.cc:422] OutboundCall@0x00000774ff2a3ba0: Skipping OutboundCall callback as a test: RPC call 0x00000774ff2a3ba0: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 7 protocol: 0x00007f776d81eb60 -> tcpc }, id: 703, state: FINISHED_SUCCESS, transfer_state: FINISHED, start_time: 4122.626s (now), sent_time: 4122.626s (now), trigger_callback_time: 4122.626s (now), invoke_callback_time: -inf, expiration_time: 4125.626s (3.000s from now), now: 4122.626s, connection: Connection (0x00000774fd76a158) client 127.0.0.1:46355 => 127.0.0.2:9100, active_call_state: kErasedOnResponse
...
W0907 06:38:09.768546 60605 reactor.cc:676] TabletServer_R012: Stuck OutboundCall: RPC call 0x00000774ff2a3ba0: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 7 protocol: 0x00007f776d81eb60 -> tcpc }, id: 703, state: FINISHED_SUCCESS, transfer_state: FINISHED, start_time: 4122.626s (4.008s ago), sent_time: 4122.626s (4.008s ago), trigger_callback_time: 4122.626s (4.008s ago), invoke_callback_time: -inf, expiration_time: 4125.626s (1.008s ago), now: 4126.634s, connection: Connection (0x00000774fd76a158) client 127.0.0.1:46355 => 127.0.0.2:9100, active_call_state: kErasedOnResponse
...
W0907 06:39:09.867923 60605 reactor.cc:676] TabletServer_R012: Stuck OutboundCall: RPC call 0x00000774ff2a3ba0: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 7 protocol: 0x00007f776d81eb60 -> tcpc }, id: 703, state: FINISHED_SUCCESS, transfer_state: FINISHED, start_time: 4122.626s (64.106s ago), sent_time: 4122.626s (64.106s ago), trigger_callback_time: 4122.626s (64.106s ago), invoke_callback_time: -inf, expiration_time: 4125.626s (61.106s ago), now: 4186.733s, connection: Connection (0x00000774fd76a158) client 127.0.0.1:46355 => 127.0.0.2:9100, active_call_state: kErasedOnResponse
```

Similarly to the first test, wait for all stuck outbound calls to get reported:
```
log_path="$HOME/yugabyte-data/node-1/disk-1/yb-data/tserver/logs/yb-tserver.INFO"; grep "Skipping OutboundCall callback as a test" "$log_path" | egrep -Eo 'id: [0-9]+' | awk '{print $NF}' | sort -n | while read c; do if ! grep "id: $c," "$log_path" | grep -q "Stuck OutboundCall"; then echo "Undetected stuck OutboundCall: id=$c"; fi; done | wc -l
```

In this mode, because we are skipping callbacks but still allowing state transitions to the finished state, forced expiration at the reactor level does not apply: the call is already finished, so the reactor can only report it, and it keeps being logged (roughly every minute, as seen above) rather than being forcibly timed out. A sketch of this decision is shown below.
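
A self-contained sketch of that reactor-side decision (all types and member names here are assumptions, not the actual YugabyteDB API):

```cpp
#include <chrono>
#include <iostream>
#include <string>

using CoarseTimePoint = std::chrono::steady_clock::time_point;

// Hypothetical stand-in for a tracked call's state as seen by the reactor.
struct SketchOutboundCall {
  CoarseTimePoint expires_at;
  bool finished = false;     // e.g. FINISHED_SUCCESS reached, but the callback was never invoked.
  std::string debug_string;  // Detailed state dump, as in the "Stuck OutboundCall" log lines.
  void ForceExpiration() { /* would mark the call TIMED_OUT and invoke its callback */ }
};

// An overdue call is always reported, but a forced timeout only applies if the call has not
// already reached a finished state.
void CheckStuckOutboundCall(SketchOutboundCall& call, CoarseTimePoint now) {
  if (call.expires_at > now) {
    return;  // Not overdue yet; nothing to report.
  }
  std::cerr << "Stuck OutboundCall: " << call.debug_string << "\n";
  if (!call.finished) {
    // First test mode: the call is stuck in SENT, so force it to time out.
    call.ForceExpiration();
  }
  // Third test mode: the call is already finished but its callback was skipped; there is
  // nothing to expire, so it keeps being reported until it is destroyed.
}
```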

Reviewers: sergei, kpopali

Reviewed By: sergei

Subscribers: bogdan, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D27735
mbautin committed Sep 7, 2023
1 parent e7c5fce commit 1fbff49
Showing 18 changed files with 642 additions and 237 deletions.
256 changes: 186 additions & 70 deletions src/yb/rpc/connection.cc

Large diffs are not rendered by default.

46 changes: 36 additions & 10 deletions src/yb/rpc/connection.h
@@ -117,7 +117,7 @@ class Connection final : public StreamContext, public std::enable_shared_from_this<Connection>
~Connection();

CoarseTimePoint last_activity_time() const {
return last_activity_time_;
return last_activity_time_.load(std::memory_order_acquire);
}

void UpdateLastActivity() override;
@@ -178,8 +178,9 @@ class Connection final : public StreamContext, public std::enable_shared_from_this<Connection>

Status DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) ON_REACTOR_THREAD;

// Do appropriate actions after adding outbound call.
void OutboundQueued() ON_REACTOR_THREAD;
// Do appropriate actions after adding outbound call. If the connection is shutting down,
// returns the connection's shutdown status.
Status OutboundQueued() ON_REACTOR_THREAD;

// An incoming packet has completed on the client side. This parses the
// call response, looks up the CallAwaitingResponse, and calls the
@@ -199,21 +200,35 @@
return rpc_metrics_;
}

CoarseTimePoint last_activity_time() {
return last_activity_time_.load(std::memory_order_acquire);
// Returns the connection's shutdown status, or OK if shutdown has not happened yet.
Status ShutdownStatus() const;

bool shutdown_initiated() const {
return shutdown_initiated_.load(std::memory_order_acquire);
}

bool shutdown_completed() const {
return shutdown_completed_.load(std::memory_order_acquire);
}

// Used in Reactor-based stuck outbound call monitoring mechanism.
void ForceCallExpiration(const OutboundCallPtr& call) ON_REACTOR_THREAD;

private:
Status DoWrite();
// Marks the given call as failed and schedules destruction of the connection.
void FailCallAndDestroyConnection(const OutboundDataPtr& outbound_data,
const Status& status) ON_REACTOR_THREAD;

void ScheduleDestroyConnection(const Status& status) ON_REACTOR_THREAD;

// Does actual outbound data queuing.
//
// If the `batch` argument is false, calls the OutboundQueued function at the end. See
// QueueOutboundDataBatch for how this is used.
//
// Returns the handle corresponding to the queued call, or std::numeric_limits<size_t>::max() in
// case the connection is shutting down.
size_t DoQueueOutboundData(OutboundDataPtr call, bool batch) ON_REACTOR_THREAD;
// case the handle is unknown, or an error in case the connection is shutting down.
Result<size_t> DoQueueOutboundData(OutboundDataPtr call, bool batch) ON_REACTOR_THREAD;

void ProcessResponseQueue() ON_REACTOR_THREAD;

@@ -270,7 +285,15 @@ class Connection final : public StreamContext, public std::enable_shared_from_this<Connection>
int32_t id = 0; // Call id.
OutboundCallPtr call; // Call object, null if call has expired.
CoarseTimePoint expires_at; // Expiration time, kMax when call has expired.
size_t handle = 0; // Call handle in outbound stream.
CallHandle handle = 0; // Call handle in outbound stream.

std::string ToString(std::optional<CoarseTimePoint> now = std::nullopt) const {
return YB_STRUCT_TO_STRING(
id,
(call, AsString(pointer_cast<const void*>(call.get()))),
(expires_at, yb::ToStringRelativeToNow(expires_at, now)),
handle);
}
};

class ExpirationTag;
Expand Down Expand Up @@ -299,7 +322,7 @@ class Connection final : public StreamContext, public std::enable_shared_from_th
// Fields protected by outbound_data_queue_mtx_
// ----------------------------------------------------------------------------------------------

simple_spinlock outbound_data_queue_mtx_;
mutable simple_spinlock outbound_data_queue_mtx_;

// Responses we are going to process.
std::vector<OutboundDataPtr> outbound_data_to_process_ GUARDED_BY(outbound_data_queue_mtx_);
@@ -323,6 +346,9 @@ class Connection final : public StreamContext, public std::enable_shared_from_this<Connection>
std::atomic<CoarseTimePoint> last_activity_time_{CoarseTimePoint::min()};

std::atomic<bool> queued_destroy_connection_{false};

std::atomic<bool> shutdown_initiated_{false};
std::atomic<bool> shutdown_completed_{false};
};

} // namespace rpc
4 changes: 3 additions & 1 deletion src/yb/rpc/inbound_call.cc
@@ -201,7 +201,9 @@ void InboundCall::QueueResponse(bool is_success) {
if (responded_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
auto queuing_status =
connection()->context().QueueResponse(connection(), shared_from(this));
LOG_IF_WITH_PREFIX(DFATAL, !queuing_status.ok())
// Do not DFATAL here because it is a normal situation during reactor shutdown. The client
// should detect and handle the error.
LOG_IF_WITH_PREFIX(WARNING, !queuing_status.ok())
<< "Could not queue response to an inbound call: " << queuing_status;
} else {
LOG_WITH_PREFIX(DFATAL) << "Response already queued";