diff --git a/lib/async/http/body/writable.rb b/lib/async/http/body/writable.rb index a86d4a8a..0847cc27 100644 --- a/lib/async/http/body/writable.rb +++ b/lib/async/http/body/writable.rb @@ -25,6 +25,7 @@ def initialize(length = nil, queue: Async::Queue.new) @count = 0 + # Whether there is any more data to read from this body: @finished = false @closed = false @@ -66,6 +67,11 @@ def read unless chunk = @queue.dequeue @finished = true + + # If the queue was closed, and there was an error, raise it. + if @closed and @error + raise(@error) + end end return chunk diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 901d25c3..55800f21 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -99,14 +99,14 @@ def each(task: Task.current) # Gracefully finish reading the request body if it was not already done so. request&.each{} - - # This ensures we yield at least once every iteration of the loop and allow other fibers to execute. - task.yield rescue => error raise ensure body&.close(error) end + + # This ensures we yield at least once every iteration of the loop and allow other fibers to execute. + task.yield end end diff --git a/test/async/http/protocol/graceful_stop.rb b/test/async/http/protocol/graceful_stop.rb new file mode 100644 index 00000000..d7036431 --- /dev/null +++ b/test/async/http/protocol/graceful_stop.rb @@ -0,0 +1,88 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2018-2023, by Samuel Williams. +# Copyright, 2020, by Igor Sidorov. + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' +require 'async/http/body/hijack' +require 'tempfile' + +require 'async/http/protocol/http10' +require 'sus/fixtures/async/http/server_context' + +AGracefulStop = Sus::Shared("a graceful stop") do + include Sus::Fixtures::Async::HTTP::ServerContext + + let(:chunks) {Async::Queue.new} + + with 'a streaming server (defered stop body)' do + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + body = ::Async::HTTP::Body::Writable.new + + Async do |task| + task.defer_stop do + while chunk = chunks.dequeue + body.write(chunk) + end + end + ensure + body.close($!) + end + + ::Protocol::HTTP::Response[200, {}, body] + end + end + + it "should stop gracefully" do + response = client.get("/") + expect(response).to be(:success?) + + @server_task.stop + + chunks.enqueue("Hello, World!") + expect(response.body.read).to be == "Hello, World!" + chunks.enqueue(nil) + ensure + response&.close + end + end + + with 'a streaming server' do + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + body = ::Async::HTTP::Body::Writable.new + + Async do |task| + while chunk = chunks.dequeue + body.write(chunk) + end + ensure + body.close($!) + end + + ::Protocol::HTTP::Response[200, {}, body] + end + end + + it "should stop gracefully" do + response = client.get("/") + expect(response).to be(:success?) + + @server_task.stop + + chunks.enqueue("Hello, World!") + expect do + response.read + end.to raise_exception(EOFError) + end + end +end + +describe Async::HTTP::Protocol::HTTP11 do + it_behaves_like AGracefulStop +end diff --git a/test/async/http/protocol/http10.rb b/test/async/http/protocol/http10.rb index 26ae0be4..7d5197c3 100644 --- a/test/async/http/protocol/http10.rb +++ b/test/async/http/protocol/http10.rb @@ -5,6 +5,7 @@ require 'async/http/protocol/http10' require 'async/http/a_protocol' +require 'async/http/a_graceful_stop' describe Async::HTTP::Protocol::HTTP10 do it_behaves_like Async::HTTP::AProtocol