Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Builder framework #171

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions src/driver/accept.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::driver::op::{self, Completable};
use crate::driver::op::{self, Buildable, Completable};
use crate::driver::{Op, SharedFd, Socket};
use std::net::SocketAddr;
use std::{boxed::Box, io};
Expand All @@ -10,27 +10,34 @@ pub(crate) struct Accept {

impl Op<Accept> {
pub(crate) fn accept(fd: &SharedFd) -> io::Result<Op<Accept>> {
use io_uring::{opcode, types};

let socketaddr = Box::new((
unsafe { std::mem::zeroed() },
std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t,
));
Op::submit_with(
Accept {
fd: fd.clone(),
socketaddr,
},
|accept| {
opcode::Accept::new(
types::Fd(accept.fd.raw_fd()),
&mut accept.socketaddr.0 as *mut _ as *mut _,
&mut accept.socketaddr.1,
)
.flags(libc::O_CLOEXEC)
.build()
},
Accept {
fd: fd.clone(),
socketaddr,
}
.submit()
}
}

impl Buildable for Accept
where
Self: 'static + Sized,
{
type CqeType = op::SingleCQE;

fn create_sqe(&mut self) -> io_uring::squeue::Entry {
use io_uring::{opcode, types};

opcode::Accept::new(
types::Fd(self.fd.raw_fd()),
&mut self.socketaddr.0 as *mut _ as *mut _,
&mut self.socketaddr.1,
)
.flags(libc::O_CLOEXEC)
.build()
}
}

Expand Down
18 changes: 13 additions & 5 deletions src/driver/close.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::driver::Op;

use crate::driver::op::{self, Completable};
use crate::driver::op::{self, Buildable, Completable};
use std::io;
use std::os::unix::io::RawFd;

Expand All @@ -10,11 +10,19 @@ pub(crate) struct Close {

impl Op<Close> {
pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> {
use io_uring::{opcode, types};
Close { fd }.submit()
}
}

Op::submit_with(Close { fd }, |close| {
opcode::Close::new(types::Fd(close.fd)).build()
})
impl Buildable for Close
where
Self: 'static + Sized,
{
type CqeType = op::SingleCQE;

fn create_sqe(&mut self) -> io_uring::squeue::Entry {
use io_uring::{opcode, types};
opcode::Close::new(types::Fd(self.fd)).build()
}
}

Expand Down
35 changes: 21 additions & 14 deletions src/driver/connect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::driver::op::{self, Completable};
use crate::driver::op::{self, Buildable, Completable};
use crate::driver::{Op, SharedFd};
use socket2::SockAddr;
use std::io;
Expand All @@ -14,22 +14,29 @@ pub(crate) struct Connect {
impl Op<Connect> {
/// Submit a request to connect.
pub(crate) fn connect(fd: &SharedFd, socket_addr: SockAddr) -> io::Result<Op<Connect>> {
Connect {
fd: fd.clone(),
socket_addr: Box::new(socket_addr),
}
.submit()
}
}

impl Buildable for Connect
where
Self: 'static + Sized,
{
type CqeType = op::SingleCQE;

fn create_sqe(&mut self) -> io_uring::squeue::Entry {
use io_uring::{opcode, types};

Op::submit_with(
Connect {
fd: fd.clone(),
socket_addr: Box::new(socket_addr),
},
|connect| {
opcode::Connect::new(
types::Fd(connect.fd.raw_fd()),
connect.socket_addr.as_ptr(),
connect.socket_addr.len(),
)
.build()
},
opcode::Connect::new(
types::Fd(self.fd.raw_fd()),
self.socket_addr.as_ptr(),
self.socket_addr.len(),
)
.build()
}
}

Expand Down
38 changes: 29 additions & 9 deletions src/driver/fsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,46 @@ use crate::driver::{Op, SharedFd};

use std::io;

use crate::driver::op::{self, Completable};
use crate::driver::op::{self, Buildable, Completable};
use io_uring::{opcode, types};

pub(crate) struct Fsync {
fd: SharedFd,
flags: Option<types::FsyncFlags>,
}

impl Op<Fsync> {
pub(crate) fn fsync(fd: &SharedFd) -> io::Result<Op<Fsync>> {
Op::submit_with(Fsync { fd: fd.clone() }, |fsync| {
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())).build()
})
Fsync {
fd: fd.clone(),
flags: None,
}
.submit()
}

pub(crate) fn datasync(fd: &SharedFd) -> io::Result<Op<Fsync>> {
Op::submit_with(Fsync { fd: fd.clone() }, |fsync| {
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd()))
.flags(types::FsyncFlags::DATASYNC)
.build()
})
Fsync {
fd: fd.clone(),
flags: Some(types::FsyncFlags::DATASYNC),
}
.submit()
}
}

impl Buildable for Fsync
where
Self: 'static + Sized,
{
type CqeType = op::SingleCQE;

fn create_sqe(&mut self) -> io_uring::squeue::Entry {
let opcode = opcode::Fsync::new(types::Fd(self.fd.raw_fd()));

if let Some(flags) = self.flags {
opcode.flags(flags).build()
} else {
opcode.build()
}
}
}

Expand Down
15 changes: 12 additions & 3 deletions src/driver/noop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::driver::{
op::{self, Completable},
op::{self, Buildable, Completable},
Op,
};
use std::io;
Expand All @@ -11,9 +11,18 @@ pub struct NoOp {}

impl Op<NoOp> {
pub fn no_op() -> io::Result<Op<NoOp>> {
use io_uring::opcode;
NoOp {}.submit()
}
}

impl Buildable for NoOp
where
Self: 'static + Sized,
{
type CqeType = op::SingleCQE;

Op::submit_with(NoOp {}, |_| opcode::Nop::new().build())
fn create_sqe(&mut self) -> io_uring::squeue::Entry {
io_uring::opcode::Nop::new().build()
}
}

Expand Down
41 changes: 29 additions & 12 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ mod slab_list;
use slab::Slab;
use slab_list::{SlabListEntry, SlabListIndices};

use crate::driver;
use crate::runtime::CONTEXT;
use crate::util::PhantomUnsendUnsync;

Expand Down Expand Up @@ -88,30 +87,39 @@ where
T: Completable,
{
/// Create a new operation
fn new(data: T, inner: &mut driver::Driver) -> Self {
fn new(data: T, index: usize) -> Self {
Op {
index: inner.ops.insert(),
index,
data: Some(data),
_cqe_type: PhantomData,
_phantom: PhantomData,
}
}
}

pub(crate) trait Buildable
where
Self: 'static + Sized + Completable,
{
// The CqeType type which results from Submission
type CqeType;

fn create_sqe(&mut self) -> squeue::Entry;

/// Submit an operation to uring.
/// Build an Operation, and submit to uring.
///
/// `state` is stored during the operation tracking any state submitted to
/// the kernel.
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Self>
where
F: FnOnce(&mut T) -> squeue::Entry,
{
fn submit(mut self) -> io::Result<Op<Self, <Self as Buildable>::CqeType>> {
CONTEXT.with(|cx| {
cx.with_driver_mut(|driver| {
// Create the operation
let mut op = Op::new(data, driver);
let index = driver.ops.insert();

// Configure the SQE
let sqe = f(op.data.as_mut().unwrap()).user_data(op.index as _);
let sqe = self.create_sqe().user_data(index as _);

// Create the operation
let op = Op::new(self, index);

// Push the new operation
while unsafe { driver.uring.submission().push(&sqe).is_err() } {
Expand All @@ -123,6 +131,12 @@ where
})
})
}

// Submit an operation, linking it to the following entry
// fn link<S: Buildable>(self, s: S) -> Link<Op<Self, <Self as Buildable>::CqeType>, S> {
// let t = self.submit();
// Link {t,s}
// }
}

impl<T> Future for Op<T, SingleCQE>
Expand Down Expand Up @@ -430,7 +444,10 @@ mod test {
let op = CONTEXT.with(|cx| {
cx.set_driver(driver);

cx.with_driver_mut(|driver| Op::new(data.clone(), driver))
cx.with_driver_mut(|driver| {
let index = driver.ops.insert();
Op::new(data.clone(), index)
})
});

(op, data)
Expand Down
42 changes: 29 additions & 13 deletions src/driver/open.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::driver::{self, Op, SharedFd};
use crate::fs::{File, OpenOptions};

use crate::driver::op::{self, Completable};
use crate::driver::op::{self, Buildable, Completable};
use std::ffi::CString;
use std::io;
use std::path::Path;
Expand All @@ -11,29 +11,45 @@ use std::path::Path;
pub(crate) struct Open {
pub(crate) path: CString,
pub(crate) flags: libc::c_int,
options: OpenOptions,
}

impl Op<Open> {
/// Submit a request to open a file.
pub(crate) fn open(path: &Path, options: &OpenOptions) -> io::Result<Op<Open>> {
use io_uring::{opcode, types};
let path = driver::util::cstr(path)?;
let flags = libc::O_CLOEXEC
| options.access_mode()?
| options.creation_mode()?
| (options.custom_flags & !libc::O_ACCMODE);

Op::submit_with(Open { path, flags }, |open| {
// Get a reference to the memory. The string will be held by the
// operation state and will not be accessed again until the operation
// completes.
let p_ref = open.path.as_c_str().as_ptr();

opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref)
.flags(flags)
.mode(options.mode)
.build()
})
Open {
path,
flags,
options: options.clone(),
}
.submit()
}
}

impl Buildable for Open
where
Self: 'static + Sized,
{
type CqeType = op::SingleCQE;

fn create_sqe(&mut self) -> io_uring::squeue::Entry {
use io_uring::{opcode, types};

// Get a reference to the memory. The string will be held by the
// operation state and will not be accessed again until the operation
// completes.
let p_ref = self.path.as_c_str().as_ptr();

opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref)
.flags(self.flags)
.mode(self.options.mode)
.build()
}
}

Expand Down
Loading