diff --git a/src/connection.rs b/src/connection.rs index fad471b..9fd05eb 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -869,26 +869,22 @@ impl Connection { return Ok(Connection { id, url, sender }); } - let mut fatal_errors = vec![]; - let mut retryable_errors = vec![]; + // Prefer returning a retryable error if possible, so that the caller can + // retry the connection attempt. If all errors are fatal, return the first + // fatal error found. + let mut fatal_error = None; + for e in errors.into_iter() { if e.establish_retryable() { - retryable_errors.push(e); + return Err(e); } else { - fatal_errors.push(e); + if fatal_error.is_none() { + fatal_error = Some(e); + } } } - if retryable_errors.is_empty() { - error!("connection error, not retryable: {:?}", fatal_errors); - Err(ConnectionError::Io(std::io::Error::new( - std::io::ErrorKind::Other, - "fatal error when connecting to the Pulsar server", - ))) - } else { - warn!("retry establish connection on: {:?}", retryable_errors); - Err(retryable_errors.into_iter().next().unwrap()) - } + Err(fatal_error.expect("expected fatal error")) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 6dcb1a9..a4b62e1 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -340,6 +340,7 @@ impl ConnectionManager { self.executor.delay(current_backoff).await; } Err(e) => { + error!("connection error, not retryable: {}", e); return Err(e); } }