Skip to content

Commit

Permalink
poc: use bounded queue between pty and screen
Browse files Browse the repository at this point in the history
  • Loading branch information
kxt committed May 21, 2021
1 parent b767dad commit dacdc76
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 61 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions src/tests/fakes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::collections::{HashMap, VecDeque};
use std::io::Write;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

use zellij_utils::{nix, zellij_tile};
use zellij_utils::{crossbeam, nix, zellij_tile};

use crate::tests::possible_tty_inputs::{get_possible_tty_inputs, Bytes};
use crate::tests::utils::commands::{QUIT, SLEEP};
Expand Down Expand Up @@ -83,9 +83,9 @@ pub struct FakeInputOutput {
possible_tty_inputs: HashMap<u16, Bytes>,
last_snapshot_time: Arc<Mutex<Instant>>,
send_instructions_to_client: SenderWithContext<ServerToClientMsg>,
receive_instructions_from_server: Arc<Mutex<mpsc::Receiver<(ServerToClientMsg, ErrorContext)>>>,
receive_instructions_from_server: Arc<Mutex<crossbeam::channel::Receiver<(ServerToClientMsg, ErrorContext)>>>,
send_instructions_to_server: SenderWithContext<ClientToServerMsg>,
receive_instructions_from_client: Arc<Mutex<mpsc::Receiver<(ClientToServerMsg, ErrorContext)>>>,
receive_instructions_from_client: Arc<Mutex<crossbeam::channel::Receiver<(ClientToServerMsg, ErrorContext)>>>,
should_trigger_sigwinch: Arc<(Mutex<bool>, Condvar)>,
sigwinch_event: Option<PositionAndSize>,
}
Expand All @@ -96,10 +96,10 @@ impl FakeInputOutput {
let last_snapshot_time = Arc::new(Mutex::new(Instant::now()));
let stdout_writer = FakeStdoutWriter::new(last_snapshot_time.clone());
let (client_sender, client_receiver): ChannelWithContext<ServerToClientMsg> =
mpsc::channel();
crossbeam::channel::unbounded();
let send_instructions_to_client = SenderWithContext::new(SenderType::Sender(client_sender));
let (server_sender, server_receiver): ChannelWithContext<ClientToServerMsg> =
mpsc::channel();
crossbeam::channel::unbounded();
let send_instructions_to_server = SenderWithContext::new(SenderType::Sender(server_sender));
win_sizes.insert(0, winsize); // 0 is the current terminal
FakeInputOutput {
Expand Down
10 changes: 5 additions & 5 deletions zellij-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::env::current_exe;
use std::io::{self, Write};
use std::path::Path;
use std::process::Command;
use std::sync::mpsc;
use std::thread;

use crate::{
Expand All @@ -16,12 +15,13 @@ use crate::{
};
use zellij_utils::cli::CliArgs;
use zellij_utils::{
channels::{SenderType, SenderWithContext, SyncChannelWithContext},
channels::{SenderType, SenderWithContext, ChannelWithContext},
consts::ZELLIJ_IPC_PIPE,
errors::{ClientContext, ContextType, ErrorInstruction},
input::config::Config,
input::options::Options,
ipc::{ClientAttributes, ClientToServerMsg, ServerToClientMsg},
crossbeam,
};

/// Instructions related to the client-side application
Expand Down Expand Up @@ -120,11 +120,11 @@ pub fn start_client(mut os_input: Box<dyn ClientOsApi>, opts: CliArgs, config: C
.write(bracketed_paste.as_bytes())
.unwrap();

let (send_client_instructions, receive_client_instructions): SyncChannelWithContext<
let (send_client_instructions, receive_client_instructions): ChannelWithContext<
ClientInstruction,
> = mpsc::sync_channel(50);
> = crossbeam::channel::bounded(50);
let send_client_instructions =
SenderWithContext::new(SenderType::SyncSender(send_client_instructions));
SenderWithContext::new(SenderType::Sender(send_client_instructions));

#[cfg(not(any(feature = "test", test)))]
std::panic::set_hook({
Expand Down
1 change: 1 addition & 0 deletions zellij-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ unicode-width = "0.1.8"
wasmer = "1.0.0"
wasmer-wasi = "1.0.0"
zellij-utils = { path = "../zellij-utils/", version = "0.12.0" }
crossbeam = "0.8.0"

[dev-dependencies]
insta = "1.6.0"
Expand Down
24 changes: 14 additions & 10 deletions zellij-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use zellij_utils::zellij_tile;

use std::sync::{Arc, RwLock};
use std::thread;
use std::{path::PathBuf, sync::mpsc};
use std::path::PathBuf;
use wasmer::Store;
use zellij_tile::data::PluginCapabilities;

Expand All @@ -27,7 +27,8 @@ use crate::{
};
use route::route_thread_main;
use zellij_utils::{
channels::{ChannelWithContext, SenderType, SenderWithContext, SyncChannelWithContext},
channels::{ChannelWithContext, SenderType, SenderWithContext},
crossbeam,
cli::CliArgs,
errors::{ContextType, ErrorInstruction, ServerContext},
input::options::Options,
Expand Down Expand Up @@ -104,9 +105,9 @@ pub fn start_server(os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {

std::env::set_var(&"ZELLIJ", "0");

let (to_server, server_receiver): SyncChannelWithContext<ServerInstruction> =
mpsc::sync_channel(50);
let to_server = SenderWithContext::new(SenderType::SyncSender(to_server));
let (to_server, server_receiver): ChannelWithContext<ServerInstruction> =
crossbeam::channel::bounded(50);
let to_server = SenderWithContext::new(SenderType::Sender(to_server));
let sessions: Arc<RwLock<Option<SessionMetaData>>> = Arc::new(RwLock::new(None));

#[cfg(not(any(feature = "test", test)))]
Expand Down Expand Up @@ -221,12 +222,15 @@ fn init_session(
to_server: SenderWithContext<ServerInstruction>,
client_attributes: ClientAttributes,
) -> SessionMetaData {
let (to_screen, screen_receiver): ChannelWithContext<ScreenInstruction> = mpsc::channel();
let (to_screen, screen_receiver): ChannelWithContext<ScreenInstruction> = crossbeam::channel::unbounded();
let to_screen = SenderWithContext::new(SenderType::Sender(to_screen));

let (to_plugin, plugin_receiver): ChannelWithContext<PluginInstruction> = mpsc::channel();
let (to_screen_pty, screen_receiver_pty): ChannelWithContext<ScreenInstruction> = crossbeam::channel::bounded(100);
let to_screen_pty = SenderWithContext::new(SenderType::Sender(to_screen_pty));

let (to_plugin, plugin_receiver): ChannelWithContext<PluginInstruction> = crossbeam::channel::unbounded();
let to_plugin = SenderWithContext::new(SenderType::Sender(to_plugin));
let (to_pty, pty_receiver): ChannelWithContext<PtyInstruction> = mpsc::channel();
let (to_pty, pty_receiver): ChannelWithContext<PtyInstruction> = crossbeam::channel::unbounded();
let to_pty = SenderWithContext::new(SenderType::Sender(to_pty));

// Determine and initialize the data directory
Expand Down Expand Up @@ -258,7 +262,7 @@ fn init_session(
let pty = Pty::new(
Bus::new(
pty_receiver,
Some(&to_screen),
Some(&to_screen_pty),
None,
Some(&to_plugin),
Some(&to_server),
Expand All @@ -285,7 +289,7 @@ fn init_session(
let max_panes = opts.max_panes;

move || {
screen_thread_main(screen_bus, max_panes, client_attributes, config_options);
screen_thread_main(screen_bus, screen_receiver_pty, max_panes, client_attributes, config_options);
}
})
.unwrap();
Expand Down
23 changes: 10 additions & 13 deletions zellij-server/src/pty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ async fn deadline_read(
}
}

async fn async_send_to_screen(senders: ThreadSenders, screen_instruction: ScreenInstruction) {
task::spawn_blocking(move || senders.send_to_screen(screen_instruction)).await.unwrap()
}

fn stream_terminal_bytes(
pid: RawFd,
senders: ThreadSenders,
Expand All @@ -171,36 +175,29 @@ fn stream_terminal_bytes(
match deadline_read(async_reader.as_mut(), render_deadline, &mut buf).await {
ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error
ReadResult::Timeout => {
let _ = senders.send_to_screen(ScreenInstruction::Render);
async_send_to_screen(senders.clone(), ScreenInstruction::Render).await;
// next read does not need a deadline as we just rendered everything
render_deadline = None;

// yield so Screen thread has some time to render before send additional
// PtyBytes.
task::sleep(Duration::from_millis(10)).await;
}
ReadResult::Ok(n_bytes) => {
let bytes = &buf[..n_bytes];
let bytes = buf[..n_bytes].to_vec();
if debug {
let _ = debug_to_file(bytes, pid);
let _ = debug_to_file(&bytes, pid);
}
let _ = senders
.send_to_screen(ScreenInstruction::PtyBytes(pid, bytes.to_vec()));
async_send_to_screen(senders.clone(), ScreenInstruction::PtyBytes(pid, bytes)).await;
// if we already have a render_deadline we keep it, otherwise we set it
// to the duration of `render_pause`.
render_deadline.get_or_insert(Instant::now() + render_pause);
}
}
}
let _ = senders.send_to_screen(ScreenInstruction::Render);
async_send_to_screen(senders.clone(), ScreenInstruction::Render).await;

#[cfg(not(any(feature = "test", test)))]
// this is a little hacky, and is because the tests end the file as soon as
// we read everything, rather than hanging until there is new data
// a better solution would be to fix the test fakes, but this will do for now
senders
.send_to_screen(ScreenInstruction::ClosePane(PaneId::Terminal(pid)))
.unwrap();
async_send_to_screen(senders, ScreenInstruction::ClosePane(PaneId::Terminal(pid))).await;
}
})
}
Expand Down
18 changes: 14 additions & 4 deletions zellij-server/src/screen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::collections::BTreeMap;
use std::os::unix::io::RawFd;
use std::str;
use zellij_utils::{crossbeam, errors::ErrorContext};

use zellij_utils::zellij_tile;

Expand Down Expand Up @@ -126,6 +127,7 @@ impl From<&ScreenInstruction> for ScreenContext {
pub(crate) struct Screen {
/// A Bus for sending and receiving messages with the other threads.
pub bus: Bus<ScreenInstruction>,
pty_receiver: crossbeam::channel::Receiver<(ScreenInstruction, ErrorContext)>,
/// An optional maximal amount of panes allowed per [`Tab`] in this [`Screen`] instance.
max_panes: Option<usize>,
/// A map between this [`Screen`]'s tabs and their ID/key.
Expand All @@ -143,13 +145,15 @@ impl Screen {
/// Creates and returns a new [`Screen`].
pub fn new(
bus: Bus<ScreenInstruction>,
pty_receiver: crossbeam::channel::Receiver<(ScreenInstruction, ErrorContext)>,
client_attributes: &ClientAttributes,
max_panes: Option<usize>,
mode_info: ModeInfo,
input_mode: InputMode,
) -> Self {
Screen {
bus,
pty_receiver,
max_panes,
position_and_size: client_attributes.position_and_size,
colors: client_attributes.palette,
Expand Down Expand Up @@ -387,6 +391,7 @@ impl Screen {
#[allow(clippy::boxed_local)]
pub(crate) fn screen_thread_main(
bus: Bus<ScreenInstruction>,
pty_receiver: crossbeam::channel::Receiver<(ScreenInstruction, ErrorContext)>,
max_panes: Option<usize>,
client_attributes: ClientAttributes,
config_options: Box<Options>,
Expand All @@ -395,6 +400,7 @@ pub(crate) fn screen_thread_main(

let mut screen = Screen::new(
bus,
pty_receiver.clone(),
&client_attributes,
max_panes,
ModeInfo {
Expand All @@ -406,11 +412,15 @@ pub(crate) fn screen_thread_main(
},
InputMode::Normal,
);
let mut sel = crossbeam::channel::Select::new();
let recvers = [pty_receiver, screen.bus.receiver.clone()];
for r in &recvers {
sel.recv(r);
}
loop {
let (event, mut err_ctx) = screen
.bus
.recv()
.expect("failed to receive event on channel");
let oper = sel.select();
let r = &recvers[oper.index()];
let (event, mut err_ctx) = oper.recv(r).unwrap();
err_ctx.add_call(ContextType::Screen((&event).into()));
match event {
ScreenInstruction::PtyBytes(pid, vte_bytes) => {
Expand Down
17 changes: 8 additions & 9 deletions zellij-server/src/thread_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use crate::{
os_input_output::ServerOsApi, pty::PtyInstruction, screen::ScreenInstruction,
wasm_vm::PluginInstruction, ServerInstruction,
};
use std::sync::mpsc;
use zellij_utils::{channels::SenderWithContext, errors::ErrorContext};
use zellij_utils::{channels::SenderWithContext, crossbeam, errors::ErrorContext};

/// A container for senders to the different threads in zellij on the server side
#[derive(Clone)]
Expand All @@ -20,42 +19,42 @@ impl ThreadSenders {
pub fn send_to_screen(
&self,
instruction: ScreenInstruction,
) -> Result<(), mpsc::SendError<(ScreenInstruction, ErrorContext)>> {
) -> Result<(), crossbeam::channel::SendError<(ScreenInstruction, ErrorContext)>> {
self.to_screen.as_ref().unwrap().send(instruction)
}

pub fn send_to_pty(
&self,
instruction: PtyInstruction,
) -> Result<(), mpsc::SendError<(PtyInstruction, ErrorContext)>> {
) -> Result<(), crossbeam::channel::SendError<(PtyInstruction, ErrorContext)>> {
self.to_pty.as_ref().unwrap().send(instruction)
}

pub fn send_to_plugin(
&self,
instruction: PluginInstruction,
) -> Result<(), mpsc::SendError<(PluginInstruction, ErrorContext)>> {
) -> Result<(), crossbeam::channel::SendError<(PluginInstruction, ErrorContext)>> {
self.to_plugin.as_ref().unwrap().send(instruction)
}

pub fn send_to_server(
&self,
instruction: ServerInstruction,
) -> Result<(), mpsc::SendError<(ServerInstruction, ErrorContext)>> {
) -> Result<(), crossbeam::channel::SendError<(ServerInstruction, ErrorContext)>> {
self.to_server.as_ref().unwrap().send(instruction)
}
}

/// A container for a receiver, OS input and the senders to a given thread
pub(crate) struct Bus<T> {
pub receiver: mpsc::Receiver<(T, ErrorContext)>,
pub receiver: crossbeam::channel::Receiver<(T, ErrorContext)>,
pub senders: ThreadSenders,
pub os_input: Option<Box<dyn ServerOsApi>>,
}

impl<T> Bus<T> {
pub fn new(
receiver: mpsc::Receiver<(T, ErrorContext)>,
receiver: crossbeam::channel::Receiver<(T, ErrorContext)>,
to_screen: Option<&SenderWithContext<ScreenInstruction>>,
to_pty: Option<&SenderWithContext<PtyInstruction>>,
to_plugin: Option<&SenderWithContext<PluginInstruction>>,
Expand All @@ -74,7 +73,7 @@ impl<T> Bus<T> {
}
}

pub fn recv(&self) -> Result<(T, ErrorContext), mpsc::RecvError> {
pub fn recv(&self) -> Result<(T, ErrorContext), crossbeam::channel::RecvError> {
self.receiver.recv()
}
}
Loading

0 comments on commit dacdc76

Please sign in to comment.