Skip to content

Commit

Permalink
fix sigterm issue with translator sv
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Jan 15, 2025
1 parent 69a5398 commit 80b67d4
Showing 1 changed file with 68 additions and 55 deletions.
123 changes: 68 additions & 55 deletions roles/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::{
};

use tokio::{
sync::broadcast,
select,
sync::{broadcast, Notify},
task::{self, AbortHandle},
};
use tracing::{debug, error, info, warn};
Expand All @@ -32,6 +33,7 @@ pub mod utils;
pub struct TranslatorSv2 {
config: ProxyConfig,
reconnect_wait_time: u64,
shutdown: Arc<Notify>,
}

impl TranslatorSv2 {
Expand All @@ -41,6 +43,7 @@ impl TranslatorSv2 {
Self {
config,
reconnect_wait_time: wait_time,
shutdown: Arc::new(Notify::new()),
}
}

Expand All @@ -58,7 +61,8 @@ impl TranslatorSv2 {
let task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>> =
Arc::new(Mutex::new(Vec::new()));

self.internal_start(
Self::internal_start(
self.config.clone(),
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
Expand All @@ -72,74 +76,79 @@ impl TranslatorSv2 {
debug!("Starting up status listener");
let wait_time = self.reconnect_wait_time;
// Check all tasks if is_finished() is true, if so exit
loop {
let task_status = tokio::select! {
task_status = rx_status.recv().fuse() => task_status,
interrupt_signal = tokio::signal::ctrl_c().fuse() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
tokio::spawn({
let shutdown_signal = self.shutdown();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
shutdown_signal.notify_one();
}
State::BridgeShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
}
});

loop {
select! {
task_status = rx_status.recv().fuse() => {
if let Ok(task_status_) = task_status {
match task_status_.state {
State::DownstreamShutdown(err) | State::BridgeShutdown(err) | State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
self.shutdown().notify_one();
}
State::UpstreamTryReconnect(err) => {
error!("Trying to reconnect the Upstream because of: {}", err);
let task_collector1 = task_collector_.clone();
let tx_sv1_notify1 = tx_sv1_notify.clone();
let target = target.clone();
let tx_status = tx_status.clone();
let proxy_config = self.config.clone();
tokio::spawn (async move {
// wait a random amount of time between 0 and 3000ms
// if all the downstreams try to reconnect at the same time, the upstream may
// fail
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;

// kill al the tasks
let task_collector_aborting = task_collector1.clone();
kill_tasks(task_collector_aborting.clone());

warn!("Trying reconnecting to upstream");
Self::internal_start(
proxy_config,
tx_sv1_notify1,
target.clone(),
tx_status.clone(),
task_collector1,
)
.await;
});
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
self.shutdown().notify_one();
}
}
} else {
info!("Channel closed");
break; // Channel closed
}
}
State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
_ = self.shutdown.notified() => {
info!("Shutting down gracefully...");
break;
}
State::UpstreamTryReconnect(err) => {
error!("Trying to reconnect the Upstream because of: {}", err);

// wait a random amount of time between 0 and 3000ms
// if all the downstreams try to reconnect at the same time, the upstream may
// fail
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;

// kill al the tasks
let task_collector_aborting = task_collector_.clone();
kill_tasks(task_collector_aborting.clone());

warn!("Trying reconnecting to upstream");
self.internal_start(
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
task_collector_.clone(),
)
.await;
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
}
}
}

async fn internal_start(
&self,
proxy_config: ProxyConfig,
tx_sv1_notify: broadcast::Sender<server_to_client::Notify<'static>>,
target: Arc<Mutex<Vec<u8>>>,
tx_status: async_channel::Sender<Status<'static>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let proxy_config = self.config.clone();
// Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream`
// (Sender<SubmitSharesExtended<'static>>, Receiver<SubmitSharesExtended<'static>>)
let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10);
Expand Down Expand Up @@ -278,6 +287,10 @@ impl TranslatorSv2 {
let _ =
task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string())));
}

pub fn shutdown(&self) -> Arc<Notify> {
self.shutdown.clone()
}
}

fn kill_tasks(task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>) {
Expand Down

0 comments on commit 80b67d4

Please sign in to comment.