Skip to content

Commit

Permalink
Support io-endpoint gem.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jan 26, 2024
1 parent f681cd4 commit 404771c
Show file tree
Hide file tree
Showing 22 changed files with 92 additions and 87 deletions.
2 changes: 1 addition & 1 deletion async-http.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = ">= 3.0"

spec.add_dependency "async", ">= 1.25"
spec.add_dependency "async-io", ">= 1.28"
spec.add_dependency "async-pool", ">= 0.2"
spec.add_dependency "io-endpoint", "~> 0.6"
spec.add_dependency "protocol-http", "~> 0.26.0"
spec.add_dependency "protocol-http1", "~> 0.18.0"
spec.add_dependency "protocol-http2", "~> 0.16.0"
Expand Down
2 changes: 0 additions & 2 deletions examples/google/codeotaku.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
URL = "https://www.codeotaku.com/index"
ENDPOINT = Async::HTTP::Endpoint.parse(URL)

Console.logger.enable(Async::IO::Stream, Console::Logger::DEBUG)

if count = ENV['COUNT']&.to_i
terms = terms.first(count)
end
Expand Down
2 changes: 0 additions & 2 deletions examples/google/search.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
URL = "https://www.google.com/search"
ENDPOINT = Async::HTTP::Endpoint.parse(URL)

# Console.logger.enable(Async::IO::Stream, Console::Logger::DEBUG)

class Google < Protocol::HTTP::Middleware
def search(term)
Console.logger.info(self) {"Searching for #{term}..."}
Expand Down
44 changes: 27 additions & 17 deletions fixtures/async/http/a_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,10 @@ def after

it "can't get /" do
expect do
client.get("/")
end.to raise_exception(Async::TimeoutError)
Console.debug(self) {"Connecting to #{endpoint.inspect}"}
response = client.get("/")
Console.debug(self) {"Got response #{response.inspect}"}
end.to raise_exception(::IO::TimeoutError)
end
end

Expand Down Expand Up @@ -542,33 +544,41 @@ def around
Console.logger.level = current
end

def timeout = nil

it "doesn't cancel all requests" do
tasks = []
task = Async::Task.current
tasks = []
stopped = []

10.times do
tasks << task.async {
begin
loop do
client.get('http://127.0.0.1:8080/a').finish
end
task.async do |child|
tasks << child

loop do
response = client.get('/a')
response.finish
ensure
stopped << 'a'
response&.close
end
}
ensure
stopped << 'a'
end
end

10.times do
tasks << task.async {
begin
loop do
client.get('http://127.0.0.1:8080/b').finish
end
task.async do |child|
tasks << child

loop do
response = client.get('/b')
response.finish
ensure
stopped << 'b'
response&.close
end
}
ensure
stopped << 'b'
end
end

tasks.each do |child|
Expand Down
5 changes: 3 additions & 2 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
gemspec

# gem "async", path: "../async"
# gem "async-io", path: "../async-io"
# gem "traces", path: "../traces"
gem "io-endpoint", path: "../io-endpoint"
# gem "sus-fixtures-async-http", path: "../sus-fixtures-async-http"

# gem "protocol-http", path: "../protocol-http"
# gem "protocol-http1", path: "../protocol-http1"
Expand All @@ -28,7 +29,7 @@
gem "covered"
gem "sus"
gem "sus-fixtures-async"
gem "sus-fixtures-async-http", "~> 0.7"
gem "sus-fixtures-async-http", "~> 0.8"
gem "sus-fixtures-openssl"

gem "bake"
Expand Down
7 changes: 2 additions & 5 deletions lib/async/http/body/pipe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ def initialize(input, output = Writable.new, task: Task.current)
@input = input
@output = output

head, tail = IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)

@head = IO::Stream.new(head)
@tail = tail
@head, @tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)

@reader = nil
@writer = nil
Expand Down Expand Up @@ -68,7 +65,7 @@ def writer(task)

task.annotate "#{self.class} writer."

while chunk = @head.read_partial
while chunk = @head.readpartial(1024)
@output.write(chunk)
end
ensure
Expand Down
5 changes: 3 additions & 2 deletions lib/async/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def call(request)

# This signals that the ensure block below should not try to release the connection, because it's bound into the response which will be returned:
connection = nil

return response
rescue Protocol::RequestFailed
# This is a specific case where the entire request wasn't sent before a failure occurred. So, we can even resend non-idempotent requests.
Expand All @@ -119,7 +118,9 @@ def call(request)
raise
end
ensure
@pool.release(connection) if connection
if connection
@pool.release(connection)
end
end
end

Expand Down
14 changes: 7 additions & 7 deletions lib/async/http/endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
# Copyright, 2019-2023, by Samuel Williams.
# Copyright, 2021-2022, by Adam Daniels.

require 'async/io/host_endpoint'
require 'async/io/ssl_endpoint'
require 'async/io/ssl_socket'
require 'async/io/shared_endpoint'
require 'io/endpoint'
require 'io/endpoint/host_endpoint'
require 'io/endpoint/ssl_endpoint'
require 'io/endpoint/shared_endpoint'

require_relative 'protocol/http1'
require_relative 'protocol/https'

module Async
module HTTP
# Represents a way to connect to a remote HTTP server.
class Endpoint < Async::IO::Endpoint
class Endpoint < ::IO::Endpoint::Generic
def self.parse(string, endpoint = nil, **options)
url = URI.parse(string).normalize

Expand Down Expand Up @@ -164,7 +164,7 @@ def build_endpoint(endpoint = nil)

if secure?
# Wrap it in SSL:
return Async::IO::SSLEndpoint.new(endpoint,
return ::IO::Endpoint::SSLEndpoint.new(endpoint,
ssl_context: self.ssl_context,
hostname: @url.hostname,
timeout: self.timeout,
Expand Down Expand Up @@ -226,7 +226,7 @@ def tcp_options
end

def tcp_endpoint
Async::IO::Endpoint.tcp(self.hostname, port, **tcp_options)
::IO::Endpoint.tcp(self.hostname, port, **tcp_options)
end
end
end
Expand Down
8 changes: 2 additions & 6 deletions lib/async/http/protocol/http1.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@ def self.trailer?
end

def self.client(peer)
stream = IO::Stream.new(peer, sync: true)

return HTTP1::Client.new(stream, VERSION)
return HTTP1::Client.new(peer, VERSION)
end

def self.server(peer)
stream = IO::Stream.new(peer, sync: true)

return HTTP1::Server.new(stream, VERSION)
return HTTP1::Server.new(peer, VERSION)
end

def self.names
Expand Down
12 changes: 8 additions & 4 deletions lib/async/http/protocol/http1/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
require_relative 'request'
require_relative 'response'

require 'io/connected'

module Async
module HTTP
module Protocol
Expand All @@ -31,15 +33,17 @@ def http2?
end

def read_line?
@stream.read_until(CRLF)
@stream.gets(CRLF, chomp: true)
rescue Errno::ECONNRESET
nil
end

def read_line
@stream.read_until(CRLF) or raise EOFError, "Could not read line!"
read_line? or raise EOFError, "Could not read line!"
end

def peer
@stream.io
@stream
end

attr :count
Expand All @@ -50,7 +54,7 @@ def concurrency

# Can we use this connection to make requests?
def viable?
@ready && @stream&.connected?
@ready && @stream.connected?
end

def reusable?
Expand Down
2 changes: 2 additions & 0 deletions lib/async/http/protocol/http1/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def fail_request(status)
end

def next_request
Console.debug(self, "Reading request...", persistent: @persistent)

# The default is true.
return unless @persistent

Expand Down
8 changes: 2 additions & 6 deletions lib/async/http/protocol/http10.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ def self.trailer?
end

def self.client(peer)
stream = IO::Stream.new(peer, sync: true)

return HTTP1::Client.new(stream, VERSION)
return HTTP1::Client.new(peer, VERSION)
end

def self.server(peer)
stream = IO::Stream.new(peer, sync: true)

return HTTP1::Server.new(stream, VERSION)
return HTTP1::Server.new(peer, VERSION)
end

def self.names
Expand Down
8 changes: 2 additions & 6 deletions lib/async/http/protocol/http11.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@ def self.trailer?
end

def self.client(peer)
stream = IO::Stream.new(peer, sync: true)

return HTTP1::Client.new(stream, VERSION)
return HTTP1::Client.new(peer, VERSION)
end

def self.server(peer)
stream = IO::Stream.new(peer, sync: true)

return HTTP1::Server.new(stream, VERSION)
return HTTP1::Server.new(peer, VERSION)
end

def self.names
Expand Down
8 changes: 4 additions & 4 deletions lib/async/http/protocol/http2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ def self.trailer?
}

def self.client(peer, settings = CLIENT_SETTINGS)
stream = IO::Stream.new(peer, sync: true)
peer.sync = true

client = Client.new(stream)
client = Client.new(peer)

client.send_connection_preface(settings)
client.start_connection
Expand All @@ -46,9 +46,9 @@ def self.client(peer, settings = CLIENT_SETTINGS)
end

def self.server(peer, settings = SERVER_SETTINGS)
stream = IO::Stream.new(peer, sync: true)
peer.sync = true

server = Server.new(stream)
server = Server.new(peer)

server.read_connection_preface(settings)
server.start_connection
Expand Down
8 changes: 6 additions & 2 deletions lib/async/http/protocol/http2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

require 'async/semaphore'

require 'io/connected'

module Async
module HTTP
module Protocol
Expand Down Expand Up @@ -92,10 +94,12 @@ def read_in_background(parent: Task.current)
begin
while !self.closed?
self.consume_window
Console.debug(self, @framer) {"Reading frame..."}
self.read_frame
end
rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE, Async::Wrapper::Cancelled
rescue SocketError, EOFError, Errno::ECONNRESET, Errno::EPIPE, Async::Wrapper::Cancelled => error
# Ignore.
Console.warn(self, error)
rescue ::Protocol::HTTP2::GoawayError => error
# Error is raised if a response is actively reading from the
# connection. The connection is silently closed if GOAWAY is
Expand All @@ -115,7 +119,7 @@ def read_in_background(parent: Task.current)
attr :promises

def peer
@stream.io
@stream
end

attr :count
Expand Down
2 changes: 1 addition & 1 deletion lib/async/http/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def accept(peer, address, task: Task.current)
ensure
connection&.close
end

def run
@endpoint.accept(&self.method(:accept))
end
Expand Down
5 changes: 3 additions & 2 deletions test/async/http/body.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require 'sus/fixtures/openssl'
require 'sus/fixtures/async/http'
require 'localhost/authority'
require 'io/endpoint/ssl_endpoint'

ABody = Sus::Shared("a body") do
with 'echo server' do
Expand Down Expand Up @@ -97,11 +98,11 @@
let(:client_context) {authority.client_context}

def make_server_endpoint(bound_endpoint)
Async::IO::SSLEndpoint.new(super, ssl_context: server_context)
::IO::Endpoint::SSLEndpoint.new(super, ssl_context: server_context)
end

def make_client_endpoint(bound_endpoint)
Async::IO::SSLEndpoint.new(super, ssl_context: client_context)
::IO::Endpoint::SSLEndpoint.new(super, ssl_context: client_context)
end

it_behaves_like ABody
Expand Down
2 changes: 1 addition & 1 deletion test/async/http/body/pipe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def aftrer
end

it "returns an io socket" do
expect(io).to be_a(Async::IO::Socket)
expect(io).to be_a(::Socket)
expect(io.read).to be == data
end

Expand Down
Loading

0 comments on commit 404771c

Please sign in to comment.