Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fs add concurrent write #4817

Merged
merged 1 commit into from
Jul 9, 2024
Merged

feat: fs add concurrent write #4817

merged 1 commit into from
Jul 9, 2024

Conversation

hoslo
Copy link
Contributor

@hoslo hoslo commented Jun 27, 2024

Part #4526

@hoslo hoslo requested a review from Xuanwo as a code owner June 27, 2024 03:51
@github-actions github-actions bot requested review from ClSlaid and Young-Flash June 27, 2024 03:51
@hoslo hoslo force-pushed the fs-concurrent-write branch from 4047e39 to 29de2d8 Compare June 27, 2024 03:55
@hoslo
Copy link
Contributor Author

hoslo commented Jul 4, 2024

@Xuanwo I don't understand why the behavior test is failing. Can you help me look at it?

@hoslo hoslo force-pushed the fs-concurrent-write branch 8 times, most recently from 0c8a26a to acbc5e7 Compare July 8, 2024 08:16
@@ -49,6 +49,12 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static {
offset: u64,
buf: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;

/// close is used to close the underlying storage.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close will close the file instead of the storage.

/// close is used to close the underlying storage.
fn close(&self, offset: u64, buf: Buffer) -> impl Future<Output = Result<()>> + MaybeSend;

/// abort is used to abort the underlying storage.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same.

@@ -148,13 +154,16 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {

if let Some(buffer) = self.cache.clone() {
let offset = self.next_offset;
self.w.write_all_at(offset, buffer).await?;
self.w.close(offset, buffer).await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why close will carry a buffer?


tokio::task::spawn_blocking(move || {
let mut offset = offset;
let mut buf: &[u8] = &buf.to_bytes();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to_bytes will copy the entire buffer to bytes which is slow. Maybe we can use:

while !buf.is_empty() {
    match f.seek_write(buf.chunk(), offset) {
        Ok(n) => {
            buf.advance(n);
            offset += n as u64
        }
        Err(e) => return Err(e).map_err(new_std_io_error),
    }
}

.await;

tokio::task::spawn_blocking(move || {
f.write_all_at(&buf.to_bytes(), offset)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use to_bytes whenever possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what method should i use?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use buf.chunk() and buf.advance(). I think we can implement a write_at under different target so we don't need to remove the write impl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like this?

@hoslo hoslo force-pushed the fs-concurrent-write branch 5 times, most recently from 00964ed to 5ce79e3 Compare July 8, 2024 11:08
@@ -149,12 +155,16 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {
if let Some(buffer) = self.cache.clone() {
let offset = self.next_offset;
self.w.write_all_at(offset, buffer).await?;
self.w.as_ref().close().await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call close no matter there are cache or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

f.seek_write(buf, offset).map_err(new_std_io_error)
}

#[cfg(not(target_os = "windows"))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use cfg(unix) and cfg(windows)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.


#[cfg(target_os = "windows")]
fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
f.seek_write(buf, offset).map_err(new_std_io_error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use use here to avoid leave them too away to find.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

@hoslo hoslo force-pushed the fs-concurrent-write branch from 5ce79e3 to 93d5a6d Compare July 8, 2024 11:25
@hoslo
Copy link
Contributor Author

hoslo commented Jul 9, 2024

@Xuanwo any else problem?

Copy link
Member

@Xuanwo Xuanwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@Xuanwo Xuanwo merged commit b71c924 into main Jul 9, 2024
254 checks passed
@Xuanwo Xuanwo deleted the fs-concurrent-write branch July 9, 2024 03:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants