diff --git a/nats-extra/src/request_many.rs b/nats-extra/src/request_many.rs index 06166a4..a28d749 100644 --- a/nats-extra/src/request_many.rs +++ b/nats-extra/src/request_many.rs @@ -55,7 +55,7 @@ use std::{ time::Duration, }; -use async_nats::{subject::ToSubject, Client, RequestError, Subscriber}; +use async_nats::{subject::ToSubject, Client, RequestError, StatusCode, Subscriber}; use bytes::Bytes; use futures::{FutureExt, Stream, StreamExt}; @@ -247,6 +247,7 @@ impl RequestMany { sentinel: self.sentinel, max_messages: self.max_messags, stall_wait: self.stall_wait, + reason: None, }) } } @@ -260,6 +261,23 @@ pub struct Responses { sentinel: SentinelPredicate, max_messages: Option, stall_wait: Option, + reason: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum TerminationReason { + MaxMessages, + MaxWait, + StallWait, + Sentinel, + NoResponders, + SubscriptionClosed, +} + +impl Responses { + pub fn termination_reason(&self) -> Option { + self.reason.clone() + } } impl Stream for Responses { @@ -270,6 +288,7 @@ impl Stream for Responses { if let Some(timer) = self.timer.as_mut() { match timer.poll_unpin(cx) { Poll::Ready(_) => { + self.reason = Some(TerminationReason::MaxWait); return Poll::Ready(None); } Poll::Pending => {} @@ -279,6 +298,7 @@ impl Stream for Responses { // max_messages if let Some(max_messages) = self.max_messages { if self.messages_received >= max_messages { + self.reason = Some(TerminationReason::MaxMessages); return Poll::Ready(None); } } @@ -291,6 +311,7 @@ impl Stream for Responses { match stall.as_mut().poll_unpin(cx) { Poll::Ready(_) => { + self.reason = Some(TerminationReason::StallWait); return Poll::Ready(None); } Poll::Pending => {} @@ -300,6 +321,12 @@ impl Stream for Responses { match self.responses.poll_next_unpin(cx) { Poll::Ready(message) => match message { Some(message) => { + // no responders + if message.status == Some(StatusCode::NO_RESPONDERS) { + self.reason = Some(TerminationReason::NoResponders); + return Poll::Ready(None); + } + self.messages_received += 1; // reset timer @@ -309,6 +336,7 @@ impl Stream for Responses { match self.sentinel { Some(ref sentinel) => { if sentinel(&message) { + self.reason = Some(TerminationReason::Sentinel); Poll::Ready(None) } else { Poll::Ready(Some(message)) @@ -317,7 +345,10 @@ impl Stream for Responses { None => Poll::Ready(Some(message)), } } - None => Poll::Ready(None), + None => { + self.reason = Some(TerminationReason::SubscriptionClosed); + Poll::Ready(None) + } }, Poll::Pending => Poll::Pending, } diff --git a/nats-extra/tests/request_many_tests.rs b/nats-extra/tests/request_many_tests.rs index f3932ae..e5a2d71 100644 --- a/nats-extra/tests/request_many_tests.rs +++ b/nats-extra/tests/request_many_tests.rs @@ -25,7 +25,7 @@ mod request_many { // request many with sentinel let mut requests = client.subscribe("test").await.unwrap(); - let responses = client + let mut responses = client .request_many() .sentinel(|msg| msg.payload.is_empty()) .send("test", "data".into()) @@ -45,12 +45,20 @@ mod request_many { .await .unwrap(); - assert_eq!(responses.count().await, 100); + let mut count = 0; + while let Some(_) = responses.next().await { + count += 1; + } + assert_eq!(count, 100); + assert_eq!( + responses.termination_reason(), + Some(nats_extra::request_many::TerminationReason::Sentinel) + ); requests.unsubscribe().await.unwrap(); // request many with max messages let mut requests = client.subscribe("test").await.unwrap(); - let responses = client + let mut responses = client .request_many() .max_messages(20) .send("test", "data".into()) @@ -66,12 +74,21 @@ mod request_many { .unwrap(); } - assert_eq!(responses.count().await, 20); + let mut count = 0; + while let Some(_) = responses.next().await { + count += 1; + } + assert_eq!(count, 20); + let termination = responses.termination_reason(); + assert_eq!( + termination, + Some(nats_extra::request_many::TerminationReason::MaxMessages) + ); requests.unsubscribe().await.unwrap(); // request many with stall let mut requests = client.subscribe("test").await.unwrap(); - let responses = client + let mut responses = client .request_many() .stall_wait(Duration::from_millis(100)) .send("test", "data".into()) @@ -94,11 +111,19 @@ mod request_many { requests.unsubscribe().await.unwrap(); } }); - assert_eq!(responses.count().await, 50); + let mut count = 0; + while let Some(_) = responses.next().await { + count += 1; + } + assert_eq!(count, 50); + assert_eq!( + responses.termination_reason(), + Some(nats_extra::request_many::TerminationReason::StallWait) + ); // request many with max wait let mut requests = client.subscribe("test").await.unwrap(); - let responses = client + let mut responses = client .request_many() .max_wait(Some(Duration::from_secs(5))) .send("test", "data".into()) @@ -120,6 +145,31 @@ mod request_many { } } }); - assert_eq!(responses.count().await, 20); + let mut count = 0; + while let Some(_) = responses.next().await { + count += 1; + } + assert_eq!(count, 20); + assert_eq!( + responses.termination_reason(), + Some(nats_extra::request_many::TerminationReason::MaxWait) + ); + + // no responders + let mut responses = client + .request_many() + .send("noone_listening", "data".into()) + .await + .unwrap(); + + let mut count = 0; + while let Some(_) = responses.next().await { + count += 1; + } + assert_eq!(count, 0); + assert_eq!( + responses.termination_reason(), + Some(nats_extra::request_many::TerminationReason::NoResponders) + ); } }