Skip to content

Commit

Permalink
Add termination reason to request many
Browse files Browse the repository at this point in the history
Request many can termination for many different reasons.
This commit adds ability to retrieve information about the cause
without complicating the API.

Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Jan 9, 2025
1 parent bc9d673 commit 1d137ca
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 10 deletions.
35 changes: 33 additions & 2 deletions nats-extra/src/request_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use std::{
time::Duration,
};

use async_nats::{subject::ToSubject, Client, RequestError, Subscriber};
use async_nats::{subject::ToSubject, Client, RequestError, StatusCode, Subscriber};
use bytes::Bytes;
use futures::{FutureExt, Stream, StreamExt};

Expand Down Expand Up @@ -247,6 +247,7 @@ impl RequestMany {
sentinel: self.sentinel,
max_messages: self.max_messags,
stall_wait: self.stall_wait,
reason: None,
})
}
}
Expand All @@ -260,6 +261,23 @@ pub struct Responses {
sentinel: SentinelPredicate,
max_messages: Option<usize>,
stall_wait: Option<Duration>,
reason: Option<TerminationReason>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TerminationReason {
MaxMessages,
MaxWait,
StallWait,
Sentinel,
NoResponders,
SubscriptionClosed,
}

impl Responses {
pub fn termination_reason(&self) -> Option<TerminationReason> {
self.reason.clone()
}
}

impl Stream for Responses {
Expand All @@ -270,6 +288,7 @@ impl Stream for Responses {
if let Some(timer) = self.timer.as_mut() {
match timer.poll_unpin(cx) {
Poll::Ready(_) => {
self.reason = Some(TerminationReason::MaxWait);
return Poll::Ready(None);
}
Poll::Pending => {}
Expand All @@ -279,6 +298,7 @@ impl Stream for Responses {
// max_messages
if let Some(max_messages) = self.max_messages {
if self.messages_received >= max_messages {
self.reason = Some(TerminationReason::MaxMessages);
return Poll::Ready(None);
}
}
Expand All @@ -291,6 +311,7 @@ impl Stream for Responses {

match stall.as_mut().poll_unpin(cx) {
Poll::Ready(_) => {
self.reason = Some(TerminationReason::StallWait);
return Poll::Ready(None);
}
Poll::Pending => {}
Expand All @@ -300,6 +321,12 @@ impl Stream for Responses {
match self.responses.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
Some(message) => {
// no responders
if message.status == Some(StatusCode::NO_RESPONDERS) {
self.reason = Some(TerminationReason::NoResponders);
return Poll::Ready(None);
}

self.messages_received += 1;

// reset timer
Expand All @@ -309,6 +336,7 @@ impl Stream for Responses {
match self.sentinel {
Some(ref sentinel) => {
if sentinel(&message) {
self.reason = Some(TerminationReason::Sentinel);
Poll::Ready(None)
} else {
Poll::Ready(Some(message))
Expand All @@ -317,7 +345,10 @@ impl Stream for Responses {
None => Poll::Ready(Some(message)),
}
}
None => Poll::Ready(None),
None => {
self.reason = Some(TerminationReason::SubscriptionClosed);
Poll::Ready(None)
}
},
Poll::Pending => Poll::Pending,
}
Expand Down
66 changes: 58 additions & 8 deletions nats-extra/tests/request_many_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod request_many {

// request many with sentinel
let mut requests = client.subscribe("test").await.unwrap();
let responses = client
let mut responses = client
.request_many()
.sentinel(|msg| msg.payload.is_empty())
.send("test", "data".into())
Expand All @@ -45,12 +45,20 @@ mod request_many {
.await
.unwrap();

assert_eq!(responses.count().await, 100);
let mut count = 0;
while let Some(_) = responses.next().await {
count += 1;
}
assert_eq!(count, 100);
assert_eq!(
responses.termination_reason(),
Some(nats_extra::request_many::TerminationReason::Sentinel)
);
requests.unsubscribe().await.unwrap();

// request many with max messages
let mut requests = client.subscribe("test").await.unwrap();
let responses = client
let mut responses = client
.request_many()
.max_messages(20)
.send("test", "data".into())
Expand All @@ -66,12 +74,21 @@ mod request_many {
.unwrap();
}

assert_eq!(responses.count().await, 20);
let mut count = 0;
while let Some(_) = responses.next().await {
count += 1;
}
assert_eq!(count, 20);
let termination = responses.termination_reason();
assert_eq!(
termination,
Some(nats_extra::request_many::TerminationReason::MaxMessages)
);
requests.unsubscribe().await.unwrap();

// request many with stall
let mut requests = client.subscribe("test").await.unwrap();
let responses = client
let mut responses = client
.request_many()
.stall_wait(Duration::from_millis(100))
.send("test", "data".into())
Expand All @@ -94,11 +111,19 @@ mod request_many {
requests.unsubscribe().await.unwrap();
}
});
assert_eq!(responses.count().await, 50);
let mut count = 0;
while let Some(_) = responses.next().await {
count += 1;
}
assert_eq!(count, 50);
assert_eq!(
responses.termination_reason(),
Some(nats_extra::request_many::TerminationReason::StallWait)
);

// request many with max wait
let mut requests = client.subscribe("test").await.unwrap();
let responses = client
let mut responses = client
.request_many()
.max_wait(Some(Duration::from_secs(5)))
.send("test", "data".into())
Expand All @@ -120,6 +145,31 @@ mod request_many {
}
}
});
assert_eq!(responses.count().await, 20);
let mut count = 0;
while let Some(_) = responses.next().await {
count += 1;
}
assert_eq!(count, 20);
assert_eq!(
responses.termination_reason(),
Some(nats_extra::request_many::TerminationReason::MaxWait)
);

// no responders
let mut responses = client
.request_many()
.send("noone_listening", "data".into())
.await
.unwrap();

let mut count = 0;
while let Some(_) = responses.next().await {
count += 1;
}
assert_eq!(count, 0);
assert_eq!(
responses.termination_reason(),
Some(nats_extra::request_many::TerminationReason::NoResponders)
);
}
}

0 comments on commit 1d137ca

Please sign in to comment.