Skip to content

Commit

Permalink
feat(services/monoiofs): append, create_dir, copy and rename (#5041)
Browse files Browse the repository at this point in the history
feat: append, create_dir, copy and rename
  • Loading branch information
NKID00 authored Aug 24, 2024
1 parent 89179e5 commit c152360
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 11 deletions.
96 changes: 92 additions & 4 deletions core/src/services/monoiofs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use std::path::PathBuf;
use std::sync::Arc;

use chrono::DateTime;
use monoio::fs::OpenOptions;

use super::core::MonoiofsCore;
use super::core::BUFFER_SIZE;
use super::reader::MonoiofsReader;
use super::writer::MonoiofsWriter;
use crate::raw::*;
Expand Down Expand Up @@ -114,7 +116,11 @@ impl Access for MonoiofsBackend {
stat: true,
read: true,
write: true,
write_can_append: true,
delete: true,
rename: true,
create_dir: true,
copy: true,
..Default::default()
});
am.into()
Expand Down Expand Up @@ -150,10 +156,9 @@ impl Access for MonoiofsBackend {
Ok((RpRead::default(), reader))
}

async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
// TODO: create parent directory before write
let path = self.core.prepare_path(path);
let writer = MonoiofsWriter::new(self.core.clone(), path).await?;
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path = self.core.prepare_write_path(path).await?;
let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
Ok((RpWrite::default(), writer))
}

Expand Down Expand Up @@ -186,4 +191,87 @@ impl Access for MonoiofsBackend {
Err(err) => Err(new_std_io_error(err)),
}
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch(move || monoio::fs::rename(from, to))
.await
.map_err(new_std_io_error)?;
Ok(RpRename::default())
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let path = self.core.prepare_path(path);
self.core
.dispatch(move || monoio::fs::create_dir_all(path))
.await
.map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch({
let core = self.core.clone();
move || async move {
let from = OpenOptions::new().read(true).open(from).await?;
let to = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(to)
.await?;

// AsyncReadRent and AsyncWriteRent is not implemented
// for File, so we can't write this:
// monoio::io::copy(&mut from, &mut to).await?;

let mut pos = 0;
// allocate and resize buffer
let mut buf = core.buf_pool.get();
// set capacity of buf to exact size to avoid excessive read
buf.reserve(BUFFER_SIZE);
let _ = buf.split_off(BUFFER_SIZE);

loop {
let result;
(result, buf) = from.read_at(buf, pos).await;
if result? == 0 {
// EOF
break;
}
let result;
(result, buf) = to.write_all_at(buf, pos).await;
result?;
pos += buf.len() as u64;
buf.clear();
}
core.buf_pool.put(buf);
Ok(())
}
})
.await
.map_err(new_std_io_error)?;
Ok(RpCopy::default())
}
}
20 changes: 20 additions & 0 deletions core/src/services/monoiofs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,30 @@ impl MonoiofsCore {
&self.root
}

/// join root and path
pub fn prepare_path(&self, path: &str) -> PathBuf {
self.root.join(path.trim_end_matches('/'))
}

/// join root and path, create parent dirs
pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
let path = self.prepare_path(path);
let parent = path
.parent()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"path should have parent but not, it must be malformed",
)
.with_context("input", path.to_string_lossy())
})?
.to_path_buf();
self.dispatch(move || monoio::fs::create_dir_all(parent))
.await
.map_err(new_std_io_error)?;
Ok(path)
}

/// entrypoint of each worker thread, sets up monoio runtimes and channels
fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
let mut rt = RuntimeBuilder::<FusionDriver>::new()
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/monoiofs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ This service can be used to:
- [x] stat
- [x] read
- [x] write
- [ ] append
- [ ] create_dir
- [x] append
- [x] create_dir
- [x] delete
- [ ] copy
- [ ] rename
- [x] copy
- [x] rename
- [ ] list
- [ ] ~~presign~~
- [ ] blocking
Expand Down
8 changes: 5 additions & 3 deletions core/src/services/monoiofs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct MonoiofsWriter {
}

impl MonoiofsWriter {
pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf) -> Result<Self> {
pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, append: bool) -> Result<Self> {
let (open_result_tx, open_result_rx) = oneshot::channel();
let (tx, rx) = mpsc::unbounded();
core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx))
core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx))
.await;
core.unwrap(open_result_rx.await)?;
Ok(Self { core, tx, pos: 0 })
Expand All @@ -57,13 +57,15 @@ impl MonoiofsWriter {
/// entrypoint of worker task that runs in context of monoio
async fn worker_entrypoint(
path: PathBuf,
append: bool,
mut rx: mpsc::UnboundedReceiver<WriterRequest>,
open_result_tx: oneshot::Sender<Result<()>>,
) {
let result = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.append(append)
.truncate(!append)
.open(path)
.await;
// [`monoio::fs::File`] is non-Send, hence it is kept within
Expand Down

0 comments on commit c152360

Please sign in to comment.