Skip to content

Commit

Permalink
refactor(services/sftp): Impl parse_error instead of From<Error> (#3914)
Browse files Browse the repository at this point in the history
  • Loading branch information
G-XD authored Jan 4, 2024
1 parent 45270f8 commit dd2e68f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 66 deletions.
50 changes: 30 additions & 20 deletions core/src/services/sftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use serde::Deserialize;

use super::error::is_not_found;
use super::error::is_sftp_protocol_error;
use super::error::parse_sftp_error;
use super::error::parse_ssh_error;
use super::lister::SftpLister;
use super::writer::SftpWriter;
use crate::raw::*;
Expand Down Expand Up @@ -291,7 +293,7 @@ impl Accessor for SftpBackend {
if let Err(e) = res {
// ignore error if dir already exists
if !is_sftp_protocol_error(&e) {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
fs.set_cwd(&current);
Expand All @@ -305,7 +307,7 @@ impl Accessor for SftpBackend {
let mut fs = client.fs();
fs.set_cwd(&self.root);

let meta: Metadata = fs.metadata(path).await?.into();
let meta: Metadata = fs.metadata(path).await.map_err(parse_sftp_error)?.into();

Ok(RpStat::new(meta))
}
Expand All @@ -315,9 +317,12 @@ impl Accessor for SftpBackend {

let mut fs = client.fs();
fs.set_cwd(&self.root);
let path = fs.canonicalize(path).await?;
let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;

let f = client.open(path.as_path()).await?;
let f = client
.open(path.as_path())
.await
.map_err(parse_sftp_error)?;

// Sorry for the ugly code...
//
Expand All @@ -339,7 +344,7 @@ impl Accessor for SftpBackend {

let mut fs = client.fs();
fs.set_cwd(&self.root);
let path = fs.canonicalize(path).await?;
let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;

let mut option = client.options();
option.create(true);
Expand All @@ -349,7 +354,7 @@ impl Accessor for SftpBackend {
option.write(true);
}

let file = option.open(path).await?;
let file = option.open(path).await.map_err(parse_sftp_error)?;

Ok((RpWrite::new(), SftpWriter::new(file)))
}
Expand All @@ -368,15 +373,15 @@ impl Accessor for SftpBackend {
if is_not_found(&e) {
return Ok(RpDelete::default());
} else {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
}
.read_dir()
.boxed();

while let Some(file) = dir.next().await {
let file = file?;
let file = file.map_err(parse_sftp_error)?;
let file_name = file.filename().to_str();
if file_name == Some(".") || file_name == Some("..") {
continue;
Expand All @@ -394,14 +399,14 @@ impl Accessor for SftpBackend {

match fs.remove_dir(path).await {
Err(e) if !is_not_found(&e) => {
return Err(e.into());
return Err(parse_sftp_error(e));
}
_ => {}
}
} else {
match fs.remove_file(path).await {
Err(e) if !is_not_found(&e) => {
return Err(e.into());
return Err(parse_sftp_error(e));
}
_ => {}
}
Expand All @@ -423,7 +428,7 @@ impl Accessor for SftpBackend {
if is_not_found(&e) {
return Ok((RpList::default(), None));
} else {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
}
Expand All @@ -445,12 +450,15 @@ impl Accessor for SftpBackend {
self.create_dir(dir, OpCreateDir::default()).await?;
}

let src = fs.canonicalize(from).await?;
let dst = fs.canonicalize(to).await?;
let mut src_file = client.open(&src).await?;
let mut dst_file = client.create(dst).await?;
let src = fs.canonicalize(from).await.map_err(parse_sftp_error)?;
let dst = fs.canonicalize(to).await.map_err(parse_sftp_error)?;
let mut src_file = client.open(&src).await.map_err(parse_sftp_error)?;
let mut dst_file = client.create(dst).await.map_err(parse_sftp_error)?;

src_file.copy_all_to(&mut dst_file).await?;
src_file
.copy_all_to(&mut dst_file)
.await
.map_err(parse_sftp_error)?;

Ok(RpCopy::default())
}
Expand All @@ -464,7 +472,7 @@ impl Accessor for SftpBackend {
if let Some((dir, _)) = to.rsplit_once('/') {
self.create_dir(dir, OpCreateDir::default()).await?;
}
fs.rename(from, to).await?;
fs.rename(from, to).await.map_err(parse_sftp_error)?;

Ok(RpRename::default())
}
Expand Down Expand Up @@ -517,9 +525,11 @@ async fn connect_sftp(

session.known_hosts_check(known_hosts_strategy);

let session = session.connect(&endpoint).await?;
let session = session.connect(&endpoint).await.map_err(parse_ssh_error)?;

let sftp = Sftp::from_session(session, SftpOptions::default()).await?;
let sftp = Sftp::from_session(session, SftpOptions::default())
.await
.map_err(parse_sftp_error)?;

if !root.is_empty() {
let mut fs = sftp.fs();
Expand All @@ -533,7 +543,7 @@ async fn connect_sftp(
if let Err(e) = res {
// ignore error if dir already exists
if !is_sftp_protocol_error(&e) {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
fs.set_cwd(&current);
Expand Down
58 changes: 13 additions & 45 deletions core/src/services/sftp/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,55 +22,23 @@ use openssh_sftp_client::Error as SftpClientError;
use crate::Error;
use crate::ErrorKind;

#[derive(Debug)]
pub enum SftpError {
SftpClientError(SftpClientError),
SshError(SshError),
}

impl From<SftpClientError> for Error {
fn from(e: SftpClientError) -> Self {
let kind = match &e {
SftpClientError::UnsupportedSftpProtocol { version: _ } => ErrorKind::Unsupported,
SftpClientError::SftpError(kind, _msg) => match kind {
SftpErrorKind::NoSuchFile => ErrorKind::NotFound,
SftpErrorKind::PermDenied => ErrorKind::PermissionDenied,
SftpErrorKind::OpUnsupported => ErrorKind::Unsupported,
_ => ErrorKind::Unexpected,
},
pub fn parse_sftp_error(e: SftpClientError) -> Error {
let kind = match &e {
SftpClientError::UnsupportedSftpProtocol { version: _ } => ErrorKind::Unsupported,
SftpClientError::SftpError(kind, _msg) => match kind {
SftpErrorKind::NoSuchFile => ErrorKind::NotFound,
SftpErrorKind::PermDenied => ErrorKind::PermissionDenied,
SftpErrorKind::OpUnsupported => ErrorKind::Unsupported,
_ => ErrorKind::Unexpected,
};

Error::new(kind, "sftp error").set_source(e)
}
}

/// REMOVE ME: it's not allowed to impl `<T>` for Error.
impl From<SshError> for Error {
fn from(e: SshError) -> Self {
Error::new(ErrorKind::Unexpected, "ssh error").set_source(e)
}
}

impl From<SftpClientError> for SftpError {
fn from(e: SftpClientError) -> Self {
SftpError::SftpClientError(e)
}
}
},
_ => ErrorKind::Unexpected,
};

impl From<SshError> for SftpError {
fn from(e: SshError) -> Self {
SftpError::SshError(e)
}
Error::new(kind, "sftp error").set_source(e)
}

impl From<SftpError> for Error {
fn from(e: SftpError) -> Self {
match e {
SftpError::SftpClientError(e) => e.into(),
SftpError::SshError(e) => e.into(),
}
}
pub fn parse_ssh_error(e: SshError) -> Error {
Error::new(ErrorKind::Unexpected, "ssh error").set_source(e)
}

pub(super) fn is_not_found(e: &SftpClientError) -> bool {
Expand Down
6 changes: 5 additions & 1 deletion core/src/services/sftp/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use openssh_sftp_client::fs::ReadDir;
use crate::raw::oio;
use crate::Result;

use super::error::parse_sftp_error;

pub struct SftpLister {
dir: Pin<Box<ReadDir>>,
prefix: String,
Expand All @@ -47,7 +49,9 @@ impl SftpLister {
#[async_trait]
impl oio::List for SftpLister {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
let item = ready!(self.dir.poll_next_unpin(cx)).transpose()?;
let item = ready!(self.dir.poll_next_unpin(cx))
.transpose()
.map_err(parse_sftp_error)?;

match item {
Some(e) => {
Expand Down

0 comments on commit dd2e68f

Please sign in to comment.