StreamedRequest.sink should be a StreamSink #727

Closed
pingbird opened this issue Jun 28, 2022 · 2 comments · Fixed by #965
Labels
type-enhancement A request for a change that isn't a bug

Comments

@pingbird

pingbird commented Jun 28, 2022

Right now StreamedRequest.sink is declared as an EventSink<List<int>>:

  /// The sink to which to write data that will be sent as the request body.
  ///
  /// This may be safely written to before the request is sent; the data will be
  /// buffered.
  ///
  /// Closing this signals the end of the request.
  EventSink<List<int>> get sink => _controller.sink;

However, the sink of a StreamController is actually a StreamSink, a subtype of EventSink that adds useful methods such as addStream: https://api.dart.dev/stable/2.17.5/dart-async/StreamConsumer/addStream.html

Being able to add a stream using this method is essential if you need the stream to be paused when the send buffer is full, e.g. if you are reading from disk faster than you can send it over the network.

Here's a simple example showing how a StreamController is notified when the send buffer is full:

import 'dart:async';
import 'dart:io';
import 'dart:typed_data';

import 'package:http/http.dart';

void main() async {
  final server = await HttpServer.bind('127.0.0.1', 8456);
  server.listen((event) {
    print('server got client');
    final sub = event.listen((event) {});
    // Pause it on the server end, Dart will no longer read from the socket
    // causing new data to be buffered by the OS
    sub.pause();
  });

  final request = StreamedRequest('post', Uri.parse('http://127.0.0.1:8456'));

  final controller = StreamController<List<int>>(
    onPause: () {
      print('paused');
    },
    onResume: () {
      print('resumed');
    },
  );

  Timer.periodic(
    const Duration(seconds: 1),
    (timer) {
      if (controller.isPaused) {
        print('not sending - paused');
        return;
      }
      print('sending 512K');
      controller.add(Uint8List(512 * 1024));
    },
  );

  // Cast request.sink to a StreamSink so we can call addStream
  (request.sink as StreamSink<List<int>>).addStream(controller.stream);

  await request.send();
}

Outputs:

paused
resumed
sending 512K
paused
server got client
resumed
sending 512K
paused
resumed
sending 512K
paused
resumed
sending 512K
paused
resumed
sending 512K
paused
resumed
sending 512K
paused
not sending - paused
not sending - paused
not sending - paused
not sending - paused
not sending - paused
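
The same pause propagation can be observed without a server. The following is a minimal sketch using only dart:async, under the assumption that a second StreamController is a fair stand-in for the controller behind StreamedRequest.sink and that a paused subscription is a fair stand-in for a full send buffer. The names observeBackpressure, body, and consumer are made up for illustration.

```dart
import 'dart:async';

/// Pipes `source` into `body` with addStream and records the pause and
/// resume callbacks that the source controller receives.
Future<List<String>> observeBackpressure() async {
  final log = <String>[];

  final source = StreamController<List<int>>(
    onPause: () => log.add('paused'),
    onResume: () => log.add('resumed'),
  );

  // Assumption: stand-in for the controller behind StreamedRequest.sink.
  final body = StreamController<List<int>>();

  // Kept in a variable rather than awaited here: the future completes only
  // after `source` is done and `body` has a listener.
  final piping = body.sink.addStream(source.stream);

  // Assumption: stand-in for the HTTP client consuming the request body.
  final consumer = body.stream.listen((_) {});
  await Future<void>.delayed(Duration.zero);

  // A stalled consumer pauses `body`; addStream forwards that pause to the
  // subscription on `source`.
  consumer.pause();
  await Future<void>.delayed(Duration.zero);
  log.add('source.isPaused while stalled: ${source.isPaused}');

  consumer.resume();
  await Future<void>.delayed(Duration.zero);
  log.add('source.isPaused after resume: ${source.isPaused}');

  await source.close();
  await piping;
  await body.close();
  return log;
}

Future<void> main() async {
  (await observeBackpressure()).forEach(print);
}
```

With plain add calls there is no subscription on a source stream to pause, so the producer only finds out about a stalled consumer if it polls for it.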

My proposal is a one-line change to the getter:

-  EventSink<List<int>> get sink => _controller.sink;
+  StreamSink<List<int>> get sink => _controller.sink;
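
With the getter typed as StreamSink, callers could pipe a stream and then close the sink without a cast. Below is a rough sketch of that pattern using only dart:async. pipeAndClose and totalBytes are hypothetical helpers, and the StreamController named body only stands in for the one inside StreamedRequest. The future returned by the helper is deliberately not awaited until the body is being consumed: until the sink's stream has a listener, addStream keeps the source paused, so awaiting earlier would stall.

```dart
import 'dart:async';

/// Hypothetical helper: forwards every chunk of `source` to `sink`, then
/// closes the sink to signal the end of the body.
Future<void> pipeAndClose(
  StreamSink<List<int>> sink,
  Stream<List<int>> source,
) async {
  await sink.addStream(source);
  await sink.close();
}

/// Pipes three chunks through a stand-in body controller and returns the
/// number of bytes the consumer received.
Future<int> totalBytes() async {
  // Assumption: stand-in for the controller behind StreamedRequest.sink.
  final body = StreamController<List<int>>();
  final source = Stream<List<int>>.fromIterable([
    [1, 2],
    [3],
    [4, 5, 6],
  ]);

  // Start piping, but do not await before the body has a listener.
  final piping = pipeAndClose(body.sink, source);

  // Stand-in for request.send() consuming the body.
  final received =
      await body.stream.fold<int>(0, (sum, chunk) => sum + chunk.length);
  await piping;
  return received;
}

Future<void> main() async {
  print('consumer received ${await totalBytes()} bytes');
}
```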

This would also allow us to get upload progress events without adding any callbacks to post / send:

final progress = ValueNotifier<int>(0);
request.sink.addStream(
  file.openRead().map((e) {
    progress.value += e.length;
    return e;
  }),
);
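
For readers outside Flutter, where ValueNotifier is not available, the same idea can be sketched with a plain callback. withProgress is a hypothetical helper, not part of package:http. It reports bytes handed to the sink, which only approximates bytes on the wire, although piping through addStream keeps the two closer because the source is paused while the consumer is stalled.

```dart
import 'dart:async';

/// Hypothetical helper: passes chunks through unchanged and reports the
/// cumulative number of bytes seen so far.
Stream<List<int>> withProgress(
  Stream<List<int>> source,
  void Function(int sentBytes) onProgress,
) {
  var sent = 0;
  return source.map((chunk) {
    sent += chunk.length;
    onProgress(sent);
    return chunk;
  });
}

Future<void> main() async {
  final reports = <int>[];
  final chunks = Stream<List<int>>.fromIterable([
    List<int>.filled(3, 0),
    List<int>.filled(5, 0),
  ]);

  // In real use the wrapped stream would be passed to request.sink.addStream.
  await withProgress(chunks, reports.add).drain<void>();
  print(reports); // [3, 8]
}
```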
natebosch added the type-enhancement label on Jul 6, 2022
@natebosch
Member

@brianquinlan - this is technically breaking if any use case has an @override, but I don't see any of those on github. I'm tempted to roll this one out as non-breaking. WDYT?

natebosch added a commit that referenced this issue Jun 16, 2023
Closes #727

This class was not designed for extension, but there is at least one
extending implementation. I don't see any overrides of this field, so no
existing usage should be broken.
@brianquinlan
Collaborator

Seems fine to me...I probably should have made StreamedRequest final and we wouldn't have this issue.

Maybe note the potential breakage in the changelog?

natebosch added a commit that referenced this issue Jun 20, 2023
Closes #727

This class was not designed for extension, but there is at least one
extending implementation. I don't see any overrides of this field, so no
existing usage should be broken.

Some new lints may surface for unawaited futures - the returned futures
should not be awaited.
781flyingdutchman added a commit to 781flyingdutchman/background_downloader that referenced this issue Oct 20, 2023
Caused by buffering of the stream, giving the impression the entire file is uploaded very quickly. Changed to a different streaming mechanism using StreamSink instead of EventSink, see dart-lang/http#727