Skip to content

Commit

Permalink
rpc stream: do not abort stream queue if stream connection was closed…
Browse files Browse the repository at this point in the history
… without error

queue::abort() drops all queued packets and report an error to a
consumer. If stream connection completes normally we want the consumer
to get all the data without errors, so abort the queue only in case of
an error. Otherwise the queue will wait to be consumed. Since closing
the stream involves sending a special EOS packet the consumer should not
hang since the queue will not be empty.

Fixes: #2612

Message-Id: <[email protected]>
  • Loading branch information
Gleb Natapov authored and tgrabiec committed Jan 15, 2025
1 parent 96098fb commit 27f834e
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/rpc/rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1021,9 +1021,9 @@ namespace rpc {
log_exception(*this, log_level::debug, "fail to connect", ep);
}
}
_stream_queue.abort(ep);
}
_error = true;
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
return stop_send_loop(ep).then_wrapped([this] (future<> f) {
f.ignore_ready_future();
_outstanding.clear();
Expand Down Expand Up @@ -1242,10 +1242,10 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
ep = f.get_exception();
log_exception(*this, log_level::error,
format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), ep);
_stream_queue.abort(ep);
}
_fd.shutdown_input();
_error = true;
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
return stop_send_loop(ep).then_wrapped([this] (future<> f) {
f.ignore_ready_future();
get_server()._conns.erase(get_connection_id());
Expand Down

0 comments on commit 27f834e

Please sign in to comment.