Skip to content

Commit

Permalink
Reduce client loops and improve window handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nullchinchilla committed Mar 17, 2024
1 parent 47f71cf commit a5b892e
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 53 deletions.
173 changes: 159 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion binaries/geph5-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Client {
pub type CtxField<T> = fn(&AnyCtx<Config>) -> T;

async fn client_main(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
let _client_loops: Vec<_> = (0..8)
let _client_loops: Vec<_> = (0..1)
.map(|_| {
Immortal::respawn(
RespawnStrategy::JitterDelay(Duration::from_secs(1), Duration::from_secs(5)),
Expand Down
3 changes: 3 additions & 0 deletions binaries/geph5-client/src/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub async fn client_once(ctx: AnyCtx<Config>) -> anyhow::Result<()> {
.context("overall dial/mux/auth timeout")??;
client_inner(ctx, authed_pipe).await
}

#[tracing::instrument(skip_all, fields(remote=display(authed_pipe.remote_addr().unwrap_or("(none)"))))]
async fn client_inner(ctx: AnyCtx<Config>, authed_pipe: impl Pipe) -> anyhow::Result<()> {
let (read, write) = authed_pipe.split();
Expand Down Expand Up @@ -96,7 +97,9 @@ async fn client_inner(ctx: AnyCtx<Config>, authed_pipe: impl Pipe) -> anyhow::Re
let _ = send_back.send(stream);
}
Err(err) => {
tracing::warn!(remote_addr = display(&remote_addr), err = debug(&err), "session is dead, hot-potatoing the connection request to somebody else");
let _ = send_stop.try_send(err);
let _ = ctx.get(CONN_REQ_CHAN).0.try_send((remote_addr, send_back));
}
}
anyhow::Ok(())
Expand Down
13 changes: 13 additions & 0 deletions libraries/picomux/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ impl Frame {
}
}

/// Create an frame with the given stream ID, command, and body
pub fn new(stream_id: u32, command: u8, body: &[u8]) -> Self {
Self {
header: Header {
version: 1,
command,
body_len: body.len() as _,
stream_id,
},
body: Bytes::copy_from_slice(body),
}
}

/// Read a frame from an async reader.
pub async fn read(mut rdr: impl AsyncRead + Unpin) -> std::io::Result<Self> {
let mut header_buf = [0; std::mem::size_of::<Header>()];
Expand Down
Loading

0 comments on commit a5b892e

Please sign in to comment.