Skip to content

Commit

Permalink
Merge branch 'mk/sandbox_tests_timeout' into 'master'
Browse files Browse the repository at this point in the history
Chore: RUN-100: Make sandbox tests fail instead of hanging on panics

This MR attempts to improve our testing situation, where tests hang and time out instead of failing when there is a panic in the sandboxed wasm executor.

This MR probably doesn't cover all the places where channels should be dropped, but it at least covers the WasmExecutor::execute path.

See merge request dfinity-lab/public/ic!19935
  • Loading branch information
Maciej Kot committed Jun 26, 2024
2 parents 235ea53 + bb10c35 commit 2174dae
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 12 deletions.
3 changes: 2 additions & 1 deletion rs/canister_sandbox/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub fn run_launcher(socket: std::os::unix::net::UnixStream, embedder_config_arg:
// Wrap it all up to handle frames received on socket.
let frame_handler = transport::Demux::<_, _, protocol::transport::ControllerToLauncher>::new(
Arc::new(rpc::ServerStub::new(svc, reply_out_stream)),
reply_handler,
reply_handler.clone(),
);

// Run RPC operations on the stream socket.
Expand All @@ -84,6 +84,7 @@ pub fn run_launcher(socket: std::os::unix::net::UnixStream, embedder_config_arg:
socket,
SocketReaderConfig::default(),
);
reply_handler.flush_with_errors();
}

#[derive(Debug)]
Expand Down
3 changes: 2 additions & 1 deletion rs/canister_sandbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ pub fn run_canister_sandbox(
// RPC service offered by this binary.
let frame_handler = transport::Demux::<_, _, protocol::transport::ControllerToSandbox>::new(
Arc::new(rpc::ServerStub::new(svc, reply_out_stream)),
reply_handler,
reply_handler.clone(),
);

// It is fine if we fail to spawn this thread. Used for fault
Expand All @@ -249,4 +249,5 @@ pub fn run_canister_sandbox(
socket,
SocketReaderConfig::for_sandbox(),
);
reply_handler.flush_with_errors();
}
3 changes: 2 additions & 1 deletion rs/canister_sandbox/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub fn spawn_canister_sandbox_process_with_factory(
controller_service,
out.make_sink::<protocol::ctlsvc::Reply>(),
)),
reply_handler,
reply_handler.clone(),
);
transport::socket_read_messages::<_, _>(
move |message| {
Expand All @@ -110,6 +110,7 @@ pub fn spawn_canister_sandbox_process_with_factory(
socket,
SocketReaderConfig::default(),
);
reply_handler.flush_with_errors();
// If the connection drops, but it is not terminated from
// our end, that implies that the sandbox process died. At
// that point we need to terminate replica as we have no way
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type CompletionFunction = Box<dyn FnOnce(ExecId, CompletionResult) + Sync + Send
/// is presumed to be in progress on the sandbox process (it could
/// be that it is "about to be started" or that we have not received
/// and processed its completion yet).
struct ActiveExecutionState {
pub(crate) struct ActiveExecutionState {
/// Closure to call on completing execution. This will be set
/// on initialization, and cleared once the completion for this
/// execution has been called (it is not legal to receive two
Expand Down Expand Up @@ -99,6 +99,11 @@ impl ActiveExecutionStateRegistry {
None
}
}

/// Removes every tracked execution state from the registry and returns
/// them to the caller, leaving an empty (default) map behind.
///
/// # Panics
///
/// Panics if the mutex guarding the state map is poisoned.
pub(crate) fn take_all(&self) -> HashMap<ExecId, ActiveExecutionState> {
    // The lock guard is a temporary, so it is released as soon as the map
    // has been swapped out.
    std::mem::take(&mut *self.states.lock().unwrap())
}
}

impl Default for ActiveExecutionStateRegistry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ impl ControllerServiceImpl {
pub fn new(registry: Arc<ActiveExecutionStateRegistry>, log: ReplicaLogger) -> Arc<Self> {
Arc::new(ControllerServiceImpl { registry, log })
}

/// Takes every execution state out of the registry and drops it.
///
/// The entries are dropped rather than completed with an error result.
pub fn flush_with_errors(&self) {
    // We could instead hand an error to whoever is waiting, e.g.
    // entry.completion(exec_id, CompletionResult::Finished(exec_failed_result));
    // but in the current setup we want to panic there anyway, so it is
    // sufficient to just drop the closure: the channel on which
    // sandboxed_execution_controller is waiting will get disconnected.
    self.registry
        .take_all()
        .into_values()
        .for_each(drop);
}
}

impl ControllerService for ControllerServiceImpl {
Expand Down
17 changes: 10 additions & 7 deletions rs/canister_sandbox/src/replica_controller/launch_as_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use crate::{
launcher_client_stub::LauncherClientStub,
launcher_service::LauncherService,
process::spawn_socketed_process,
protocol::ctllaunchersvc,
protocol::{
self,
launchersvc::{LaunchSandboxReply, LaunchSandboxRequest},
},
protocol::{ctllaunchersvc, ctlsvc},
rpc,
sandbox_client_stub::SandboxClientStub,
sandbox_service::SandboxService,
Expand Down Expand Up @@ -50,7 +50,7 @@ pub fn spawn_launcher_process(
controller_service,
out.make_sink::<protocol::ctllaunchersvc::Reply>(),
)),
reply_handler,
reply_handler.clone(),
);
transport::socket_read_messages::<_, _>(
move |message| {
Expand All @@ -59,6 +59,7 @@ pub fn spawn_launcher_process(
socket,
SocketReaderConfig::default(),
);
reply_handler.flush_with_errors();
});

Ok((svc, child_handle))
Expand All @@ -77,7 +78,7 @@ pub fn spawn_canister_sandbox_process(
exec_path: &str,
argv: &[String],
canister_id: CanisterId,
controller_service: Arc<dyn rpc::DemuxServer<ctlsvc::Request, ctlsvc::Reply> + Send + Sync>,
controller_service: Arc<super::controller_service_impl::ControllerServiceImpl>,
launcher: &dyn LauncherService,
) -> std::io::Result<(Arc<dyn SandboxService>, u32, std::thread::JoinHandle<()>)> {
let (sock_controller, sock_sandbox) = std::os::unix::net::UnixStream::pair()?;
Expand Down Expand Up @@ -108,10 +109,10 @@ pub fn spawn_canister_sandbox_process(
let thread_handle = std::thread::spawn(move || {
let demux = transport::Demux::<_, _, protocol::transport::SandboxToController>::new(
Arc::new(rpc::ServerStub::new(
controller_service,
Arc::clone(&controller_service) as Arc<_>,
out.make_sink::<protocol::ctlsvc::Reply>(),
)),
reply_handler,
reply_handler.clone(),
);
transport::socket_read_messages::<_, _>(
move |message| {
Expand All @@ -120,6 +121,8 @@ pub fn spawn_canister_sandbox_process(
socket,
SocketReaderConfig::default(),
);
reply_handler.flush_with_errors();
controller_service.flush_with_errors();
// Send a notification to the writer thread to stop.
// Otherwise, the writer thread will remain waiting forever.
out.stop();
Expand All @@ -130,7 +133,7 @@ pub fn spawn_canister_sandbox_process(

/// Spawns a sandbox process for the given canister.
pub fn create_sandbox_process(
controller_service: Arc<dyn rpc::DemuxServer<ctlsvc::Request, ctlsvc::Reply> + Send + Sync>,
controller_service: Arc<super::controller_service_impl::ControllerServiceImpl>,
launcher_service: &dyn LauncherService,
canister_id: CanisterId,
mut argv: Vec<String>,
Expand All @@ -142,7 +145,7 @@ pub fn create_sandbox_process(
&argv[0],
&argv[1..],
canister_id,
Arc::clone(&controller_service) as Arc<_>,
controller_service,
launcher_service,
)
.expect("Failed to start sandbox process");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,9 @@ impl WasmExecutor for SandboxedExecutionController {
.with_label_values(&[api_type_label])
.start_timer();
// Wait for completion.
let result = rx.recv().unwrap();
let result = rx
.recv()
.expect("Sandboxed_execution_controller reply channel closed unexpectedly");
drop(wait_timer);
let _finish_timer = self
.metrics
Expand Down

0 comments on commit 2174dae

Please sign in to comment.