Skip to content

Commit

Permalink
Improved compatibility with async-http/async-io. (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Jan 1, 2024
1 parent 1f78c43 commit 6140678
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 38 deletions.
27 changes: 27 additions & 0 deletions lib/io/connected.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
require 'socket'

class IO
def connected?
!closed?
end
end

class Socket
def connected?
# Is it likely that the socket is still connected?
# May return false positive, but won't return false negative.
return false unless super

# If we can wait for the socket to become readable, we know that the socket may still be open.
result = to_io.recv_nonblock(1, MSG_PEEK, exception: false)

# No data was available - newer Ruby can return nil instead of empty string:
return false if result.nil?

# Either there was some data available, or we can wait to see if there is data avaialble.
return !result.empty? || result == :wait_readable
rescue Errno::ECONNRESET
# This might be thrown by recv_nonblock.
return false
end
end
6 changes: 3 additions & 3 deletions lib/io/endpoint/composite_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ def each(&block)
end
end

def connect(&block)
def connect(wrapper = Wrapper.default, &block)
last_error = nil

@endpoints.each do |endpoint|
begin
return endpoint.connect(&block)
return endpoint.connect(wrapper, &block)
rescue => last_error
end
end

raise last_error
end

def bind(&block)
def bind(wrapper = Wrapper.default, &block)
if block_given?
@endpoints.each do |endpoint|
endpoint.bind(&block)
Expand Down
16 changes: 4 additions & 12 deletions lib/io/endpoint/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,13 @@ def each

# Accept connections from the specified endpoint.
# @param backlog [Integer] the number of connections to listen for.
def accept(backlog: Socket::SOMAXCONN, &block)
bind do |server|
server.listen(backlog) if backlog

while true
socket, address = server.accept

Fiber.schedule do
yield socket, address
end
end
def accept(wrapper = Wrapper.default, *arguments, **options, &block)
bind(wrapper, *arguments, **options) do |server|
wrapper.accept(server, &block)
end
end

# Create an Endpoint instance by URI scheme. The host and port of the URI will be passed to the Endpoint factory method, along with any options.\
# Create an Endpoint instance by URI scheme. The host and port of the URI will be passed to the Endpoint factory method, along with any options.
#
# You should not use untrusted input as it may execute arbitrary code.
#
Expand Down
20 changes: 10 additions & 10 deletions lib/io/endpoint/shared_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false, **opt

# Create a new `SharedEndpoint` by connecting to the given endpoint.
def self.connected(endpoint, close_on_exec: false)
wrapper = endpoint.connect
socket = endpoint.connect

wrapper.close_on_exec = close_on_exec
socket.close_on_exec = close_on_exec

return self.new(endpoint, [wrapper])
return self.new(endpoint, [socket])
end

def initialize(endpoint, sockets, **options)
Expand All @@ -53,18 +53,18 @@ def initialize(endpoint, sockets, **options)

def local_address_endpoint(**options)
endpoints = @sockets.map do |wrapper|
AddressEndpoint.new(wrapper.to_io.local_address)
AddressEndpoint.new(wrapper.to_io.local_address, **options)
end

return CompositeEndpoint.new(endpoints, **options)
return CompositeEndpoint.new(endpoints)
end

def remote_address_endpoint(**options)
endpoints = @sockets.map do |wrapper|
AddressEndpoint.new(wrapper.to_io.remote_address)
AddressEndpoint.new(wrapper.to_io.remote_address, **options)
end

return CompositeEndpoint.new(endpoints, **options)
return CompositeEndpoint.new(endpoints)
end

# Close all the internal sockets.
Expand Down Expand Up @@ -113,9 +113,9 @@ def connect(wrapper = Wrapper.default, &block)
end
end

def accept(**options, &block)
bind do |server|
server.accept(&block)
def accept(wrapper = Wrapper.default, &block)
bind(wrapper) do |server|
wrapper.accept(server, &block)
end
end

Expand Down
12 changes: 6 additions & 6 deletions lib/io/endpoint/ssl_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ def context
# Connect to the underlying endpoint and establish a SSL connection.
# @yield [Socket] the socket which is being connected
# @return [Socket] the connected socket
def bind
def bind(*arguments, **options, &block)
if block_given?
@endpoint.bind do |server|
yield ::OpenSSL::SSL::SSLServer.new(server, context)
@endpoint.bind(*arguments, **options) do |server|
yield ::OpenSSL::SSL::SSLServer.new(server, self.context)
end
else
@endpoint.bind.map do |server|
::OpenSSL::SSL::SSLServer.new(server, context)
@endpoint.bind(*arguments, **options).map do |server|
::OpenSSL::SSL::SSLServer.new(server, self.context)
end
end
end
Expand All @@ -107,7 +107,7 @@ def bind
# @yield [Socket] the socket which is being connected
# @return [Socket] the connected socket
def connect(&block)
socket = ::OpenSSL::SSL::SSLSocket.new(@endpoint.connect, context)
socket = ::OpenSSL::SSL::SSLSocket.new(@endpoint.connect, self.context)

if hostname = self.hostname
socket.hostname = hostname
Expand Down
13 changes: 6 additions & 7 deletions lib/io/endpoint/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def set_buffered(socket, buffered)
# On Darwin, sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is enabled despite this result.
rescue Errno::EOPNOTSUPP
# Some platforms may simply not support the operation.
# Console.logger.warn(self) {"Unable to set sync=#{value}!"}
rescue Errno::ENOPROTOOPT
# It may not be supported by the protocol (e.g. UDP). ¯\_(ツ)_/¯
end
Expand Down Expand Up @@ -131,14 +130,14 @@ def bind(local_address, protocol: 0, **options, &block)
end

# Bind to a local address and accept connections in a loop.
def accept(*arguments, backlog: SOMAXCONN, **options, &block)
bind(*arguments, **options) do |server|
server.listen(backlog) if backlog
def accept(server, timeout: server.timeout, &block)
while true
socket, address = server.accept

socket.timeout = timeout if timeout != false

async do
while true
server.accept(&block)
end
yield socket, address
end
end
end
Expand Down
9 changes: 9 additions & 0 deletions test/io/endpoint.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require 'io/endpoint'

describe IO::Endpoint do
with ".file_descriptor_limit" do
it "has a file descriptor limit" do
expect(IO::Endpoint.file_descriptor_limit).to be_a Integer
end
end
end
10 changes: 10 additions & 0 deletions test/io/endpoint/socket_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
let(:path) {File.join(temporary_directory, "test.ipc")}
let(:internal_endpoint) {IO::Endpoint::UNIXEndpoint.new(path)}

with "#to_s" do
it "can convert to string" do
internal_endpoint.bind do |internal_socket|
endpoint = subject.new(internal_socket)

expect(endpoint.to_s).to be_a(String)
end
end
end

it "can bind to address" do
internal_endpoint.bind do |internal_socket|
endpoint = subject.new(internal_socket)
Expand Down

0 comments on commit 6140678

Please sign in to comment.