Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(services/ftp): Impl parse_error instead of From<Error> #3891

Merged
merged 2 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions core/src/services/ftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use suppaftp::ImplAsyncFtpStream;
use suppaftp::Status;
use tokio::sync::OnceCell;

use super::err::parse_error;
use super::lister::FtpLister;
use super::util::FtpReader;
use super::writer::FtpWriter;
Expand Down Expand Up @@ -334,7 +335,7 @@ impl Accessor for FtpBackend {
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
return Err(parse_error(e));
}
}
}
Expand All @@ -351,24 +352,35 @@ impl Accessor for FtpBackend {
let br = args.range();
let r: Box<dyn AsyncRead + Send + Unpin> = match (br.offset(), br.size()) {
(Some(offset), Some(size)) => {
ftp_stream.resume_transfer(offset as usize).await?;
let ds = ftp_stream.retr_as_stream(path).await?.take(size);
ftp_stream
.resume_transfer(offset as usize)
.await
.map_err(parse_error)?;
let ds = ftp_stream
.retr_as_stream(path)
.await
.map_err(parse_error)?
.take(size);
Box::new(ds)
}
(Some(offset), None) => {
ftp_stream.resume_transfer(offset as usize).await?;
let ds = ftp_stream.retr_as_stream(path).await?;
ftp_stream
.resume_transfer(offset as usize)
.await
.map_err(parse_error)?;
let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?;
Box::new(ds)
}
(None, Some(size)) => {
ftp_stream
.resume_transfer((meta.size() as u64 - size) as usize)
.await?;
let ds = ftp_stream.retr_as_stream(path).await?;
.await
.map_err(parse_error)?;
let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?;
Box::new(ds)
}
(None, None) => {
let ds = ftp_stream.retr_as_stream(path).await?;
let ds = ftp_stream.retr_as_stream(path).await.map_err(parse_error)?;
Box::new(ds)
}
};
Expand All @@ -384,6 +396,7 @@ impl Accessor for FtpBackend {
// TODO: we can optimize this by checking dir existence first.
let mut ftp_stream = self.ftp_connect(Operation::Write).await?;
let mut curr_path = String::new();

for path in paths {
curr_path.push_str(path);
match ftp_stream.mkdir(&curr_path).await {
Expand All @@ -394,7 +407,7 @@ impl Accessor for FtpBackend {
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
return Err(parse_error(e));
}
}
}
Expand Down Expand Up @@ -439,7 +452,7 @@ impl Accessor for FtpBackend {
}))
| Ok(_) => (),
Err(e) => {
return Err(e.into());
return Err(parse_error(e));
}
}

Expand All @@ -450,7 +463,7 @@ impl Accessor for FtpBackend {
let mut ftp_stream = self.ftp_connect(Operation::List).await?;

let pathname = if path == "/" { None } else { Some(path) };
let files = ftp_stream.list(pathname).await?;
let files = ftp_stream.list(pathname).await.map_err(parse_error)?;

Ok((
RpList::default(),
Expand All @@ -475,10 +488,11 @@ impl FtpBackend {
})
.await
})
.await?;
.await
.map_err(parse_error)?;

pool.get_owned().await.map_err(|err| match err {
RunError::User(err) => err.into(),
RunError::User(err) => parse_error(err),
RunError::TimedOut => {
Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
}
Expand All @@ -492,7 +506,7 @@ impl FtpBackend {

let pathname = if parent == "/" { None } else { Some(parent) };

let resp = ftp_stream.list(pathname).await?;
let resp = ftp_stream.list(pathname).await.map_err(parse_error)?;

// Get stat of file.
let mut files = resp
Expand Down
42 changes: 20 additions & 22 deletions core/src/services/ftp/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,27 @@ use suppaftp::Status;
use crate::Error;
use crate::ErrorKind;

impl From<FtpError> for Error {
fn from(e: FtpError) -> Self {
let (kind, retryable) = match e {
// Allow retry for error
//
// `{ status: NotAvailable, body: "421 There are too many connections from your internet address." }`
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::NotAvailable => {
(ErrorKind::Unexpected, true)
}
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::FileUnavailable => {
(ErrorKind::NotFound, false)
}
// Allow retry bad response.
FtpError::BadResponse => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

let mut err = Error::new(kind, "ftp error").set_source(e);

if retryable {
err = err.set_temporary();
pub fn parse_error(err: FtpError) -> Error {
let (kind, retryable) = match err {
// Allow retry for error
//
// `{ status: NotAvailable, body: "421 There are too many connections from your internet address." }`
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::NotAvailable => {
(ErrorKind::Unexpected, true)
}
FtpError::UnexpectedResponse(ref resp) if resp.status == Status::FileUnavailable => {
(ErrorKind::NotFound, false)
}
// Allow retry bad response.
FtpError::BadResponse => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

err
let mut err = Error::new(kind, "ftp error").set_source(err);

if retryable {
err = err.set_temporary();
}

err
}
4 changes: 3 additions & 1 deletion core/src/services/ftp/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use suppaftp::Status;

use super::backend::Manager;
use crate::raw::*;
use crate::services::ftp::err::parse_error;
use crate::*;

/// Wrapper for ftp data stream and command stream.
Expand Down Expand Up @@ -75,7 +76,8 @@ impl oio::Read for FtpReader {
Status::ClosingDataConnection,
Status::RequestedFileActionOk,
])
.await?;
.await
.map_err(parse_error)?;

Ok(())
};
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/ftp/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use futures::AsyncWriteExt;
use super::backend::FtpBackend;
use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::services::ftp::err::parse_error;
use crate::*;

pub type FtpWriters = oio::OneShotWriter<FtpWriter>;
Expand Down Expand Up @@ -53,12 +54,18 @@ impl oio::OneShotWrite for FtpWriter {
let bs = bs.bytes(size);

let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
let mut data_stream = ftp_stream
.append_with_stream(&self.path)
.await
.map_err(parse_error)?;
data_stream.write_all(&bs).await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
})?;

ftp_stream.finalize_put_stream(data_stream).await?;
ftp_stream
.finalize_put_stream(data_stream)
.await
.map_err(parse_error)?;
Ok(())
}
}
Loading