rpc stream source does not emit empty optional when the sender is done #2612

Closed
asias opened this issue Jan 9, 2025 · 5 comments

@asias
Contributor

asias commented Jan 9, 2025

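  1. rx side:
while (!stop) {
    auto opt = source().get();   // assuming seastar::async context
    if (!opt) {
        break;                   // empty optional: EOS, the sender closed the stream
    }
    process(*opt);
}
  2. tx side:
while (!done) {
    sink(data).get();
}
sink.close().get();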
  • tx side finishes sending all the data
  • tx side closes the sink
  • rx side calls source() to read the data tx sent
  • rx side calls source() and gets seastar::rpc::stream_closed before it gets the
    empty opt that indicates there is no more data. (I verified this happens in
    practice in the test.)

This makes the rx side incorrectly conclude that a seastar::rpc::stream_closed error occurred.

Instead, source() should always return an empty opt to indicate there is no more data.

This bug happens only in the debug build.

It was found in file_stream_test here:

scylladb/scylladb#22034 (comment)
https://github.com/scylladb/scylla-enterprise/issues/4295#issuecomment-2179901124

@bhalevy
Member

bhalevy commented Jan 9, 2025

> Instead source() should always return an empty opt to indicate there is no more data.

How can source() know that?
What if the sender closed the stream prematurely?
Isn't the bug on the tx side for not sending an empty frame to indicate it's done?

@asias
Contributor Author

asias commented Jan 9, 2025

> Instead source() should always return an empty opt to indicate there is no more data.

> How can source() know that?

When the sink is closed by the peer and all data has been read out.

> What if the sender closed the stream prematurely?
> Isn't the bug on the tx side for not sending an empty frame to indicate it's done?

We have an end-of-frame message at the user level to tell the rx side there is no more data. The example above omitted it for simplicity.
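A minimal sketch of an rx loop that uses such a user-level end marker, written in standard C++ rather than against seastar. `user_msg`, its `end_of_frame` flag, `toy_source` and `receive_all` are hypothetical names, not the actual file_stream protocol; the `std::function` stands in for `source()`. The loop treats the marker as the logical end of the data, still reads until the empty optional, and reports an EOS without a preceding marker as a premature close (the case raised above).

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <stdexcept>
#include <vector>

// Hypothetical user-level message: a data chunk or an end-of-frame marker.
struct user_msg {
    bool end_of_frame = false;
    int payload = 0;
};

// Hypothetical stand-in for source(): std::nullopt means transport-level EOS.
using toy_source = std::function<std::optional<user_msg>()>;

// Collects payloads, notes the end-of-frame marker, and keeps reading
// until the empty optional so the stream is fully drained.
inline std::vector<int> receive_all(const toy_source& source) {
    std::vector<int> payloads;
    bool got_end = false;
    while (std::optional<user_msg> m = source()) {
        if (m->end_of_frame) {
            got_end = true;
        } else {
            assert(!got_end && "data after end-of-frame");
            payloads.push_back(m->payload);
        }
    }
    if (!got_end) {
        // EOS arrived but the sender never said it was done.
        throw std::runtime_error("EOS without end-of-frame: sender stopped early");
    }
    return payloads;
}
```

Reading on to the empty optional, rather than stopping at the marker, means a transport-level error raised by `source()` after the last message still reaches the caller as an exception.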

@asias
Contributor Author

asias commented Jan 9, 2025

This is what the docs say:

The basic elements of the streaming API are rpc::sink and rpc::source. The former
is used to send data and the latter is used to receive it. Client and server
each have their own pair of sink and source. rpc::sink and rpc::source are
templated classes where template parameters describe a type of the data
that is sent/received. For instance the sink that is used to send messages
containing int and long will be of a type rpc::sink<int, long>. The
opposite end of the stream will have a source of the type rpc::source<int, long>
which will be used to receive those messages. Messages are received at a
source as std::optional containing an actual message as a std::tuple. An unengaged
optional means EOS (end of stream): the stream was closed by a peer. If an
error happens before EOS is received, the receiver cannot be sure it received all
the data.

To send the data using rpc::sink<int, long> one can write (assuming seastar::async context):

      while (has_data()) {
          int data1 = get_data1();
          long data2 = get_data2();
          sink(data1, data2).get(); // sends data
      }
      sink.close().get(); // closes stream

To receive:

      while (true) {
          std::optional<std::tuple<int, long>> data = source().get();
          if (!data) {
             // unengaged optional means EOS
             break;
          } else {
             auto [data1, data2] = *data;
             // process data
          }
      }
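The contract above can be modeled with the standard library alone. This is a sketch, not seastar's implementation: `toy_stream` is a hypothetical synchronous stand-in in which `send()` plays the role of `sink(...)`, `close()` appends the EOS marker, and `receive()` plays the role of `source()`. It illustrates that EOS is just another queued element, so a receiver that drains the queue sees every message before the unengaged optional.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <stdexcept>
#include <tuple>
#include <utility>

// Hypothetical stand-in for an rpc stream; NOT seastar's API.
// Each queued element is a std::optional; std::nullopt is the EOS marker.
template <typename... T>
class toy_stream {
public:
    using message = std::tuple<T...>;

    // Sender side: like sink(data...), but synchronous.
    void send(T... values) {
        assert(!closed_ && "send() after close()");
        queue_.emplace_back(message(std::move(values)...));
    }

    // Sender side: like sink.close(); appends the EOS marker after the data.
    void close() {
        assert(!closed_ && "close() called twice");
        closed_ = true;
        queue_.emplace_back(std::nullopt);
    }

    // Receiver side: like source(); an unengaged optional means EOS.
    std::optional<message> receive() {
        if (queue_.empty()) {
            // A real source would wait here; the toy model treats it as misuse.
            throw std::logic_error("receive() with nothing queued");
        }
        std::optional<message> front = std::move(queue_.front());
        queue_.pop_front();
        return front;
    }

private:
    std::deque<std::optional<message>> queue_;
    bool closed_ = false;
};

// Receiver loop following the documented contract: read until EOS.
inline long sum_until_eos(toy_stream<int, long>& stream) {
    long total = 0;
    while (true) {
        std::optional<std::tuple<int, long>> data = stream.receive();
        if (!data) {
            break;  // unengaged optional: the peer closed the stream
        }
        auto [data1, data2] = *data;
        total += data1 + data2;
    }
    return total;
}
```

For example, after `send(1, 10L)`, `send(2, 20L)` and `close()`, `sum_until_eos` returns 33.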

@bhalevy
Member

bhalevy commented Jan 9, 2025

I concur

@gleb-cloudius
Contributor

> This bug happens only in debug build.

This hints at continuation reordering somewhere, or at a problem with cross-shard stream sending (IIRC, debug mode disables the shard-to-shard optimization).

avikivity pushed a commit that referenced this issue Feb 2, 2025
… without error

queue::abort() drops all queued packets and reports an error to the
consumer. If the stream connection completes normally, we want the consumer
to get all the data without errors, so abort the queue only in case of
an error. Otherwise the queue will wait to be consumed. Since closing
the stream involves sending a special EOS packet, the consumer should not
hang, because the queue will not be empty.

Fixes: #2612

Message-ID: <[email protected]>
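The commit message can be illustrated with a deterministic toy model in standard C++. `packet_queue`, `stream_closed` (a local exception type here, not `seastar::rpc::stream_closed`), `on_connection_done` and its `abort_always` switch are hypothetical names, not seastar code. `abort()` drops the queued packets and makes every later read throw, as the commit message describes for `queue::abort()`. The model runs connection teardown before the consumer reads, i.e. the unfavorable ordering; in the real, asynchronous code, whether the consumer drains first depends on scheduling.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <stdexcept>

// Hypothetical local error type standing in for the rpc stream_closed error.
struct stream_closed : std::runtime_error {
    stream_closed() : std::runtime_error("stream closed") {}
};

// Hypothetical receive-side packet queue; std::nullopt is the EOS packet.
class packet_queue {
public:
    void push(std::optional<int> packet) {
        assert(!aborted_ && "push() after abort()");
        packets_.push_back(packet);
    }

    // Drops every queued packet; all later pops report an error.
    void abort() {
        packets_.clear();
        aborted_ = true;
    }

    std::optional<int> pop() {
        if (aborted_) {
            throw stream_closed();
        }
        assert(!packets_.empty() && "toy model: a real queue would wait");
        std::optional<int> p = packets_.front();
        packets_.pop_front();
        return p;
    }

private:
    std::deque<std::optional<int>> packets_;
    bool aborted_ = false;
};

// Connection teardown. abort_always models the old behavior (abort even on a
// clean close); with it off, the queue is aborted only if the connection failed.
inline void on_connection_done(packet_queue& q, bool failed, bool abort_always) {
    if (failed || abort_always) {
        q.abort();
    }
}

// Reads until the EOS packet; returns the number of data packets received.
inline int drain(packet_queue& q) {
    int received = 0;
    while (q.pop()) {
        ++received;
    }
    return received;
}
```

With `abort_always` set, the consumer gets `stream_closed` even though two data packets and the EOS packet were queued; with it off and no failure, the consumer reads both packets and then EOS.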