Skip to content

Commit

Permalink
Refactor IO::Syscall as IO::Evented (#7505)
Browse files Browse the repository at this point in the history
Evented is closer to what the module is trying to achieve (abstract
event loop details) than Syscall. Reuses the method namings from
IO::Buffered (e.g. `evented_read` instead of `read_syscall_helper`).

Avoids some duplication found in IO::FileDescriptor (system/unix)
and Socket.

Isolates implementation details back into IO::Evented, instead of
having direct accesses to internal implementation details (such as
`@writers`) of IO::Evented.
  • Loading branch information
ysbaddaden authored and straight-shoota committed Mar 15, 2019
1 parent d0dc06b commit f9fde1a
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 151 deletions.
48 changes: 10 additions & 38 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,60 +22,32 @@ module Crystal::EventLoop
end
end

def self.create_fd_write_event(io : IO::FileDescriptor, edge_triggered : Bool = false)
def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(io.fd, flags, io) do |s, flags, data|
fd_io = data.as(IO::FileDescriptor)
if flags.includes?(LibEvent2::EventFlags::Write)
fd_io.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.resume_write(timed_out: true)
end
end
event
end

def self.create_fd_write_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Write)
sock_ref.resume_write
io_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_write(timed_out: true)
io_ref.resume_write(timed_out: true)
end
end
event
end

def self.create_fd_read_event(io : IO::FileDescriptor, edge_triggered : Bool = false)
def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(io.fd, flags, io) do |s, flags, data|
fd_io = data.as(IO::FileDescriptor)
if flags.includes?(LibEvent2::EventFlags::Read)
fd_io.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.resume_read(timed_out: true)
end
end
event
end

def self.create_fd_read_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Read)
sock_ref.resume_read
io_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_read(timed_out: true)
io_ref.resume_read(timed_out: true)
end
end
event
end

private def self.dns_base
Expand Down
40 changes: 7 additions & 33 deletions src/crystal/system/unix/file_descriptor.cr
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
require "c/fcntl"
require "io/evented"

# :nodoc:
module Crystal::System::FileDescriptor
include IO::Syscall
include IO::Evented

@fd : Int32

@read_event : Crystal::Event?
@write_event : Crystal::Event?

private def unbuffered_read(slice : Bytes)
read_syscall_helper(slice, "Error reading file") do
# `to_i32` is acceptable because `Slice#size` is a Int32
LibC.read(@fd, slice, slice.size).to_i32
evented_read(slice, "Error reading file") do
LibC.read(@fd, slice, slice.size)
end
end

private def unbuffered_write(slice : Bytes)
write_syscall_helper(slice, "Error writing file") do |slice|
evented_write(slice, "Error writing file") do |slice|
LibC.write(@fd, slice, slice.size).tap do |return_code|
if return_code == -1 && Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
Expand Down Expand Up @@ -109,25 +106,7 @@ module Crystal::System::FileDescriptor
# Mark the handle open, since we had to have dup'd a live handle.
@closed = false

# We are now pointing to a new file descriptor, we need to re-register
# events with libevent and enqueue readers and writers again.
@read_event.try &.free
@read_event = nil

@write_event.try &.free
@write_event = nil

reschedule_waiting
end

private def add_read_event(timeout = @read_timeout) : Nil
event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self)
event.add timeout
end

private def add_write_event(timeout = @write_timeout) : Nil
event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self)
event.add timeout
evented_reopen
end

private def system_close
Expand All @@ -140,12 +119,7 @@ module Crystal::System::FileDescriptor
end
end
ensure
@read_event.try &.free
@read_event = nil
@write_event.try &.free
@write_event = nil

reschedule_waiting
evented_close
end

def self.pipe(read_blocking, write_blocking)
Expand Down
79 changes: 54 additions & 25 deletions src/io/syscall.cr → src/io/evented.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% skip_file if flag?(:win32) %}

module IO::Syscall
module IO::Evented
@read_timed_out = false
@write_timed_out = false

Expand All @@ -10,6 +10,9 @@ module IO::Syscall
@readers : Deque(Fiber)?
@writers : Deque(Fiber)?

@read_event : Crystal::Event?
@write_event : Crystal::Event?

# Returns the time to wait when reading before raising an `IO::Timeout`.
def read_timeout : Time::Span?
@read_timeout
Expand Down Expand Up @@ -42,11 +45,12 @@ module IO::Syscall
write_timeout
end

def read_syscall_helper(slice : Bytes, errno_msg : String) : Int32
def evented_read(slice : Bytes, errno_msg : String) : Int32
loop do
bytes_read = yield
bytes_read = yield slice
if bytes_read != -1
return bytes_read
# `to_i32` is acceptable because `Slice#size` is an Int32
return bytes_read.to_i32
end

if Errno.value == Errno::EAGAIN
Expand All @@ -56,12 +60,10 @@ module IO::Syscall
end
end
ensure
if (readers = @readers) && !readers.empty?
add_read_event
end
resume_pending_readers
end

def write_syscall_helper(slice : Bytes, errno_msg : String) : Nil
def evented_write(slice : Bytes, errno_msg : String) : Nil
return if slice.empty?

begin
Expand All @@ -79,12 +81,19 @@ module IO::Syscall
end
end
ensure
if (writers = @writers) && !writers.empty?
add_write_event
end
resume_pending_writers
end
end

def evented_send(slice : Bytes, errno_msg : String) : Int32
bytes_written = yield slice
raise Errno.new(errno_msg) if bytes_written == -1
# `to_i32` is acceptable because `Slice#size` is an Int32
bytes_written.to_i32
ensure
resume_pending_writers
end

# :nodoc:
def resume_read(timed_out = false)
@read_timed_out = timed_out
Expand All @@ -103,13 +112,11 @@ module IO::Syscall
end
end

# :nodoc:
def wait_readable(timeout = @read_timeout)
protected def wait_readable(timeout = @read_timeout)
wait_readable(timeout: timeout) { |err| raise err }
end

# :nodoc:
def wait_readable(timeout = @read_timeout)
protected def wait_readable(timeout = @read_timeout) : Nil
readers = (@readers ||= Deque(Fiber).new)
readers << Fiber.current
add_read_event(timeout)
Expand All @@ -119,19 +126,18 @@ module IO::Syscall
@read_timed_out = false
yield Timeout.new("Read timed out")
end

nil
end

private abstract def add_read_event(timeout = @read_timeout)
private def add_read_event(timeout = @read_timeout) : Nil
event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self)
event.add timeout
end

# :nodoc:
def wait_writable(timeout = @write_timeout)
protected def wait_writable(timeout = @write_timeout)
wait_writable(timeout: timeout) { |err| raise err }
end

# :nodoc:
def wait_writable(timeout = @write_timeout)
protected def wait_writable(timeout = @write_timeout) : Nil
writers = (@writers ||= Deque(Fiber).new)
writers << Fiber.current
add_write_event(timeout)
Expand All @@ -141,13 +147,24 @@ module IO::Syscall
@write_timed_out = false
yield Timeout.new("Write timed out")
end
end

private def add_write_event(timeout = @write_timeout) : Nil
event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self)
event.add timeout
end

nil
def evented_reopen
evented_close
end

private abstract def add_write_event(timeout = @write_timeout)
def evented_close
@read_event.try &.free
@read_event = nil

@write_event.try &.free
@write_event = nil

private def reschedule_waiting
if readers = @readers
Crystal::Scheduler.enqueue readers
readers.clear
Expand All @@ -158,4 +175,16 @@ module IO::Syscall
writers.clear
end
end

private def resume_pending_readers
if (readers = @readers) && !readers.empty?
add_read_event
end
end

private def resume_pending_writers
if (writers = @writers) && !writers.empty?
add_write_event
end
end
end
1 change: 0 additions & 1 deletion src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
require "./syscall"
require "crystal/system/file_descriptor"

# An `IO` over a file descriptor.
Expand Down
Loading

0 comments on commit f9fde1a

Please sign in to comment.