Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove all recv_with_delay and add shutdown condition to loops in client-core #5435

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/client-core/src/client/mix_traffic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl MixTrafficController {
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");

loop {
while !shutdown.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
Expand Down Expand Up @@ -146,7 +146,7 @@ impl MixTrafficController {
log::trace!("MixTrafficController, client request channel closed");
}
},
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl AcknowledgementListener {
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("AcknowledgementListener: Received shutdown");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl ActionController {
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started ActionController with graceful shutdown support");

loop {
while !shutdown.is_shutdown() {
tokio::select! {
action = self.incoming_actions.next() => match action {
Some(action) => self.process_action(action),
Expand All @@ -286,7 +286,7 @@ impl ActionController {
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("ActionController: Received shutdown");
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("InputMessageListener: Received shutdown");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl SentNotificationListener {
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started SentNotificationListener with graceful shutdown support");

loop {
while !shutdown.is_shutdown() {
tokio::select! {
frag_id = self.sent_notifier.next() => match frag_id {
Some(frag_id) => {
Expand All @@ -51,7 +51,7 @@ impl SentNotificationListener {
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("SentNotificationListener: Received shutdown");
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ where
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));

loop {
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
Expand Down
4 changes: 2 additions & 2 deletions common/client-core/src/client/received_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ impl<R: MessageReceiver> RequestReceiver<R> {
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("RequestReceiver: Received shutdown");
}
request = self.query_receiver.next() => {
Expand All @@ -441,7 +441,7 @@ impl<R: MessageReceiver> RequestReceiver<R> {
},
}
}
shutdown.recv_timeout().await;
shutdown.recv().await;
log::debug!("RequestReceiver: Exiting");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ where
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("ReplyController: Received shutdown");
},
req = self.request_receiver.next() => match req {
Expand Down
2 changes: 1 addition & 1 deletion common/client-core/src/client/statistics_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl StatisticsControl {
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);

loop {
while !task_client.is_shutdown() {
tokio::select! {
biased;
_ = task_client.recv() => {
Expand Down
Loading