Skip to content

Commit

Permalink
Fix backoff logic in ResilientStream
Browse files Browse the repository at this point in the history
The backoff is now truly exponential (Math.pow(count, 2)) because the sleep is done before the next recursive call to loop instead of inside it.
This way the message "Restarting in x seconds" actually becomes true. Previously the backoff was somewhat similar to a Fibonacci sequence because the restarted program contained the previous iteration's sleep time plus the current one.

Also removes the ??? in favor of logging the Exception messages and rethrowing for improved observability. (the logging backend gets a chance to e.g. format the log for proper ingestion into services such as DataDog)
  • Loading branch information
poohsen committed Dec 7, 2023
1 parent 010fc67 commit d2be00a
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.util.control.NonFatal
*
* In case of failure, the entire stream will be restarted after the specified retry time with an exponential backoff.
*
* By default the program will be restarted in 5 seconds, then 10, then 15, etc.
* By default the program will be restarted in 5 * 1^2 seconds, then 5 * 2^2 seconds, then 5 * 3^2 seconds, etc.
*
* @see
* ResilientStreamSpec for more.
Expand All @@ -47,14 +47,18 @@ object ResilientStream {

private def loop[F[_]: Log: Temporal](
program: Stream[F, Unit],
retry: FiniteDuration,
retryDelay: FiniteDuration,
count: Int
): Stream[F, Unit] =
program.handleErrorWith {
case NonFatal(err) =>
Stream.eval(Log[F].error(err.getMessage) *> Log[F].info(s"Restarting in ${retry.toSeconds * count}...")) >>
loop[F](Stream.sleep(retry * count.toLong) >> program, retry, count + 1)
case _ => ???
val delay = retryDelay * Math.pow(count, 2).toInt
Stream.eval(
Log[F].error(err.getMessage) *> Log[F].info(s"Restarting in $delay...")
) >>
Stream.sleep(delay) >> loop[F](program, retryDelay, count + 1)
case fatal =>
Stream.eval(Log[F].error(s"Fatal error: ${fatal.getMessage}")) *> Stream.raiseError(fatal)
}

}

0 comments on commit d2be00a

Please sign in to comment.