diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs
index b3ae00815..4bd26191d 100644
--- a/deployer/src/deployment/run.rs
+++ b/deployer/src/deployment/run.rs
@@ -60,15 +60,25 @@ pub async fn task(
             active_deployment_getter.clone(),
             runtime_manager.clone(),
         );
-        let cleanup = move |response: SubscribeStopResponse| {
+        let cleanup = move |response: Option<SubscribeStopResponse>| {
             debug!(response = ?response, "stop client response: ");
-            match StopReason::from_i32(response.reason).unwrap_or_default() {
-                StopReason::Request => stopped_cleanup(&id),
-                StopReason::End => completed_cleanup(&id),
-                StopReason::Crash => {
-                    crashed_cleanup(&id, Error::Run(anyhow::Error::msg(response.message).into()))
+            if let Some(response) = response {
+                match StopReason::from_i32(response.reason).unwrap_or_default() {
+                    StopReason::Request => stopped_cleanup(&id),
+                    StopReason::End => completed_cleanup(&id),
+                    StopReason::Crash => crashed_cleanup(
+                        &id,
+                        Error::Run(anyhow::Error::msg(response.message).into()),
+                    ),
                 }
+            } else {
+                crashed_cleanup(
+                    &id,
+                    Error::Runtime(anyhow::anyhow!(
+                        "stop subscribe channel stopped unexpectedly"
+                    )),
+                )
             }
         };
 
         let runtime_manager = runtime_manager.clone();
@@ -188,7 +198,7 @@ impl Built {
         runtime_manager: Arc<Mutex<RuntimeManager>>,
         deployment_updater: impl DeploymentUpdater,
         kill_old_deployments: impl futures::Future<Output = Result<()>>,
-        cleanup: impl FnOnce(SubscribeStopResponse) + Send + 'static,
+        cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
     ) -> Result<()> {
         // For alpha this is the path to the users project with an embedded runtime.
         // For shuttle-next this is the path to the compiled .wasm file, which will be
@@ -343,7 +353,7 @@ async fn run(
     mut runtime_client: RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
     address: SocketAddr,
     deployment_updater: impl DeploymentUpdater,
-    cleanup: impl FnOnce(SubscribeStopResponse) + Send + 'static,
+    cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
 ) {
     deployment_updater
         .set_address(&id, &address)
@@ -369,15 +379,15 @@ async fn run(
             info!(response = ?response.into_inner(), "start client response: ");
 
             // Wait for stop reason
-            let reason = stream.message().await.unwrap().unwrap();
+            let reason = stream.message().await.expect("message from tonic stream");
 
             cleanup(reason);
         }
         Err(ref status) if status.code() == Code::InvalidArgument => {
-            cleanup(SubscribeStopResponse {
+            cleanup(Some(SubscribeStopResponse {
                 reason: StopReason::Crash as i32,
                 message: status.to_string(),
-            });
+            }));
         }
         Err(ref status) => {
             start_crashed_cleanup(
@@ -534,12 +544,15 @@ mod tests {
         let runtime_manager = get_runtime_manager();
         let (cleanup_send, cleanup_recv) = oneshot::channel();
 
-        let handle_cleanup = |response: SubscribeStopResponse| match (
-            StopReason::from_i32(response.reason).unwrap(),
-            response.message,
-        ) {
-            (StopReason::Request, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
-            _ => panic!("expected stop due to request"),
+        let handle_cleanup = |response: Option<SubscribeStopResponse>| {
+            let response = response.unwrap();
+            match (
+                StopReason::from_i32(response.reason).unwrap(),
+                response.message,
+            ) {
+                (StopReason::Request, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
+                _ => panic!("expected stop due to request"),
+            }
         };
 
         built
@@ -574,12 +587,15 @@ mod tests {
         let runtime_manager = get_runtime_manager();
         let (cleanup_send, cleanup_recv) = oneshot::channel();
 
-        let handle_cleanup = |response: SubscribeStopResponse| match (
-            StopReason::from_i32(response.reason).unwrap(),
-            response.message,
-        ) {
-            (StopReason::End, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
-            _ => panic!("expected stop due to self end"),
panic!("expected stop due to self end"), + let handle_cleanup = |response: Option| { + let response = response.unwrap(); + match ( + StopReason::from_i32(response.reason).unwrap(), + response.message, + ) { + (StopReason::End, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(), + _ => panic!("expected stop due to self end"), + } }; built @@ -611,14 +627,17 @@ mod tests { let runtime_manager = get_runtime_manager(); let (cleanup_send, cleanup_recv) = oneshot::channel(); - let handle_cleanup = |response: SubscribeStopResponse| match ( - StopReason::from_i32(response.reason).unwrap(), - response.message, - ) { - (StopReason::Crash, mes) if mes.contains("panic in bind") => { - cleanup_send.send(()).unwrap() + let handle_cleanup = |response: Option| { + let response = response.unwrap(); + match ( + StopReason::from_i32(response.reason).unwrap(), + response.message, + ) { + (StopReason::Crash, mes) if mes.contains("panic in bind") => { + cleanup_send.send(()).unwrap() + } + (_, mes) => panic!("expected stop due to crash: {mes}"), } - (_, mes) => panic!("expected stop due to crash: {mes}"), }; built diff --git a/deployer/src/runtime_manager.rs b/deployer/src/runtime_manager.rs index 7199f664c..80e769ef7 100644 --- a/deployer/src/runtime_manager.rs +++ b/deployer/src/runtime_manager.rs @@ -154,6 +154,8 @@ impl RuntimeManager { let stop_request = tonic::Request::new(StopRequest {}); let response = runtime_client.stop(stop_request).await.unwrap(); + trace!(?response, "stop deployment response"); + let result = response.into_inner().success; let _ = process.start_kill(); diff --git a/runtime/src/alpha/mod.rs b/runtime/src/alpha/mod.rs index 828ee421e..1c188c894 100644 --- a/runtime/src/alpha/mod.rs +++ b/runtime/src/alpha/mod.rs @@ -42,7 +42,7 @@ use tonic::{ Request, Response, Status, }; use tower::ServiceBuilder; -use tracing::{error, info, trace}; +use tracing::{error, info, trace, warn}; use crate::{provisioner_factory::ProvisionerFactory, Logger}; @@ -378,7 +378,9 @@ where Ok(Response::new(StopResponse { success: true })) } else { - Err(Status::internal("failed to stop deployment")) + warn!("failed to stop deployment"); + + Ok(tonic::Response::new(StopResponse { success: false })) } } diff --git a/runtime/src/next/mod.rs b/runtime/src/next/mod.rs index 7a6a6bbce..bc6a0ac5c 100644 --- a/runtime/src/next/mod.rs +++ b/runtime/src/next/mod.rs @@ -17,14 +17,14 @@ use hyper::{Body, Request, Response}; use shuttle_common::wasm::{Bytesable, Log, RequestWrapper, ResponseWrapper}; use shuttle_proto::runtime::runtime_server::Runtime; use shuttle_proto::runtime::{ - self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopRequest, StopResponse, - SubscribeLogsRequest, SubscribeStopRequest, SubscribeStopResponse, + self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopReason, StopRequest, + StopResponse, SubscribeLogsRequest, SubscribeStopRequest, SubscribeStopResponse, }; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; use wasi_common::file::FileCaps; use wasmtime::{Engine, Linker, Module, Store}; use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream; @@ -45,6 +45,7 @@ pub struct AxumWasm { logs_rx: Mutex>>>, logs_tx: Sender>, kill_tx: Mutex>>, + stopped_tx: broadcast::Sender<(StopReason, String)>, } impl AxumWasm { @@ -57,11 +58,14 @@ impl AxumWasm { // seems acceptable so 
         let (tx, rx) = mpsc::channel(1 << 15);
 
+        let (stopped_tx, _stopped_rx) = broadcast::channel(10);
+
         Self {
             router: Mutex::new(None),
             logs_rx: Mutex::new(Some(rx)),
             logs_tx: tx,
             kill_tx: Mutex::new(None),
+            stopped_tx,
         }
     }
 }
@@ -122,7 +126,11 @@ impl Runtime for AxumWasm {
             .context("tried to start a service that was not loaded")
             .map_err(|err| Status::internal(err.to_string()))?;
 
-        tokio::spawn(run_until_stopped(router, address, logs_tx, kill_rx));
+        let stopped_tx = self.stopped_tx.clone();
+
+        tokio::spawn(run_until_stopped(
+            router, address, logs_tx, kill_rx, stopped_tx,
+        ));
 
         let message = StartResponse { success: true };
 
@@ -160,9 +168,9 @@
 
             Ok(tonic::Response::new(StopResponse { success: true }))
         } else {
-            Err(Status::internal(
-                "trying to stop a service that was not started",
-            ))
+            warn!("trying to stop a service that was not started");
+
+            Ok(tonic::Response::new(StopResponse { success: false }))
         }
     }
 
@@ -172,8 +180,21 @@
         &self,
         _request: tonic::Request<SubscribeStopRequest>,
     ) -> Result<tonic::Response<Self::SubscribeStopStream>, Status> {
-        // Next does not really have a stopped state. Endpoints are loaded if and when needed until a request is done
-        let (_tx, rx) = mpsc::channel(1);
+        let mut stopped_rx = self.stopped_tx.subscribe();
+        let (tx, rx) = mpsc::channel(1);
+
+        // Move the stop channel into a stream to be returned
+        tokio::spawn(async move {
+            trace!("moved stop channel into thread");
+            while let Ok((reason, message)) = stopped_rx.recv().await {
+                tx.send(Ok(SubscribeStopResponse {
+                    reason: reason as i32,
+                    message,
+                }))
+                .await
+                .unwrap();
+            }
+        });
 
         Ok(tonic::Response::new(ReceiverStream::new(rx)))
     }
@@ -354,6 +375,7 @@ async fn run_until_stopped(
     address: SocketAddr,
     logs_tx: Sender<Result<runtime::LogItem, Status>>,
     kill_rx: tokio::sync::oneshot::Receiver<String>,
+    stopped_tx: broadcast::Sender<(StopReason, String)>,
 ) {
     let make_service = make_service_fn(move |_conn| {
         let router = router.clone();
@@ -383,12 +405,21 @@
     trace!("starting hyper server on: {}", &address);
     tokio::select! {
         _ = server => {
+            stopped_tx.send((StopReason::End, String::new())).unwrap();
             trace!("axum wasm server stopped");
         },
         message = kill_rx => {
             match message {
-                Ok(msg) => trace!("{msg}"),
-                Err(_) => trace!("the sender dropped")
+                Ok(msg) => {
+                    stopped_tx.send((StopReason::Request, String::new())).unwrap();
+                    trace!("{msg}")
+                },
+                Err(_) => {
+                    stopped_tx
+                        .send((StopReason::Crash, "the kill sender dropped".to_string()))
+                        .unwrap();
+                    trace!("the sender dropped")
+                }
             }
         }
     };
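For reference, below is a minimal standalone sketch (not part of the diff above) of the broadcast-to-stream pattern that the subscribe_stop hunk introduces: a single broadcast::Sender lives on the runtime, each subscriber gets its own broadcast receiver, and the events are forwarded into an mpsc channel that a tonic server would wrap in a ReceiverStream. The Reason enum and all other names are illustrative stand-ins, not items from the shuttle codebase; only tokio is assumed as a dependency.

// Standalone sketch of the broadcast-to-stream pattern used by subscribe_stop above.
// `Reason` stands in for `StopReason`; names here are hypothetical.
use tokio::sync::{broadcast, mpsc};

#[derive(Clone, Copy, Debug)]
enum Reason {
    Request,
    End,
    Crash,
}

#[tokio::main]
async fn main() {
    // One broadcast sender lives on the runtime for its whole lifetime.
    let (stopped_tx, _) = broadcast::channel::<(Reason, String)>(10);

    // Each subscribe call takes its own broadcast receiver and bridges it into
    // an mpsc channel, which a tonic server would wrap in a ReceiverStream.
    let mut stopped_rx = stopped_tx.subscribe();
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Ok((reason, message)) = stopped_rx.recv().await {
            if tx.send((reason, message)).await.is_err() {
                break; // the subscriber hung up
            }
        }
    });

    // Simulate the service ending of its own accord.
    stopped_tx.send((Reason::End, String::new())).unwrap();

    let (reason, message) = rx.recv().await.unwrap();
    println!("stop reason: {reason:?}, message: {message:?}");
}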