Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/monoiofs): append, create_dir, copy and rename #5041

Merged
merged 1 commit into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions core/src/services/monoiofs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use std::path::PathBuf;
use std::sync::Arc;

use chrono::DateTime;
use monoio::fs::OpenOptions;

use super::core::MonoiofsCore;
use super::core::BUFFER_SIZE;
use super::reader::MonoiofsReader;
use super::writer::MonoiofsWriter;
use crate::raw::*;
Expand Down Expand Up @@ -114,7 +116,11 @@ impl Access for MonoiofsBackend {
stat: true,
read: true,
write: true,
write_can_append: true,
delete: true,
rename: true,
create_dir: true,
copy: true,
..Default::default()
});
am.into()
Expand Down Expand Up @@ -150,10 +156,9 @@ impl Access for MonoiofsBackend {
Ok((RpRead::default(), reader))
}

async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
// TODO: create parent directory before write
let path = self.core.prepare_path(path);
let writer = MonoiofsWriter::new(self.core.clone(), path).await?;
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path = self.core.prepare_write_path(path).await?;
let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
Ok((RpWrite::default(), writer))
}

Expand Down Expand Up @@ -186,4 +191,87 @@ impl Access for MonoiofsBackend {
Err(err) => Err(new_std_io_error(err)),
}
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch(move || monoio::fs::rename(from, to))
.await
.map_err(new_std_io_error)?;
Ok(RpRename::default())
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let path = self.core.prepare_path(path);
self.core
.dispatch(move || monoio::fs::create_dir_all(path))
.await
.map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch({
let core = self.core.clone();
move || async move {
let from = OpenOptions::new().read(true).open(from).await?;
let to = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(to)
.await?;

// AsyncReadRent and AsyncWriteRent is not implemented
// for File, so we can't write this:
// monoio::io::copy(&mut from, &mut to).await?;

let mut pos = 0;
// allocate and resize buffer
let mut buf = core.buf_pool.get();
// set capacity of buf to exact size to avoid excessive read
buf.reserve(BUFFER_SIZE);
let _ = buf.split_off(BUFFER_SIZE);

loop {
let result;
(result, buf) = from.read_at(buf, pos).await;
if result? == 0 {
// EOF
break;
}
let result;
(result, buf) = to.write_all_at(buf, pos).await;
result?;
pos += buf.len() as u64;
buf.clear();
}
core.buf_pool.put(buf);
Ok(())
}
})
.await
.map_err(new_std_io_error)?;
Ok(RpCopy::default())
}
}
20 changes: 20 additions & 0 deletions core/src/services/monoiofs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,30 @@ impl MonoiofsCore {
&self.root
}

/// join root and path
pub fn prepare_path(&self, path: &str) -> PathBuf {
self.root.join(path.trim_end_matches('/'))
}

/// join root and path, create parent dirs
pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
let path = self.prepare_path(path);
let parent = path
.parent()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"path should have parent but not, it must be malformed",
)
.with_context("input", path.to_string_lossy())
})?
.to_path_buf();
self.dispatch(move || monoio::fs::create_dir_all(parent))
.await
.map_err(new_std_io_error)?;
Ok(path)
}

/// entrypoint of each worker thread, sets up monoio runtimes and channels
fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
let mut rt = RuntimeBuilder::<FusionDriver>::new()
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/monoiofs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ This service can be used to:
- [x] stat
- [x] read
- [x] write
- [ ] append
- [ ] create_dir
- [x] append
- [x] create_dir
- [x] delete
- [ ] copy
- [ ] rename
- [x] copy
- [x] rename
- [ ] list
- [ ] ~~presign~~
- [ ] blocking
Expand Down
8 changes: 5 additions & 3 deletions core/src/services/monoiofs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct MonoiofsWriter {
}

impl MonoiofsWriter {
pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf) -> Result<Self> {
pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, append: bool) -> Result<Self> {
let (open_result_tx, open_result_rx) = oneshot::channel();
let (tx, rx) = mpsc::unbounded();
core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx))
core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx))
.await;
core.unwrap(open_result_rx.await)?;
Ok(Self { core, tx, pos: 0 })
Expand All @@ -57,13 +57,15 @@ impl MonoiofsWriter {
/// entrypoint of worker task that runs in context of monoio
async fn worker_entrypoint(
path: PathBuf,
append: bool,
mut rx: mpsc::UnboundedReceiver<WriterRequest>,
open_result_tx: oneshot::Sender<Result<()>>,
) {
let result = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.append(append)
.truncate(!append)
.open(path)
.await;
// [`monoio::fs::File`] is non-Send, hence it is kept within
Expand Down
Loading