Skip to content

Commit

Permalink
feat: Retry when some of batch operations are failing (#1840)
Browse files Browse the repository at this point in the history
* feat: Retry when some of batch operations are failing

Also added a test for retry on batch operation.

This implementation is not so elegant as far as I'm considering. Batch
operation could possibly be retried even when result is `Ok`. So I had
to wrap it in a `Result<(), Result<RpBatch>>` type. And the `errors`
method on `BatchedResult` requires dynamic dispatching.

* fix: Fix clippy warnings
  • Loading branch information
nasen23 authored Apr 2, 2023
1 parent 021985a commit 5c3ce7d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 7 deletions.
102 changes: 95 additions & 7 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,24 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> {
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
{ || self.inner.batch(args.clone()) }
{ || self.inner.batch(args.clone()).map(Err::<(), _>) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
warn!(
.when(|res| match res {
Ok(rp) => rp.results().errors().any(|e| e.is_temporary()),
Err(err) => err.is_temporary(),
})
.notify(|res, dur| match res {
Ok(rp) => warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Batch, dur.as_secs_f64(), err)
Operation::Batch, dur.as_secs_f64(),
rp.results().errors().filter(|e| e.is_temporary()).collect::<Vec<_>>()),
Err(err) => warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Batch, dur.as_secs_f64(), err),
})
.map(|v| v.map_err(|e| e.set_persistent()))
.map(|v| v.unwrap_err().map_err(|e| e.set_persistent()))
.await
}

Expand Down Expand Up @@ -809,7 +817,7 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_capabilities(AccessorCapability::List);
am.set_capabilities(AccessorCapability::List | AccessorCapability::Batch);
am.set_hints(AccessorHint::ReadStreamable);

am
Expand All @@ -829,6 +837,61 @@ mod tests {
let pager = MockPager::default();
Ok((RpList::default(), pager))
}

async fn batch(&self, op: OpBatch) -> Result<RpBatch> {
let mut attempt = self.attempt.lock().unwrap();
*attempt += 1;

use BatchOperations::*;
match op.into_operation() {
Delete(v) => match *attempt {
1 => Err(
Error::new(ErrorKind::Unexpected, "retryable_error from reader")
.set_temporary(),
),
2 => Ok(RpBatch::new(BatchedResults::Delete(
v.into_iter()
.map(|(s, _)| {
(
s,
Err(Error::new(
ErrorKind::Unexpected,
"retryable_error from reader",
)
.set_temporary()),
)
})
.collect(),
))),
3 => Ok(RpBatch::new(BatchedResults::Delete(
v.into_iter()
.enumerate()
.map(|(i, (s, _))| {
(
s,
match i {
0 => Err(Error::new(
ErrorKind::Unexpected,
"retryable_error from reader",
)
.set_temporary()),
_ => Ok(RpDelete {}),
},
)
})
.collect(),
))),
4 => Err(
Error::new(ErrorKind::Unexpected, "retryable_error from reader")
.set_temporary(),
),
5 => Ok(RpBatch::new(BatchedResults::Delete(
v.into_iter().map(|(s, _)| (s, Ok(RpDelete {}))).collect(),
))),
_ => unreachable!(),
},
}
}
}

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -971,4 +1034,29 @@ mod tests {

assert_eq!(actual, expected);
}

#[tokio::test]
async fn test_retry_batch() {
let _ = env_logger::try_init();

let builder = MockBuilder::default();
// set to a lower delay to make it run faster
let op = Operator::new(builder.clone())
.unwrap()
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_secs_f32(0.1))
.with_max_times(5),
)
.finish();

let paths = vec![
"hello".into(),
"world".into(),
"test".into(),
"batch".into(),
];
op.remove(paths).await.expect("batch must succeed");
assert_eq!(*builder.attempt.lock().unwrap(), 5);
}
}
8 changes: 8 additions & 0 deletions core/src/raw/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ impl BatchedResults {
Delete(v) => v.iter().filter(|v| v.1.is_err()).count(),
}
}

/// Return an iterator over error results.
pub fn errors(&self) -> Box<dyn Iterator<Item = &Error> + '_> {
use BatchedResults::*;
match self {
Delete(v) => Box::new(v.iter().filter_map(|v| v.1.as_ref().err())),
}
}
}

/// Reply for `stat` operation.
Expand Down

0 comments on commit 5c3ce7d

Please sign in to comment.