Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(router): gracefully handle client crashes #1710

Merged
merged 2 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn assert_socket(name: &str) -> bool {
match LocalSocketStream::connect(path) {
Ok(stream) => {
let mut sender = IpcSenderWithContext::new(stream);
sender.send(ClientToServerMsg::ConnStatus);
let _ = sender.send(ClientToServerMsg::ConnStatus);
let mut receiver: IpcReceiverWithContext<ServerToClientMsg> = sender.get_receiver();
match receiver.recv() {
Some((ServerToClientMsg::Connected, _)) => true,
Expand Down Expand Up @@ -115,7 +115,7 @@ pub(crate) fn kill_session(name: &str) {
let path = &*ZELLIJ_SOCK_DIR.join(name);
match LocalSocketStream::connect(path) {
Ok(stream) => {
IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
let _ = IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
},
Err(e) => {
eprintln!("Error occurred: {:?}", e);
Expand Down
4 changes: 3 additions & 1 deletion zellij-client/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ impl ClientOsApi for ClientOsInputOutput {
}

fn send_to_server(&self, msg: ClientToServerMsg) {
self.send_instructions_to_server
// TODO: handle the error here, right now we silently ignore it
let _ = self
.send_instructions_to_server
.lock()
.unwrap()
.as_mut()
Expand Down
2 changes: 1 addition & 1 deletion zellij-client/src/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub(crate) fn kill_session(name: &str) {
let path = &*ZELLIJ_SOCK_DIR.join(name);
match LocalSocketStream::connect(path) {
Ok(stream) => {
IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
let _ = IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
},
Err(e) => {
eprintln!("Error occurred: {:?}", e);
Expand Down
50 changes: 40 additions & 10 deletions zellij-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ macro_rules! remove_client {
};
}

macro_rules! send_to_client {
($client_id:expr, $os_input:expr, $msg:expr, $session_state:expr) => {
let send_to_client_res = $os_input.send_to_client($client_id, $msg);
if let Err(_) = send_to_client_res {
// failed to send to client, remove it
remove_client!($client_id, $os_input, $session_state);
}
};
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) struct SessionState {
clients: HashMap<ClientId, Option<Size>>,
Expand Down Expand Up @@ -392,15 +402,26 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
Event::ModeUpdate(mode_info),
))
.unwrap();
os_input.send_to_client(client_id, ServerToClientMsg::SwitchToMode(mode));
send_to_client!(
client_id,
os_input,
ServerToClientMsg::SwitchToMode(mode),
session_state
);
},
ServerInstruction::UnblockInputThread => {
for client_id in session_state.read().unwrap().clients.keys() {
os_input.send_to_client(*client_id, ServerToClientMsg::UnblockInputThread);
send_to_client!(
*client_id,
os_input,
ServerToClientMsg::UnblockInputThread,
session_state
);
}
},
ServerInstruction::ClientExit(client_id) => {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
let _ =
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size() {
session_data
Expand Down Expand Up @@ -465,14 +486,16 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
ServerInstruction::KillSession => {
let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
}
break;
},
ServerInstruction::DetachSession(client_ids) => {
for client_id in client_ids {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size()
{
Expand Down Expand Up @@ -509,14 +532,16 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
// If `None`- Send an exit instruction. This is the case when a user closes the last Tab/Pane.
if let Some(output) = &serialized_output {
for (client_id, client_render_instruction) in output.iter() {
os_input.send_to_client(
send_to_client!(
*client_id,
os_input,
ServerToClientMsg::Render(client_render_instruction.clone()),
session_state
);
}
} else {
for client_id in client_ids {
os_input
let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
}
Expand All @@ -526,7 +551,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
ServerInstruction::Error(backtrace) => {
let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids {
os_input.send_to_client(
let _ = os_input.send_to_client(
client_id,
ServerToClientMsg::Exit(ExitReason::Error(backtrace.clone())),
);
Expand All @@ -535,7 +560,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
break;
},
ServerInstruction::ConnStatus(client_id) => {
os_input.send_to_client(client_id, ServerToClientMsg::Connected);
let _ = os_input.send_to_client(client_id, ServerToClientMsg::Connected);
remove_client!(client_id, os_input, session_state);
},
ServerInstruction::ActiveClients(client_id) => {
Expand All @@ -545,7 +570,12 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
client_ids,
client_id
);
os_input.send_to_client(client_id, ServerToClientMsg::ActiveClients(client_ids));
send_to_client!(
client_id,
os_input,
ServerToClientMsg::ActiveClients(client_ids),
session_state
);
},
}
}
Expand Down
16 changes: 13 additions & 3 deletions zellij-server/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ pub trait ServerOsApi: Send + Sync {
fn force_kill(&self, pid: Pid) -> Result<(), nix::Error>;
/// Returns a [`Box`] pointer to this [`ServerOsApi`] struct.
fn box_clone(&self) -> Box<dyn ServerOsApi>;
fn send_to_client(&self, client_id: ClientId, msg: ServerToClientMsg);
fn send_to_client(
&self,
client_id: ClientId,
msg: ServerToClientMsg,
) -> Result<(), &'static str>;
fn new_client(
&mut self,
client_id: ClientId,
Expand Down Expand Up @@ -373,9 +377,15 @@ impl ServerOsApi for ServerOsInputOutput {
let _ = kill(pid, Some(Signal::SIGKILL));
Ok(())
}
fn send_to_client(&self, client_id: ClientId, msg: ServerToClientMsg) {
fn send_to_client(
&self,
client_id: ClientId,
msg: ServerToClientMsg,
) -> Result<(), &'static str> {
if let Some(sender) = self.client_senders.lock().unwrap().get_mut(&client_id) {
sender.send(msg);
sender.send(msg)
} else {
Ok(())
}
}
fn new_client(
Expand Down
9 changes: 7 additions & 2 deletions zellij-server/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,15 @@ pub(crate) fn route_thread_main(
let client_id = maybe_client_id.unwrap_or(client_id);
if let Some(rlocked_sessions) = rlocked_sessions.as_ref() {
if let Action::SwitchToMode(input_mode) = action {
os_input.send_to_client(
let send_res = os_input.send_to_client(
client_id,
ServerToClientMsg::SwitchToMode(input_mode),
);
if send_res.is_err() {
let _ = to_server
.send(ServerInstruction::RemoveClient(client_id));
return true;
}
}
if route_action(
action,
Expand Down Expand Up @@ -642,7 +647,7 @@ pub(crate) fn route_thread_main(
},
None => {
log::error!("Received empty message from client");
os_input.send_to_client(
let _ = os_input.send_to_client(
client_id,
ServerToClientMsg::Exit(ExitReason::Error(
"Received empty message".to_string(),
Expand Down
6 changes: 5 additions & 1 deletion zellij-server/src/tab/unit/tab_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone())
}
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) {
fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!()
}
fn new_client(
Expand Down
6 changes: 5 additions & 1 deletion zellij-server/src/tab/unit/tab_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone())
}
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) {
fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!()
}
fn new_client(
Expand Down
6 changes: 5 additions & 1 deletion zellij-server/src/unit/screen_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone())
}
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) {
fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!()
}
fn new_client(
Expand Down
14 changes: 9 additions & 5 deletions zellij-utils/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,16 @@ impl<T: Serialize> IpcSenderWithContext<T> {
}

/// Sends an event, along with the current [`ErrorContext`], on this [`IpcSenderWithContext`]'s socket.
pub fn send(&mut self, msg: T) {
pub fn send(&mut self, msg: T) -> Result<(), &'static str> {
let err_ctx = get_current_ctx();
rmp_serde::encode::write(&mut self.sender, &(msg, err_ctx)).unwrap();
// TODO: unwrapping here can cause issues when the server disconnects which we don't mind
// do we need to handle errors here in other cases?
let _ = self.sender.flush();
if rmp_serde::encode::write(&mut self.sender, &(msg, err_ctx)).is_err() {
Err("Failed to send message to client")
} else {
// TODO: unwrapping here can cause issues when the server disconnects which we don't mind
// do we need to handle errors here in other cases?
let _ = self.sender.flush();
Ok(())
}
}

/// Returns an [`IpcReceiverWithContext`] with the same socket as this sender.
Expand Down