From 9ccbcfd205e92669e06115c34e375780d1501eb4 Mon Sep 17 00:00:00 2001 From: Sen Na Date: Sun, 2 Apr 2023 17:17:03 +0800 Subject: [PATCH 1/2] feat: Retry when some of batch operations are failing Also adds a test for retrying batch operations. This implementation is not as elegant as I would like: a batch operation may need to be retried even when its result is `Ok`, so I had to wrap it in a `Result<(), Result<RpBatch>>` type. In addition, the `errors` method on `BatchedResults` requires dynamic dispatch. --- core/src/layers/retry.rs | 107 ++++++++++++++++++++++++++++++++++++--- core/src/raw/rps.rs | 8 +++ 2 files changed, 108 insertions(+), 7 deletions(-) diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 0a6a347c3285..42944f7dd18a 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -288,16 +288,29 @@ impl LayeredAccessor for RetryAccessor { } async fn batch(&self, args: OpBatch) -> Result<RpBatch> { - { || self.inner.batch(args.clone()) } + { || self.inner.batch(args.clone()).map(|res| Err::<(), _>(res)) } .retry(&self.builder) - .when(|e| e.is_temporary()) - .notify(|err, dur| { - warn!( + .when(|res| match res { + Ok(rp) => rp + .results() + .errors() + .filter(|e| e.is_temporary()) + .next() + .is_some(), + Err(err) => err.is_temporary(), + }) + .notify(|res, dur| match res { + Ok(rp) => warn!( target: "opendal::service", "operation={} -> retry after {}s: error={:?}", - Operation::Batch, dur.as_secs_f64(), err) + Operation::Batch, dur.as_secs_f64(), + rp.results().errors().filter(|e| e.is_temporary()).collect::<Vec<_>>()), + Err(err) => warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::Batch, dur.as_secs_f64(), err), }) - .map(|v| v.map_err(|e| e.set_persistent())) + .map(|v| v.unwrap_err().map_err(|e| e.set_persistent())) .await } @@ -809,7 +822,7 @@ mod tests { fn info(&self) -> AccessorInfo { let mut am = AccessorInfo::default(); - am.set_capabilities(AccessorCapability::List); + 
am.set_capabilities(AccessorCapability::List | AccessorCapability::Batch); am.set_hints(AccessorHint::ReadStreamable); am @@ -829,6 +842,61 @@ mod tests { let pager = MockPager::default(); Ok((RpList::default(), pager)) } + + async fn batch(&self, op: OpBatch) -> Result<RpBatch> { + let mut attempt = self.attempt.lock().unwrap(); + *attempt += 1; + + use BatchOperations::*; + match op.into_operation() { + Delete(v) => match *attempt { + 1 => Err( + Error::new(ErrorKind::Unexpected, "retryable_error from reader") + .set_temporary(), + ), + 2 => Ok(RpBatch::new(BatchedResults::Delete( + v.into_iter() + .map(|(s, _)| { + ( + s, + Err(Error::new( + ErrorKind::Unexpected, + "retryable_error from reader", + ) + .set_temporary()), + ) + }) + .collect(), + ))), + 3 => Ok(RpBatch::new(BatchedResults::Delete( + v.into_iter() + .enumerate() + .map(|(i, (s, _))| { + ( + s, + match i { + 0 => Err(Error::new( + ErrorKind::Unexpected, + "retryable_error from reader", + ) + .set_temporary()), + _ => Ok(RpDelete {}), + }, + ) + }) + .collect(), + ))), + 4 => Err( + Error::new(ErrorKind::Unexpected, "retryable_error from reader") + .set_temporary(), + ), + 5 => Ok(RpBatch::new(BatchedResults::Delete( + v.into_iter().map(|(s, _)| (s, Ok(RpDelete {}))).collect(), + ))), + _ => unreachable!(), + }, + } + } } #[derive(Debug, Clone, Default)] @@ -971,4 +1039,29 @@ mod tests { assert_eq!(actual, expected); } + + #[tokio::test] + async fn test_retry_batch() { + let _ = env_logger::try_init(); + + let builder = MockBuilder::default(); + // set to a lower delay to make it run faster + let op = Operator::new(builder.clone()) + .unwrap() + .layer( + RetryLayer::new() + .with_min_delay(Duration::from_secs_f32(0.1)) + .with_max_times(5), + ) + .finish(); + + let paths = vec![ + "hello".into(), + "world".into(), + "test".into(), + "batch".into(), + ]; + op.remove(paths).await.expect("batch must succeed"); + assert_eq!(*builder.attempt.lock().unwrap(), 5); + } } diff --git a/core/src/raw/rps.rs 
b/core/src/raw/rps.rs index 75b2c8429996..05954afce2ac 100644 --- a/core/src/raw/rps.rs +++ b/core/src/raw/rps.rs @@ -190,6 +190,14 @@ impl BatchedResults { Delete(v) => v.iter().filter(|v| v.1.is_err()).count(), } } + + /// Return an iterator over error results. + pub fn errors(&self) -> Box<dyn Iterator<Item = &Error> + '_> { + use BatchedResults::*; + match self { + Delete(v) => Box::new(v.iter().filter_map(|v| v.1.as_ref().err())), + } + } } /// Reply for `stat` operation. From d64e6dc10affbc0a7f77adcb41ed59e740324eea Mon Sep 17 00:00:00 2001 From: Sen Na Date: Sun, 2 Apr 2023 17:54:34 +0800 Subject: [PATCH 2/2] fix: Fix clippy warnings --- core/src/layers/retry.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 42944f7dd18a..3e251a0ae8c9 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -288,15 +288,10 @@ impl LayeredAccessor for RetryAccessor { } async fn batch(&self, args: OpBatch) -> Result<RpBatch> { - { || self.inner.batch(args.clone()).map(|res| Err::<(), _>(res)) } + { || self.inner.batch(args.clone()).map(Err::<(), _>) } .retry(&self.builder) .when(|res| match res { - Ok(rp) => rp - .results() - .errors() - .filter(|e| e.is_temporary()) - .next() - .is_some(), + Ok(rp) => rp.results().errors().any(|e| e.is_temporary()), Err(err) => err.is_temporary(), }) .notify(|res, dur| match res {