Skip to content

Commit

Permalink
Use io-stream for buffered streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Apr 23, 2024
1 parent 44f922d commit fb578ea
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 37 deletions.
3 changes: 2 additions & 1 deletion async-http.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = ">= 3.1"

spec.add_dependency "async", ">= 1.25"
spec.add_dependency "async-io", ">= 1.28"
spec.add_dependency "async-io", ">= 1.43.1"
spec.add_dependency "async-pool", ">= 0.6.1"
spec.add_dependency "io-stream", "~> 0.1.1"
spec.add_dependency "protocol-http", "~> 0.26.0"
spec.add_dependency "protocol-http1", "~> 0.19.0"
spec.add_dependency "protocol-http2", "~> 0.17.0"
Expand Down
7 changes: 6 additions & 1 deletion fixtures/async/http/a_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ module HTTP
end

it "disconnects slow clients" do
# We won't be able to disconnect slow clients if IO#timeout is not available:
skip_unless_method_defined(:timeout, IO)

response = client.get("/")
response.read

Expand Down Expand Up @@ -478,9 +481,11 @@ def after
end

it "can't get /" do
skip_unless_method_defined(:timeout, IO)

expect do
client.get("/")
end.to raise_exception(Async::TimeoutError)
end.to raise_exception(::IO::TimeoutError)
end
end

Expand Down
1 change: 1 addition & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

# gem "async", path: "../async"
# gem "async-io", path: "../async-io"
# gem "io-stream", path: "../io-stream"
# gem "traces", path: "../traces"

# gem "protocol-http", path: "../protocol-http"
Expand Down
9 changes: 3 additions & 6 deletions lib/async/http/body/pipe.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.
# Copyright, 2019-2024, by Samuel Williams.
# Copyright, 2020, by Bruno Sutic.

require 'async/io/socket'
require 'async/io/stream'

require_relative 'writable'

module Async
Expand All @@ -18,9 +15,9 @@ def initialize(input, output = Writable.new, task: Task.current)
@input = input
@output = output

head, tail = IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)

@head = IO::Stream.new(head)
@head = ::IO::Stream::Buffered.new(head)
@tail = tail

@reader = nil
Expand Down
8 changes: 5 additions & 3 deletions lib/async/http/protocol/http1.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2017-2023, by Samuel Williams.
# Copyright, 2017-2024, by Samuel Williams.

require_relative 'http1/client'
require_relative 'http1/server'

require 'io/stream/buffered'

module Async
module HTTP
module Protocol
Expand All @@ -21,13 +23,13 @@ def self.trailer?
end

def self.client(peer)
stream = IO::Stream.new(peer, sync: true)
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Client.new(stream, VERSION)
end

def self.server(peer)
stream = IO::Stream.new(peer, sync: true)
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Server.new(stream, VERSION)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/async/http/protocol/http1/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def concurrency

# Can we use this connection to make requests?
def viable?
@ready && @stream&.connected?
@ready && @stream&.readable?
end

def reusable?
Expand Down
6 changes: 3 additions & 3 deletions lib/async/http/protocol/http10.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2017-2023, by Samuel Williams.
# Copyright, 2017-2024, by Samuel Williams.

require_relative 'http1'

Expand All @@ -20,13 +20,13 @@ def self.trailer?
end

def self.client(peer)
stream = IO::Stream.new(peer, sync: true)
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Client.new(stream, VERSION)
end

def self.server(peer)
stream = IO::Stream.new(peer, sync: true)
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Server.new(stream, VERSION)
end
Expand Down
6 changes: 3 additions & 3 deletions lib/async/http/protocol/http11.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2017-2023, by Samuel Williams.
# Copyright, 2017-2024, by Samuel Williams.
# Copyright, 2018, by Janko Marohnić.

require_relative 'http1'
Expand All @@ -21,13 +21,13 @@ def self.trailer?
end

def self.client(peer)
stream = IO::Stream.new(peer, sync: true)
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Client.new(stream, VERSION)
end

def self.server(peer)
stream = IO::Stream.new(peer, sync: true)
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Server.new(stream, VERSION)
end
Expand Down
10 changes: 5 additions & 5 deletions lib/async/http/protocol/http2.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2023, by Samuel Williams.
# Copyright, 2018-2024, by Samuel Williams.

require_relative 'http2/client'
require_relative 'http2/server'

require 'io/stream/buffered'

module Async
module HTTP
module Protocol
Expand Down Expand Up @@ -35,8 +37,7 @@ def self.trailer?
}

def self.client(peer, settings = CLIENT_SETTINGS)
stream = IO::Stream.new(peer, sync: true)

stream = ::IO::Stream::Buffered.wrap(peer)
client = Client.new(stream)

client.send_connection_preface(settings)
Expand All @@ -46,8 +47,7 @@ def self.client(peer, settings = CLIENT_SETTINGS)
end

def self.server(peer, settings = SERVER_SETTINGS)
stream = IO::Stream.new(peer, sync: true)

stream = ::IO::Stream::Buffered.wrap(peer)
server = Server.new(stream)

server.read_connection_preface(settings)
Expand Down
2 changes: 1 addition & 1 deletion lib/async/http/protocol/http2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def concurrency

# Can we use this connection to make requests?
def viable?
@stream.connected?
@stream&.readable?
end

def reusable?
Expand Down
2 changes: 1 addition & 1 deletion test/async/http/body/pipe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def aftrer
end

it "returns an io socket" do
expect(io).to be_a(Async::IO::Socket)
expect(io).to be_a(::Socket)
expect(io.read).to be == data
end

Expand Down
22 changes: 10 additions & 12 deletions test/async/http/proxy.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.
# Copyright, 2019-2024, by Samuel Williams.
# Copyright, 2020, by Sam Shadwell.

require 'async'
Expand Down Expand Up @@ -86,12 +86,10 @@
expect(proxy.client.pool).to be(:empty?)

proxy.connect do |peer|
stream = Async::IO::Stream.new(peer)
peer.write(data)
peer.close_write

stream.write(data)
stream.close_write

expect(stream.read).to be == data
expect(peer.read).to be == data
end

proxy.close
Expand All @@ -102,14 +100,14 @@
proxy = Async::HTTP::Proxy.tcp(client, "localhost", 1)
expect(proxy.client.pool).to be(:empty?)

stream = Async::IO::Stream.new(proxy.connect)
peer = proxy.connect

stream.write(data)
stream.close_write
peer.write(data)
peer.close_write

expect(stream.read).to be == data
expect(peer.read).to be == data

stream.close
peer.close
proxy.close

expect(proxy.client.pool).to be(:empty?)
Expand All @@ -131,7 +129,7 @@
Console.logger.debug(self) {"Making connection to #{endpoint}..."}

Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream|
upstream = Async::IO::Stream.new(endpoint.connect)
upstream = ::IO::Stream::Buffered.wrap(endpoint.connect)
Console.logger.debug(self) {"Connected to #{upstream}..."}

reader = Async do |task|
Expand Down

0 comments on commit fb578ea

Please sign in to comment.