Skip to content

Commit

Permalink
Fix Service::stop
Browse files Browse the repository at this point in the history
* Fix Service::stop not working

There were 2 issues in the stop system for services :
- On the first poll of the stream, the shutdown future was not polled,
  thus stopping the service would never wake this task
- After the shutdown future is resolved, the next poll will still poll
  the shutdown future, which is not allowed

* Add test for service stop

Signed-off-by: Tomasz Pietrek <[email protected]>
Co-authored-by: MATILLAT Quentin <[email protected]>
  • Loading branch information
tinou98 authored Nov 13, 2023
1 parent 0fface3 commit 1dbe9c3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
17 changes: 11 additions & 6 deletions async-nats/src/service/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ impl Stream for Endpoint {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
trace!("polling for next request");
match self.shutdown_future.as_mut() {
Some(shutdown) => match shutdown.as_mut().poll(cx) {
if let Some(mut receiver) = self.shutdown.take() {
// Need to initialize `shutdown_future` on first poll
self.shutdown_future = Some(Box::pin(async move { receiver.recv().await }));
}

if let Some(shutdown) = self.shutdown_future.as_mut() {
match shutdown.as_mut().poll(cx) {
Poll::Ready(_result) => {
debug!("got stop broadcast");
self.requests
Expand All @@ -54,16 +59,16 @@ impl Stream for Endpoint {
max: None,
})
.ok();

// Clear future, can't be resumed after completion
self.shutdown_future = None;
}
Poll::Pending => {
trace!("stop broadcast still pending");
}
},
None => {
let mut receiver = self.shutdown.take().unwrap();
self.shutdown_future = Some(Box::pin(async move { receiver.recv().await }));
}
}

trace!("checking for new messages");
match self.requests.poll_next_unpin(cx) {
Poll::Ready(message) => {
Expand Down
18 changes: 18 additions & 0 deletions async-nats/tests/service_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,24 @@ mod service {
}
}

#[tokio::test]
async fn stop() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let service = client
.service_builder()
.start("service", "1.0.0")
.await
.unwrap();

let mut endpoint = service.endpoint("products").await.unwrap();

service.stop().await.unwrap();
client.publish("products", "data".into()).await.unwrap();
assert!(endpoint.next().await.is_none());
}

#[tokio::test]
#[cfg(not(target_os = "windows"))]
async fn cross_clients_tests() {
Expand Down

0 comments on commit 1dbe9c3

Please sign in to comment.