Skip to content

Commit

Permalink
Merge pull request #76 from succinctlabs/n/conn-timeout
Browse files Browse the repository at this point in the history
fix: add timeouts on initial connections
  • Loading branch information
nhtyy authored Feb 15, 2025
2 parents 3e2d897 + cff097f commit 0e92500
Showing 1 changed file with 30 additions and 3 deletions.
33 changes: 30 additions & 3 deletions services/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use services::types::{Commit, GrandpaJustification};
use sp_core::bytes;
use subxt::backend::rpc::RpcSubscription;

use timeout::Timeout;

/// The justification type that the Avail Subxt client returns for justifications. Needs a custom
/// deserializer, so we can't use the equivalent `GrandpaJustification` type.
#[derive(Clone, Debug, Decode)]
Expand Down Expand Up @@ -50,7 +52,7 @@ async fn handle_subscription(
timeout_duration: std::time::Duration,
) {
loop {
match tokio::time::timeout(timeout_duration, sub.next()).await {
match sub.next().timeout(timeout_duration).await {
Ok(Some(Ok(justification))) => {
debug!(
"New justification from block {}",
Expand Down Expand Up @@ -104,8 +106,17 @@ async fn listen_for_justifications() {

loop {
info!("Initializing fetcher and subscription...");
let fetcher = RpcDataFetcher::new().await;
let aws_client = AWSClient::new().await;

let Ok(fetcher) = RpcDataFetcher::new().timeout(timeout_duration).await else {
error!("Failed to initialize fetcher after timeout");
continue;
};

// Initialize the AWS client.
let Ok(aws_client) = AWSClient::new().timeout(timeout_duration).await else {
error!("Failed to initialize AWS client after timeout");
continue;
};

match initialize_subscription(&fetcher).await {
Ok(mut sub) => {
Expand All @@ -129,3 +140,19 @@ pub async fn main() {

listen_for_justifications().await;
}

mod timeout {
use std::future::Future;
use std::time::Duration;
use tokio::time::{timeout, Timeout as TimeoutFuture};

pub trait Timeout: Sized {
fn timeout(self, duration: Duration) -> TimeoutFuture<Self>;
}

impl<T: Future> Timeout for T {
fn timeout(self, duration: Duration) -> TimeoutFuture<Self> {
timeout(duration, self)
}
}
}

0 comments on commit 0e92500

Please sign in to comment.