Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eio_linux: add support for integration with Async #221

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@ module Effect = Eio.Private.Effect
module Private = struct
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty

module Async = struct
type t = {
lock : unit -> unit;
unlock : unit -> unit;
}
end

type _ Effect.t +=
| Await_readable : Unix.file_descr -> unit Effect.t
| Await_writable : Unix.file_descr -> unit Effect.t
| Get_system_clock : Eio.Time.clock Effect.t
| Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> < Eio.Flow.two_way; Eio.Flow.close > Effect.t
| Set_async_integration : Async.t option -> unit Effect.t
end

let await_readable fd = Effect.perform (Private.Await_readable fd)
Expand Down
9 changes: 9 additions & 0 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,21 @@ module Private : sig
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty
(** See {!FD}. *)

(** For Async_eio integration, we need to give Eio access to Async's lock. *)
module Async : sig
type t = {
lock : unit -> unit;
unlock : unit -> unit;
}
end

type _ Effect.t +=
| Await_readable : Unix.file_descr -> unit Effect.t (** See {!await_readable} *)
| Await_writable : Unix.file_descr -> unit Effect.t (** See {!await_writable} *)
| Get_system_clock : Eio.Time.clock Effect.t (** See {!sleep} *)
| Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr ->
< Eio.Flow.two_way; Eio.Flow.close > Effect.t (** See {!FD.as_socket} *)
| Set_async_integration : Async.t option -> unit Effect.t
end

module Ctf = Ctf_unix
31 changes: 29 additions & 2 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ type t = {

sleep_q: Zzz.t;
mutable io_jobs: int;

(* Optional integration with the Async library. *)
mutable async : Eio_unix.Private.Async.t option;
}

let wake_buffer =
Expand Down Expand Up @@ -476,7 +479,15 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Ctf.(note_hiatus Wait_for_work);
let result = Uring.wait ?timeout uring in
let result =
match st.async with
| None -> Uring.wait ?timeout uring
| Some async ->
async.unlock ();
let r = Uring.wait ?timeout uring in
async.lock ();
r
in
Ctf.note_resume system_thread;
Atomic.set st.need_wakeup false;
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
Expand Down Expand Up @@ -1200,7 +1211,19 @@ let rec run ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout ?fal
let io_q = Queue.create () in
let mem_q = Queue.create () in
let eventfd = FD.placeholder ~seekable:false ~close_unix:false in
let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; io_jobs = 0 } in
let st = {
mem;
uring;
run_q;
eventfd_mutex;
io_q;
mem_q;
eventfd;
need_wakeup = Atomic.make false;
sleep_q;
io_jobs = 0;
async = None;
} in
Log.debug (fun l -> l "starting main thread");
let rec fork ~new_fiber:fiber fn =
let open Effect.Deep in
Expand Down Expand Up @@ -1294,6 +1317,10 @@ let rec run ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout ?fal
let fd = FD.of_unix ~sw ~seekable:false ~close_unix fd in
continue k (flow fd :> < Eio.Flow.two_way; Eio.Flow.close >)
)
| Eio_unix.Private.Set_async_integration async -> Some (fun k ->
st.async <- async;
continue k ()
)
| Low_level.Alloc -> Some (fun k ->
match st.mem with
| None -> continue k None
Expand Down