diff --git a/src/driver/accept.rs b/src/driver/accept.rs index 0dc2d505..8f5db5e3 100644 --- a/src/driver/accept.rs +++ b/src/driver/accept.rs @@ -1,4 +1,4 @@ -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use crate::driver::{Op, SharedFd, Socket}; use std::net::SocketAddr; use std::{boxed::Box, io}; @@ -10,27 +10,34 @@ pub(crate) struct Accept { impl Op { pub(crate) fn accept(fd: &SharedFd) -> io::Result> { - use io_uring::{opcode, types}; - let socketaddr = Box::new(( unsafe { std::mem::zeroed() }, std::mem::size_of::() as libc::socklen_t, )); - Op::submit_with( - Accept { - fd: fd.clone(), - socketaddr, - }, - |accept| { - opcode::Accept::new( - types::Fd(accept.fd.raw_fd()), - &mut accept.socketaddr.0 as *mut _ as *mut _, - &mut accept.socketaddr.1, - ) - .flags(libc::O_CLOEXEC) - .build() - }, + Accept { + fd: fd.clone(), + socketaddr, + } + .submit() + } +} + +impl Buildable for Accept +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + opcode::Accept::new( + types::Fd(self.fd.raw_fd()), + &mut self.socketaddr.0 as *mut _ as *mut _, + &mut self.socketaddr.1, ) + .flags(libc::O_CLOEXEC) + .build() } } diff --git a/src/driver/close.rs b/src/driver/close.rs index 8b942c2a..9f772023 100644 --- a/src/driver/close.rs +++ b/src/driver/close.rs @@ -1,6 +1,6 @@ use crate::driver::Op; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use std::io; use std::os::unix::io::RawFd; @@ -10,11 +10,19 @@ pub(crate) struct Close { impl Op { pub(crate) fn close(fd: RawFd) -> io::Result> { - use io_uring::{opcode, types}; + Close { fd }.submit() + } +} - Op::submit_with(Close { fd }, |close| { - opcode::Close::new(types::Fd(close.fd)).build() - }) +impl Buildable for Close +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + opcode::Close::new(types::Fd(self.fd)).build() } } diff --git a/src/driver/connect.rs b/src/driver/connect.rs index 07270c09..d16d586d 100644 --- a/src/driver/connect.rs +++ b/src/driver/connect.rs @@ -1,4 +1,4 @@ -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use crate::driver::{Op, SharedFd}; use socket2::SockAddr; use std::io; @@ -14,22 +14,29 @@ pub(crate) struct Connect { impl Op { /// Submit a request to connect. pub(crate) fn connect(fd: &SharedFd, socket_addr: SockAddr) -> io::Result> { + Connect { + fd: fd.clone(), + socket_addr: Box::new(socket_addr), + } + .submit() + } +} + +impl Buildable for Connect +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { use io_uring::{opcode, types}; - Op::submit_with( - Connect { - fd: fd.clone(), - socket_addr: Box::new(socket_addr), - }, - |connect| { - opcode::Connect::new( - types::Fd(connect.fd.raw_fd()), - connect.socket_addr.as_ptr(), - connect.socket_addr.len(), - ) - .build() - }, + opcode::Connect::new( + types::Fd(self.fd.raw_fd()), + self.socket_addr.as_ptr(), + self.socket_addr.len(), ) + .build() } } diff --git a/src/driver/fsync.rs b/src/driver/fsync.rs index 4e0faf80..5485d5d4 100644 --- a/src/driver/fsync.rs +++ b/src/driver/fsync.rs @@ -2,26 +2,46 @@ use crate::driver::{Op, SharedFd}; use std::io; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use io_uring::{opcode, types}; pub(crate) struct Fsync { fd: SharedFd, + flags: Option, } impl Op { pub(crate) fn fsync(fd: &SharedFd) -> io::Result> { - Op::submit_with(Fsync { fd: fd.clone() }, |fsync| { - opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())).build() - }) + Fsync { + fd: fd.clone(), + flags: None, + } + .submit() } pub(crate) fn datasync(fd: &SharedFd) -> io::Result> { - Op::submit_with(Fsync { fd: fd.clone() }, |fsync| { - opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())) - .flags(types::FsyncFlags::DATASYNC) - .build() - }) + Fsync { + fd: fd.clone(), + flags: Some(types::FsyncFlags::DATASYNC), + } + .submit() + } +} + +impl Buildable for Fsync +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + let opcode = opcode::Fsync::new(types::Fd(self.fd.raw_fd())); + + if let Some(flags) = self.flags { + opcode.flags(flags).build() + } else { + opcode.build() + } } } diff --git a/src/driver/noop.rs b/src/driver/noop.rs index 0d1cdb98..a088e234 100644 --- a/src/driver/noop.rs +++ b/src/driver/noop.rs @@ -1,5 +1,5 @@ use crate::driver::{ - op::{self, Completable}, + op::{self, Buildable, Completable}, Op, }; use std::io; @@ -11,9 +11,18 @@ pub struct NoOp {} impl Op { pub fn no_op() -> io::Result> { - use io_uring::opcode; + NoOp {}.submit() + } +} + +impl Buildable for NoOp +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; - Op::submit_with(NoOp {}, |_| opcode::Nop::new().build()) + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + io_uring::opcode::Nop::new().build() } } diff --git a/src/driver/op.rs b/src/driver/op.rs index 24603c81..97892598 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -11,7 +11,6 @@ mod slab_list; use slab::Slab; use slab_list::{SlabListEntry, SlabListIndices}; -use crate::driver; use crate::runtime::CONTEXT; use crate::util::PhantomUnsendUnsync; @@ -88,30 +87,39 @@ where T: Completable, { /// Create a new operation - fn new(data: T, inner: &mut driver::Driver) -> Self { + fn new(data: T, index: usize) -> Self { Op { - index: inner.ops.insert(), + index, data: Some(data), _cqe_type: PhantomData, _phantom: PhantomData, } } +} + +pub(crate) trait Buildable +where + Self: 'static + Sized + Completable, +{ + // The CqeType type which results from Submission + type CqeType; + + fn create_sqe(&mut self) -> squeue::Entry; - /// Submit an operation to uring. + /// Build an Operation, and submit to uring. /// /// `state` is stored during the operation tracking any state submitted to /// the kernel. - pub(super) fn submit_with(data: T, f: F) -> io::Result - where - F: FnOnce(&mut T) -> squeue::Entry, - { + fn submit(mut self) -> io::Result::CqeType>> { CONTEXT.with(|cx| { cx.with_driver_mut(|driver| { - // Create the operation - let mut op = Op::new(data, driver); + let index = driver.ops.insert(); // Configure the SQE - let sqe = f(op.data.as_mut().unwrap()).user_data(op.index as _); + let sqe = self.create_sqe().user_data(index as _); + + // Create the operation + let op = Op::new(self, index); // Push the new operation while unsafe { driver.uring.submission().push(&sqe).is_err() } { @@ -123,6 +131,12 @@ where }) }) } + + // Submit an operation, linking it to the following entry + // fn link(self, s: S) -> Link::CqeType>, S> { + // let t = self.submit(); + // Link {t,s} + // } } impl Future for Op @@ -430,7 +444,10 @@ mod test { let op = CONTEXT.with(|cx| { cx.set_driver(driver); - cx.with_driver_mut(|driver| Op::new(data.clone(), driver)) + cx.with_driver_mut(|driver| { + let index = driver.ops.insert(); + Op::new(data.clone(), index) + }) }); (op, data) diff --git a/src/driver/open.rs b/src/driver/open.rs index 0d4cc8db..fdb31c6b 100644 --- a/src/driver/open.rs +++ b/src/driver/open.rs @@ -1,7 +1,7 @@ use crate::driver::{self, Op, SharedFd}; use crate::fs::{File, OpenOptions}; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -11,29 +11,45 @@ use std::path::Path; pub(crate) struct Open { pub(crate) path: CString, pub(crate) flags: libc::c_int, + options: OpenOptions, } impl Op { /// Submit a request to open a file. pub(crate) fn open(path: &Path, options: &OpenOptions) -> io::Result> { - use io_uring::{opcode, types}; let path = driver::util::cstr(path)?; let flags = libc::O_CLOEXEC | options.access_mode()? | options.creation_mode()? | (options.custom_flags & !libc::O_ACCMODE); - Op::submit_with(Open { path, flags }, |open| { - // Get a reference to the memory. The string will be held by the - // operation state and will not be accessed again until the operation - // completes. - let p_ref = open.path.as_c_str().as_ptr(); - - opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref) - .flags(flags) - .mode(options.mode) - .build() - }) + Open { + path, + flags, + options: options.clone(), + } + .submit() + } +} + +impl Buildable for Open +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + // Get a reference to the memory. The string will be held by the + // operation state and will not be accessed again until the operation + // completes. + let p_ref = self.path.as_c_str().as_ptr(); + + opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref) + .flags(self.flags) + .mode(self.options.mode) + .build() } } diff --git a/src/driver/read.rs b/src/driver/read.rs index 0d2e7887..64333bcf 100644 --- a/src/driver/read.rs +++ b/src/driver/read.rs @@ -2,7 +2,7 @@ use crate::buf::IoBufMut; use crate::driver::{Op, SharedFd}; use crate::BufResult; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use std::io; pub(crate) struct Read { @@ -13,26 +13,36 @@ pub(crate) struct Read { /// Reference to the in-flight buffer. pub(crate) buf: T, + + offset: u64, } impl Op> { pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { + Read { + fd: fd.clone(), + buf, + offset, + } + .submit() + } +} + +impl Buildable for Read +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { use io_uring::{opcode, types}; - Op::submit_with( - Read { - fd: fd.clone(), - buf, - }, - |read| { - // Get raw buffer info - let ptr = read.buf.stable_mut_ptr(); - let len = read.buf.bytes_total(); - opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build() - }, - ) + // Get raw buffer info + let ptr = self.buf.stable_mut_ptr(); + let len = self.buf.bytes_total(); + opcode::Read::new(types::Fd(self.fd.raw_fd()), ptr, len as _) + .offset(self.offset as _) + .build() } } diff --git a/src/driver/readv.rs b/src/driver/readv.rs index 3921724c..0b60c7fc 100644 --- a/src/driver/readv.rs +++ b/src/driver/readv.rs @@ -2,7 +2,7 @@ use crate::buf::IoBufMut; use crate::driver::{Op, SharedFd}; use crate::BufResult; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use libc::iovec; use std::io; @@ -16,6 +16,8 @@ pub(crate) struct Readv { pub(crate) bufs: Vec, /// Parameter for `io_uring::op::readv`, referring `bufs`. iovs: Vec, + + offset: u64, } impl Op> { @@ -24,8 +26,6 @@ impl Op> { mut bufs: Vec, offset: u64, ) -> io::Result>> { - use io_uring::{opcode, types}; - // Build `iovec` objects referring the provided `bufs` for `io_uring::opcode::Readv`. let iovs: Vec = bufs .iter_mut() @@ -36,22 +36,32 @@ impl Op> { }) .collect(); - Op::submit_with( - Readv { - fd: fd.clone(), - bufs, - iovs, - }, - |read| { - opcode::Readv::new( - types::Fd(fd.raw_fd()), - read.iovs.as_ptr(), - read.iovs.len() as u32, - ) - .offset(offset as _) - .build() - }, + Readv { + fd: fd.clone(), + bufs, + iovs, + offset, + } + .submit() + } +} + +impl Buildable for Readv +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + opcode::Readv::new( + types::Fd(self.fd.raw_fd()), + self.iovs.as_ptr(), + self.iovs.len() as u32, ) + .offset(self.offset as _) + .build() } } diff --git a/src/driver/recv_from.rs b/src/driver/recv_from.rs index 5f2aee02..ffe89085 100644 --- a/src/driver/recv_from.rs +++ b/src/driver/recv_from.rs @@ -1,4 +1,4 @@ -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use crate::{ buf::IoBufMut, driver::{Op, SharedFd}, @@ -21,8 +21,6 @@ pub(crate) struct RecvFrom { impl Op> { pub(crate) fn recv_from(fd: &SharedFd, mut buf: T) -> io::Result>> { - use io_uring::{opcode, types}; - let mut io_slices = vec![IoSliceMut::new(unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) })]; @@ -35,22 +33,27 @@ impl Op> { msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; msghdr.msg_namelen = socket_addr.len(); - Op::submit_with( - RecvFrom { - fd: fd.clone(), - buf, - io_slices, - socket_addr, - msghdr, - }, - |recv_from| { - opcode::RecvMsg::new( - types::Fd(recv_from.fd.raw_fd()), - recv_from.msghdr.as_mut() as *mut _, - ) - .build() - }, - ) + RecvFrom { + fd: fd.clone(), + buf, + io_slices, + socket_addr, + msghdr, + } + .submit() + } +} + +impl Buildable for RecvFrom +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), self.msghdr.as_mut() as *mut _).build() } } diff --git a/src/driver/rename_at.rs b/src/driver/rename_at.rs index c7ddfc8e..5ff27438 100644 --- a/src/driver/rename_at.rs +++ b/src/driver/rename_at.rs @@ -1,6 +1,6 @@ use crate::driver::{self, Op}; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -12,32 +12,42 @@ use std::path::Path; pub(crate) struct RenameAt { pub(crate) from: CString, pub(crate) to: CString, + flags: u32, } impl Op { /// Submit a request to rename a specified path to a new name with /// the provided flags. pub(crate) fn rename_at(from: &Path, to: &Path, flags: u32) -> io::Result> { - use io_uring::{opcode, types}; - let from = driver::util::cstr(from)?; let to = driver::util::cstr(to)?; - Op::submit_with(RenameAt { from, to }, |rename| { - // Get a reference to the memory. The string will be held by the - // operation state and will not be accessed again until the operation - // completes. - let from_ref = rename.from.as_c_str().as_ptr(); - let to_ref = rename.to.as_c_str().as_ptr(); - opcode::RenameAt::new( - types::Fd(libc::AT_FDCWD), - from_ref, - types::Fd(libc::AT_FDCWD), - to_ref, - ) - .flags(flags) - .build() - }) + RenameAt { from, to, flags }.submit() + } +} + +impl Buildable for RenameAt +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + // Get a reference to the memory. The string will be held by the + // operation state and will not be accessed again until the operation + // completes. + let from_ref = self.from.as_c_str().as_ptr(); + let to_ref = self.to.as_c_str().as_ptr(); + opcode::RenameAt::new( + types::Fd(libc::AT_FDCWD), + from_ref, + types::Fd(libc::AT_FDCWD), + to_ref, + ) + .flags(self.flags) + .build() } } diff --git a/src/driver/send_to.rs b/src/driver/send_to.rs index 3659942f..f080593c 100644 --- a/src/driver/send_to.rs +++ b/src/driver/send_to.rs @@ -1,5 +1,5 @@ use crate::buf::IoBuf; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use crate::driver::{Op, SharedFd}; use crate::BufResult; use socket2::SockAddr; @@ -23,8 +23,6 @@ impl Op> { buf: T, socket_addr: SocketAddr, ) -> io::Result>> { - use io_uring::{opcode, types}; - let io_slices = vec![IoSlice::new(unsafe { std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init()) })]; @@ -37,22 +35,31 @@ impl Op> { msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; msghdr.msg_namelen = socket_addr.len(); - Op::submit_with( - SendTo { - fd: fd.clone(), - buf, - io_slices, - socket_addr, - msghdr, - }, - |send_to| { - opcode::SendMsg::new( - types::Fd(send_to.fd.raw_fd()), - send_to.msghdr.as_ref() as *const _, - ) - .build() - }, + SendTo { + fd: fd.clone(), + buf, + io_slices, + socket_addr, + msghdr, + } + .submit() + } +} + +impl op::Buildable for SendTo +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + opcode::SendMsg::new( + types::Fd(self.fd.raw_fd()), + self.msghdr.as_ref() as *const _, ) + .build() } } diff --git a/src/driver/unlink_at.rs b/src/driver/unlink_at.rs index fbebfb67..7c1511c9 100644 --- a/src/driver/unlink_at.rs +++ b/src/driver/unlink_at.rs @@ -1,6 +1,6 @@ use crate::driver::{self, Op}; -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -8,6 +8,8 @@ use std::path::Path; /// Unlink a path relative to the current working directory of the caller's process. pub(crate) struct Unlink { pub(crate) path: CString, + + flags: i32, } impl Op { @@ -23,19 +25,28 @@ impl Op { /// Submit a request to unlink a specifed path with provided flags. pub(crate) fn unlink(path: &Path, flags: i32) -> io::Result> { - use io_uring::{opcode, types}; - let path = driver::util::cstr(path)?; - Op::submit_with(Unlink { path }, |unlink| { - // Get a reference to the memory. The string will be held by the - // operation state and will not be accessed again until the operation - // completes. - let p_ref = unlink.path.as_c_str().as_ptr(); - opcode::UnlinkAt::new(types::Fd(libc::AT_FDCWD), p_ref) - .flags(flags) - .build() - }) + Unlink { path, flags }.submit() + } +} + +impl op::Buildable for Unlink +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + // Get a reference to the memory. The string will be held by the + // operation state and will not be accessed again until the operation + // completes. + let p_ref = self.path.as_c_str().as_ptr(); + opcode::UnlinkAt::new(types::Fd(libc::AT_FDCWD), p_ref) + .flags(self.flags) + .build() } } diff --git a/src/driver/write.rs b/src/driver/write.rs index 6a5b708a..b080ba62 100644 --- a/src/driver/write.rs +++ b/src/driver/write.rs @@ -1,4 +1,4 @@ -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use crate::{ buf::IoBuf, driver::{Op, SharedFd}, @@ -13,27 +13,37 @@ pub(crate) struct Write { fd: SharedFd, pub(crate) buf: T, + + offset: u64, } impl Op> { pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { + Write { + fd: fd.clone(), + buf, + offset, + } + .submit() + } +} + +impl op::Buildable for Write +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { use io_uring::{opcode, types}; - Op::submit_with( - Write { - fd: fd.clone(), - buf, - }, - |write| { - // Get raw buffer info - let ptr = write.buf.stable_ptr(); - let len = write.buf.bytes_init(); - - opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build() - }, - ) + // Get raw buffer info + let ptr = self.buf.stable_ptr(); + let len = self.buf.bytes_init(); + + opcode::Write::new(types::Fd(self.fd.raw_fd()), ptr, len as _) + .offset(self.offset as _) + .build() } } diff --git a/src/driver/writev.rs b/src/driver/writev.rs index 24954a02..115e8632 100644 --- a/src/driver/writev.rs +++ b/src/driver/writev.rs @@ -1,4 +1,4 @@ -use crate::driver::op::{self, Completable}; +use crate::driver::op::{self, Buildable, Completable}; use crate::{ buf::IoBuf, driver::{Op, SharedFd}, @@ -17,6 +17,8 @@ pub(crate) struct Writev { /// Parameter for `io_uring::op::readv`, referring `bufs`. iovs: Vec, + + offset: u64, } impl Op> { @@ -25,8 +27,6 @@ impl Op> { mut bufs: Vec, offset: u64, ) -> io::Result>> { - use io_uring::{opcode, types}; - // Build `iovec` objects referring the provided `bufs` for `io_uring::opcode::Readv`. let iovs: Vec = bufs .iter_mut() @@ -36,22 +36,32 @@ impl Op> { }) .collect(); - Op::submit_with( - Writev { - fd: fd.clone(), - bufs, - iovs, - }, - |write| { - opcode::Writev::new( - types::Fd(fd.raw_fd()), - write.iovs.as_ptr(), - write.iovs.len() as u32, - ) - .offset(offset as _) - .build() - }, + Writev { + fd: fd.clone(), + bufs, + iovs, + offset, + } + .submit() + } +} + +impl Buildable for Writev +where + Self: 'static + Sized, +{ + type CqeType = op::SingleCQE; + + fn create_sqe(&mut self) -> io_uring::squeue::Entry { + use io_uring::{opcode, types}; + + opcode::Writev::new( + types::Fd(self.fd.raw_fd()), + self.iovs.as_ptr(), + self.iovs.len() as u32, ) + .offset(self.offset as _) + .build() } }