diff --git a/Cargo.lock b/Cargo.lock index 845e953..7a476ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.0.4" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -107,12 +107,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "bitflags" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" - [[package]] name = "bytes" version = "1.4.0" @@ -201,40 +195,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "env_logger" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" -dependencies = [ - "humantime", - "is-terminal", - "log", - "regex", - "termcolor", -] - -[[package]] -name = "errno" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b30f669a7961ef1631673d2766cc92f52d64f7ef354d4fe0ddfd30ed52f0f4f" -dependencies = [ - "errno-dragonfly", - "libc", - "windows-sys", -] - -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "getrandom" version = "0.2.10" @@ -264,23 +224,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - -[[package]] -name = "is-terminal" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" -dependencies = [ - "hermit-abi", - "rustix", - "windows-sys", -] - [[package]] name = "itoa" version = "1.0.9" @@ -288,16 +231,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] -name = "libc" -version = "0.2.147" +name = "lazy_static" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] -name = "linux-raw-sys" -version = "0.4.5" +name = "libc" +version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "lock_api" @@ -315,6 +258,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.5.0" @@ -341,6 +293,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -366,6 +328,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -391,9 +359,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "proc-macro2" @@ -420,13 +388,14 @@ dependencies = [ "anyhow", "clap", "directories", - "env_logger", - "log", + "pin-project-lite", "serde", "serde_derive", "serde_json", "tokio", "toml", + "tracing", + "tracing-subscriber", ] [[package]] @@ -435,7 +404,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" dependencies = [ - "bitflags 1.3.2", + "bitflags", ] [[package]] @@ -444,7 +413,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" dependencies = [ - "bitflags 1.3.2", + "bitflags", ] [[package]] @@ -460,51 +429,53 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.3" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" +checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.3.7", + "regex-syntax 0.7.5", ] [[package]] name = "regex-automata" -version = "0.3.6" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.5", ] [[package]] name = "regex-syntax" -version = "0.7.4" +version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] -name = "rustc-demangle" -version = "0.1.23" +name = "regex-syntax" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] -name = "rustix" -version = "0.38.8" +name = "rustc-demangle" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" -dependencies = [ - "bitflags 2.4.0", - "errno", - "libc", - "linux-raw-sys", - "windows-sys", -] +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "ryu" @@ -549,6 +520,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -591,15 +571,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "termcolor" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "1.0.47" @@ -620,6 +591,16 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tokio" version = "1.32.0" @@ -659,6 +640,67 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-ident" version = "1.0.11" @@ -671,6 +713,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -693,15 +741,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index f9409ac..1b97cc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,11 @@ edition = "2021" anyhow = "1.0.53" clap = { version = "4.3.0", features = ["derive", "env"] } directories = "4.0.1" -env_logger = "0.10.0" -log = "0.4.14" +pin-project-lite = "0.2.13" serde = { version = "1.0.186" } serde_derive = { version = "1.0.186" } serde_json = "1.0.78" tokio = { version = "1.15.0", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "sync", "time"] } toml = "0.5.8" +tracing = "0.1.39" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..64d94de --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +group_imports = "StdExternalCrate" +imports_granularity = "Module" diff --git a/src/client.rs b/src/client.rs index 0c105fe..1c586b5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,9 +1,52 @@ -use anyhow::{Context, Result}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::{Context as _, Result}; +use pin_project_lite::pin_project; use ra_multiplex::config::Config; use ra_multiplex::proto; -use tokio::io::AsyncWriteExt; +use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; -use tokio::{io, task}; + +pin_project! { + struct Stdio { + #[pin] + stdin: io::Stdin, + #[pin] + stdout: io::Stdout, + } +} + +fn stdio() -> Stdio { + Stdio { + stdin: io::stdin(), + stdout: io::stdout(), + } +} + +impl AsyncRead for Stdio { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut io::ReadBuf, + ) -> Poll> { + self.project().stdin.poll_read(cx, buf) + } +} + +impl AsyncWrite for Stdio { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + self.project().stdout.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().stdout.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().stdout.poll_shutdown(cx) + } +} pub async fn main(server_path: String, server_args: Vec) -> Result<()> { let config = Config::load_or_default().await; @@ -12,31 +55,18 @@ pub async fn main(server_path: String, server_args: Vec) -> Result<()> { let mut proto_init = serde_json::to_vec(&proto_init).context("sending proto init")?; proto_init.push(b'\0'); - let stream = TcpStream::connect(config.connect) + let mut stream = TcpStream::connect(config.connect) .await .context("connect")?; - let (mut read_stream, mut write_stream) = stream.into_split(); - write_stream + stream .write_all(&proto_init) .await .context("sending proto init")?; drop(proto_init); - let t1 = task::spawn(async move { - io::copy(&mut read_stream, &mut io::stdout()) - .await - .context("io error") - }); - let t2 = task::spawn(async move { - io::copy(&mut io::stdin(), &mut write_stream) - .await - .context("io error") - }); - tokio::select! { - res = t1 => res, - res = t2 => res, - } - .context("join")??; + io::copy_bidirectional(&mut stream, &mut stdio()) + .await + .context("io error")?; Ok(()) } diff --git a/src/config.rs b/src/config.rs index 70792de..e731b12 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,13 @@ +use std::net::{IpAddr, Ipv4Addr}; + use anyhow::Context; use directories::ProjectDirs; use serde::de::{Error, Unexpected}; use serde::{Deserialize, Deserializer}; use serde_derive::{Deserialize, Serialize}; -use std::net::{IpAddr, Ipv4Addr}; use tokio::fs; use tokio::sync::OnceCell; +use tracing::info; mod default { use super::*; @@ -147,10 +149,20 @@ impl Config { /// panics if called multiple times fn init_logger(&self) { - env_logger::Builder::from_env(env_logger::Env::new().default_filter_or(&self.log_filters)) - .format_timestamp(None) - .format_module_path(false) - .format_target(false) + use tracing_subscriber::prelude::*; + use tracing_subscriber::EnvFilter; + + let format = tracing_subscriber::fmt::layer() + .without_time() + .with_target(false); + + let filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new(&self.log_filters)) + .unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::registry() + .with(filter) + .with(format) .init(); } @@ -168,9 +180,9 @@ impl Config { }; let global_config = Box::leak(Box::new(config)); global_config.init_logger(); - if let Some(load_err) = load_err { + if let Some(err) = load_err { // log only after the logger has been initialized - log::info!("cannot load config, continuing with defaults: {load_err:?}"); + info!(?err, "cannot load config, continuing with defaults"); } &*global_config }) diff --git a/src/lsp.rs b/src/lsp.rs index 0d94484..96afb56 100644 --- a/src/lsp.rs +++ b/src/lsp.rs @@ -28,15 +28,15 @@ //! - Progress notifications - contains a `token` property which could be used to identify the //! client but the specification also says it has nothing to do with the request IDs -use anyhow::{bail, ensure, Context, Result}; -use serde::Serialize; -use serde_json::{Map, Value}; use std::fmt::{self, Debug}; use std::io::{self, ErrorKind}; use std::str; use std::sync::Arc; -use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use anyhow::{bail, ensure, Context, Result}; +use serde::Serialize; +use serde_json::{Map, Value}; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt}; /// Every message begins with a HTTP-style header /// diff --git a/src/main.rs b/src/main.rs index 1933846..d9a6437 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,11 +29,7 @@ struct ClientArgs { } #[derive(Args, Debug)] -struct ServerArgs { - /// Dump all communication with the client - #[arg(long)] - dump: bool, -} +struct ServerArgs {} #[derive(Subcommand, Debug)] enum Cmd { @@ -49,7 +45,7 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); match cli.command { - Some(Cmd::Server(args)) => server::main(args.dump).await, + Some(Cmd::Server(_args)) => server::main().await, Some(Cmd::Client(args)) => client::main(args.server_path, args.server_args).await, None => { let server_path = env::var("RA_MUX_SERVER").unwrap_or_else(|_| "rust-analyzer".into()); diff --git a/src/proto.rs b/src/proto.rs index a37a80f..e5488ee 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,6 +1,7 @@ +use std::env; + use anyhow::{ensure, Context, Result}; use serde_derive::{Deserialize, Serialize}; -use std::env; use tokio::io::{AsyncBufRead, AsyncBufReadExt}; #[derive(Serialize, Deserialize)] diff --git a/src/server.rs b/src/server.rs index 5cdc7e8..fdbc30b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,21 +6,20 @@ //! LSP Multiplexer attempts to solve this problem by spawning a single rust-analyzer instance per //! cargo workspace and routing the messages through TCP to multiple clients. -use crate::server::client::Client; -use crate::server::instance::InstanceRegistry; use anyhow::{Context, Result}; use ra_multiplex::config::Config; use tokio::net::TcpListener; use tokio::task; +use tracing::{debug, error, info, info_span, warn, Instrument}; + +use crate::server::client::Client; +use crate::server::instance::InstanceRegistry; mod async_once_cell; mod client; mod instance; -pub async fn main(dump: bool) -> Result<()> { - if dump { - todo!("--dump is not yet implemented"); - } +pub async fn main() -> Result<()> { let config = Config::load_or_default().await; let registry = InstanceRegistry::new().await; @@ -31,18 +30,21 @@ pub async fn main(dump: bool) -> Result<()> { let port = addr.port(); let registry = registry.clone(); - log::info!("[{port}] client connected"); - task::spawn(async move { - match Client::process(socket, port, registry).await { - Ok(_) => log::debug!("[{port}] client initialized"), - Err(err) => log::error!("[{port}] client error: {err:?}"), + task::spawn( + async move { + info!("client connected"); + match Client::process(socket, port, registry).await { + Ok(_) => debug!("client initialized"), + Err(err) => error!("client error: {err:?}"), + } } - }); + .instrument(info_span!("client", %port)), + ); } Err(err) => match err.kind() { // ignore benign errors std::io::ErrorKind::NotConnected => { - log::warn!("listener error {err}"); + warn!("listener error {err}"); } _ => { Err(err).context("accept connection")?; diff --git a/src/server/client.rs b/src/server/client.rs index d760d2d..a201dc7 100644 --- a/src/server/client.rs +++ b/src/server/client.rs @@ -1,19 +1,20 @@ -use super::instance::{ - InitializeCache, InstanceKey, InstanceRegistry, RaInstance, INIT_REQUEST_ID, -}; -use anyhow::{bail, Context, Result}; -use ra_multiplex::{ - lsp::{self, Message}, - proto, -}; -use serde_json::{json, Map, Value}; use std::io::ErrorKind; use std::sync::Arc; + +use anyhow::{bail, Context, Result}; +use ra_multiplex::lsp::{self, Message}; +use ra_multiplex::proto; +use serde_json::{json, Map, Value}; use tokio::io::BufReader; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::{select, task}; +use tracing::{debug, error, info, trace, Instrument}; + +use super::instance::{ + InitializeCache, InstanceKey, InstanceRegistry, RaInstance, INIT_REQUEST_ID, +}; pub struct Client { port: u16, @@ -31,7 +32,12 @@ impl Client { let proto_init = proto::Init::from_reader(&mut buffer, &mut socket_read).await?; let key = InstanceKey::from_proto_init(&proto_init).await; - log::debug!("client configured {key:?}"); + debug!( + path = ?key.workspace_root(), + server = ?key.server(), + args = ?key.args(), + "client configured", + ); let mut client = Client { port, @@ -57,14 +63,13 @@ impl Client { socket_read: &mut BufReader, ) -> Result<()> { let mut buffer = Vec::new(); - let port = self.port; let (mut json, _bytes) = lsp::read_message(&mut *socket_read, &mut buffer) .await? .context("channel closed")?; if !matches!(json.get("method"), Some(Value::String(method)) if method == "initialize") { bail!("first client message was not InitializeRequest"); } - log::debug!("[{port}] recv InitializeRequest"); + debug!("recv InitializeRequest"); // this is an initialize request, it's special because it cannot be sent twice or // rust-analyzer will crash. @@ -96,7 +101,6 @@ impl Client { tx: mpsc::Sender, buffer: &mut Vec, ) -> Result<()> { - let port = self.port; // parse the cached message and restore the `id` to the value this client expects let response = self.instance.init_cache.response.get().await; let mut json: Map = serde_json::from_slice(response.as_bytes()) @@ -108,7 +112,7 @@ impl Client { .expect("BUG: need to wait_for_initialize_request first"), ); let message = Message::from_json(&json, buffer); - log::debug!("[{port}] send response to InitializeRequest"); + debug!("send response to InitializeRequest"); tx.send(message).await.context("send initialize response")?; Ok(()) } @@ -131,35 +135,37 @@ impl Client { mut close_rx: mpsc::Receiver, mut socket_write: OwnedWriteHalf, ) { - let port = self.port; - task::spawn(async move { - // unlike the output task, here we first wait on the channel which is going to - // block until the rust-analyzer server sends a notification, however if we're the last - // client and have just closed the server is unlikely to send any. this results in the - // last client often falsely hanging while the gc task depends on the input channels being - // closed to detect a disconnected client. - // - // when a client sends a shutdown request we receive a message on the close_rx, send - // the reply and close the connection. if no shutdown request was received but the - // client closed close_rx channel will be dropped (unlike the normal rx channel which - // is shared) and the connection will close without sending any response. - while let Some(message) = select! { - message = close_rx.recv() => message, - message = rx.recv() => message, - } { - if let Err(err) = message.to_writer(&mut socket_write).await { - match err.kind() { - // ignore benign errors, treat as socket close - ErrorKind::BrokenPipe => {} - // report fatal errors - _ => log::error!("[{port}] error writing client input: {err}"), + task::spawn( + async move { + // unlike the output task, here we first wait on the channel which is going to + // block until the rust-analyzer server sends a notification, however if we're the last + // client and have just closed the server is unlikely to send any. this results in the + // last client often falsely hanging while the gc task depends on the input channels being + // closed to detect a disconnected client. + // + // when a client sends a shutdown request we receive a message on the close_rx, send + // the reply and close the connection. if no shutdown request was received but the + // client closed close_rx channel will be dropped (unlike the normal rx channel which + // is shared) and the connection will close without sending any response. + while let Some(message) = select! { + message = close_rx.recv() => message, + message = rx.recv() => message, + } { + if let Err(err) = message.to_writer(&mut socket_write).await { + match err.kind() { + // ignore benign errors, treat as socket close + ErrorKind::BrokenPipe => {} + // report fatal errors + _ => error!(?err, "error writing client input: {err}"), + } + break; // break on any error } - break; // break on any error } + debug!("client input closed"); + info!("client disconnected"); } - log::debug!("[{port}] client input closed"); - log::info!("[{port}] client disconnected"); - }); + .in_current_span(), + ); } fn spawn_output_task( @@ -170,20 +176,23 @@ impl Client { let port = self.port; let instance = Arc::clone(&self.instance); let instance_tx = self.instance.message_writer.clone(); - task::spawn(async move { - match read_client_socket( - socket_read, - instance_tx, - close_tx, - port, - &instance.init_cache, - ) - .await - { - Ok(_) => log::debug!("[{port}] client output closed"), - Err(err) => log::error!("[{port}] error reading client output: {err:?}"), + task::spawn( + async move { + match read_client_socket( + socket_read, + instance_tx, + close_tx, + port, + &instance.init_cache, + ) + .await + { + Ok(_) => debug!("client output closed"), + Err(err) => error!(?err, "error reading client output"), + } } - }); + .in_current_span(), + ); } } @@ -207,13 +216,14 @@ async fn read_client_socket( let mut buffer = Vec::new(); while let Some((mut json, bytes)) = lsp::read_message(&mut socket_read, &mut buffer).await? { + trace!(message = serde_json::to_string(&json).unwrap(), "client"); if matches!(json.get("method"), Some(Value::String(method)) if method == "initialized") { // initialized notification can only be sent once per server if init_cache.attempt_send_notif() { - log::debug!("[{port}] send InitializedNotification"); + debug!("send InitializedNotification"); } else { // we're not the first, skip processing the message further - log::debug!("[{port}] skip InitializedNotification"); + debug!("skip InitializedNotification"); continue; } } @@ -222,7 +232,7 @@ async fn read_client_socket( // instead we disconnect this client to prevent the editor hanging // see if let Some(shutdown_request_id) = json.get("id") { - log::info!("[{port}] client sent shutdown request, sending a response and closing connection"); + info!("client sent shutdown request, sending a response and closing connection"); // let message = Message::from_json( &json!({ diff --git a/src/server/instance.rs b/src/server/instance.rs index 745f3ae..1bcea0e 100644 --- a/src/server/instance.rs +++ b/src/server/instance.rs @@ -1,14 +1,5 @@ -use super::async_once_cell::AsyncOnceCell; -use anyhow::{bail, Context, Result}; -use ra_multiplex::{ - config::Config, - lsp::{self, Message}, - proto, -}; -use serde_json::{Number, Value}; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::fmt::Write; use std::io::ErrorKind; #[cfg(unix)] use std::os::unix::process::ExitStatusExt; @@ -18,10 +9,19 @@ use std::str::{self, FromStr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; + +use anyhow::{bail, Context, Result}; +use ra_multiplex::config::Config; +use ra_multiplex::lsp::{self, Message}; +use ra_multiplex::proto; +use serde_json::{Number, Value}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; use tokio::sync::{mpsc, Mutex, Notify, RwLock}; use tokio::{select, task, time}; +use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; + +use super::async_once_cell::AsyncOnceCell; /// keeps track of the initialize/initialized handshake for an instance #[derive(Default)] @@ -60,6 +60,18 @@ pub struct InstanceKey { } impl InstanceKey { + pub fn server(&self) -> &str { + &self.server + } + + pub fn args(&self) -> &[String] { + &self.args + } + + pub fn workspace_root(&self) -> &Path { + &self.workspace_root + } + pub async fn from_proto_init(proto_init: &proto::Init) -> InstanceKey { let config = Config::load_or_default().await; @@ -142,9 +154,7 @@ impl RaInstance { impl Drop for RaInstance { fn drop(&mut self) { - let path = self.key.workspace_root.display(); - let pid = self.pid; - log::debug!("[{path} {pid}] instance dropped"); + debug!("instance dropped"); } } @@ -174,51 +184,60 @@ impl InstanceRegistry { let registry = self.clone(); let mut closed_ports = Vec::new(); - task::spawn(async move { - loop { - time::sleep(Duration::from_secs(gc_interval)).await; - - for (key, instance) in registry.map.lock().await.iter() { - for (port, sender) in instance.message_readers.read().await.iter() { - if sender.is_closed() { - closed_ports.push(*port); + task::spawn( + async move { + loop { + time::sleep(Duration::from_secs(gc_interval)).await; + + for (key, instance) in registry.map.lock().await.iter() { + for (port, sender) in instance.message_readers.read().await.iter() { + if sender.is_closed() { + closed_ports.push(*port); + } } - } - let mut message_readers = instance.message_readers.write().await; - for port in closed_ports.drain(..) { - message_readers.remove(&port); - } - if let Some(instance_timeout) = instance_timeout { - if message_readers.is_empty() && instance.attempt_start_timeout() { - registry.spawn_timeout_task(key, instance_timeout); - let pid = instance.pid; - let path = key.workspace_root.display(); - log::warn!("[{path} {pid}] no clients, close timeout started ({instance_timeout} seconds)"); + let mut message_readers = instance.message_readers.write().await; + for port in closed_ports.drain(..) { + message_readers.remove(&port); + } + if let Some(instance_timeout) = instance_timeout { + if message_readers.is_empty() && instance.attempt_start_timeout() { + registry.spawn_timeout_task(key, instance_timeout); + info!( + pid = instance.pid, + path = ?key.workspace_root, + timeout = ?instance_timeout, + "no clients, close timeout started", + ); + } + } else { + // if there is no instance timeout we don't need to spawn the timeout task } - } else { - // if there is no instance timeout we don't need to spawn the timeout task } } } - }); + .instrument(info_span!("garbage collector")), + ); } /// closes a rust-analyzer instance after a timeout if it still doesn't have any fn spawn_timeout_task(&self, key: &InstanceKey, instance_timeout: u64) { let registry = self.clone(); let key = key.to_owned(); - task::spawn(async move { - time::sleep(Duration::from_secs(instance_timeout)).await; + task::spawn( + async move { + time::sleep(Duration::from_secs(instance_timeout)).await; - if let Some(instance) = registry.map.lock().await.get(&key) { - if instance.message_readers.read().await.is_empty() { - instance.close.notify_one(); + if let Some(instance) = registry.map.lock().await.get(&key) { + if instance.message_readers.read().await.is_empty() { + instance.close.notify_one(); + } + instance.timeout_finished(); + } else { + // instance has already been deleted } - instance.timeout_finished(); - } else { - // instance has already been deleted } - }); + .in_current_span(), + ); } /// find or spawn an instance for a `project_root` @@ -236,6 +255,7 @@ impl InstanceRegistry { } impl RaInstance { + #[instrument(name = "instance", fields(path = ?key.workspace_root), skip_all, parent = None)] fn spawn(key: &InstanceKey, registry: InstanceRegistry) -> Result> { let mut child = Command::new(&key.server) .args(&key.args) @@ -274,139 +294,137 @@ impl RaInstance { instance.spawn_stdout_task(stdout); instance.spawn_stdin_task(rx, stdin); - let path = key.workspace_root.display(); - log::info!("[{path} {pid}] spawned {server}", server = &key.server); + info!(server = ?key.server, args = ?key.args, "spawned"); Ok(instance) } /// read errors from server stderr and log them fn spawn_stderr_task(self: &Arc, stderr: ChildStderr) { - let pid = self.pid; - let path = self.key.workspace_root.display().to_string(); let mut stderr = BufReader::new(stderr); let mut buffer = String::new(); - task::spawn(async move { - loop { - buffer.clear(); - match stderr.read_line(&mut buffer).await { - Ok(0) => { - // reached EOF - log::debug!("[{path} {pid}] instance stderr closed"); - break; - } - Ok(_) => { - let line = buffer.trim_end(); // remove trailing '\n' or possibly '\r\n' - log::error!("[{path} {pid}] {line}"); - } - Err(err) => { - let err = anyhow::Error::from(err); - log::error!("[{path} {pid}] error reading from stderr: {err:?}"); + task::spawn( + async move { + loop { + buffer.clear(); + match stderr.read_line(&mut buffer).await { + Ok(0) => { + // reached EOF + debug!("stderr closed"); + break; + } + Ok(_) => { + let line = buffer.trim_end(); // remove trailing '\n' or possibly '\r\n' + error!(%line, "stderr"); + } + Err(err) => { + let err = anyhow::Error::from(err); + error!(?err, "error reading from stderr"); + } } } } - }); + .in_current_span(), + ); } /// read messages from server stdout and distribute them to client channels fn spawn_stdout_task(self: &Arc, stdout: ChildStdout) { let stdout = BufReader::new(stdout); let instance = Arc::clone(self); - let pid = self.pid; - let path = instance.key.workspace_root.display().to_string(); - - task::spawn(async move { - match read_server_socket(stdout, &instance.message_readers, &instance.init_cache).await - { - Err(err) => log::error!("[{path} {pid}] error reading from stdout: {err:?}"), - Ok(_) => log::debug!("[{path} {pid}] instance stdout closed"), + + task::spawn( + async move { + match read_server_socket(stdout, &instance.message_readers, &instance.init_cache) + .await + { + Err(err) => error!(?err, "error reading from stdout"), + Ok(_) => debug!("stdout closed"), + } } - }); + .in_current_span(), + ); } /// read messages sent by clients from a channel and write them into server stdin fn spawn_stdin_task(self: &Arc, rx: mpsc::Receiver, stdin: ChildStdin) { - let pid = self.pid; - let path = self.key.workspace_root.display().to_string(); let mut receiver = rx; let mut stdin = stdin; - task::spawn(async move { - // because we (stdin task) don't keep a reference to `self` it will be dropped when the - // child closes and all the clients disconnect including the sender and this receiver - // will not keep blocking (unlike in client input task) - while let Some(message) = receiver.recv().await { - if let Err(err) = message.to_writer(&mut stdin).await { - match err.kind() { - // stdin is closed, no need to log an error - ErrorKind::BrokenPipe => {} - _ => { - let err = anyhow::Error::from(err); - log::error!("[{path} {pid}] error writing to stdin: {err:?}"); + task::spawn( + async move { + // because we (stdin task) don't keep a reference to `self` it will be dropped when the + // child closes and all the clients disconnect including the sender and this receiver + // will not keep blocking (unlike in client input task) + while let Some(message) = receiver.recv().await { + if let Err(err) = message.to_writer(&mut stdin).await { + match err.kind() { + // stdin is closed, no need to log an error + ErrorKind::BrokenPipe => {} + _ => { + let err = anyhow::Error::from(err); + error!(?err, "error writing to stdin"); + } } + break; } - break; } + debug!("stdin closed"); } - log::debug!("[{path} {pid}] instance stdin closed"); - }); + .in_current_span(), + ); } /// waits for child and logs when it exits fn spawn_wait_task(self: &Arc, child: Child, registry: InstanceRegistry) { let key = self.key.clone(); - let pid = self.pid; - let path = key.workspace_root.display().to_string(); let instance = Arc::clone(self); - task::spawn(async move { - let mut child = child; - loop { - select! { - _ = instance.close.notified() => { - if let Err(err) = child.start_kill() { - log::error!("[{path} {pid}] failed to close rust-analyzer: {err}"); + task::spawn( + async move { + let mut child = child; + loop { + select! { + _ = instance.close.notified() => { + if let Err(err) = child.start_kill() { + error!(?err, "failed to close child"); + } } - } - exit = child.wait() => { - // remove the closing instance from the registry so new clients spawn new - // instance - registry.map.lock().await.remove(&key); - - // disconnect all current clients - // - // we'll rely on the editor client to restart the ra-multiplex client, - // start a new connection and we'll spawn another instance like we'd with - // any other new client - let _ = instance.message_readers.write().await.drain(); - - match exit { - Ok(status) => { - let success = if status.success() { - "successfully" - } else { - "with error" - }; - let mut details = String::new(); - if let Some(code) = status.code() { - details.write_fmt(format_args!(" code {code}")).unwrap(); - } - #[cfg(unix)] - { - if let Some(signal) = status.signal() { - details.write_fmt(format_args!(" signal {signal}")).unwrap(); - } + exit = child.wait() => { + // remove the closing instance from the registry so new clients spawn new + // instance + registry.map.lock().await.remove(&key); + + // disconnect all current clients + // + // we'll rely on the editor client to restart the ra-multiplex client, + // start a new connection and we'll spawn another instance like we'd with + // any other new client + let _ = instance.message_readers.write().await.drain(); + + match exit { + Ok(status) => { + #[cfg(unix)] + let signal = status.signal(); + #[cfg(not(unix))] + let signal = tracing::field::Empty; + + error!( + success = status.success(), + code = status.code(), + signal, + "child exited", + ); } - let pid = instance.pid; - log::error!("[{path} {pid}] child exited {success}{details}"); + Err(err) => error!(?err, "error waiting for child"), } - Err(err) => log::error!("[{path} {pid}] error waiting for child: {err}"), + break; } - break; } } } - }); + .in_current_span(), + ); } } @@ -430,7 +448,7 @@ async fn read_server_socket( let mut buffer = Vec::new(); while let Some((mut json, bytes)) = lsp::read_message(&mut reader, &mut buffer).await? { - log::trace!("{}", Value::Object(json.clone())); + trace!(message = serde_json::to_string(&json).unwrap(), "server"); if let Some(id) = json.get("id") { // we tagged the request id so we expect to only receive tagged responses @@ -438,7 +456,7 @@ async fn read_server_socket( Value::String(string) if string == INIT_REQUEST_ID => { // this is a response to the InitializeRequest, we need to process it // separately - log::debug!("recv InitializeRequest response"); + debug!("recv InitializeRequest response"); init_cache .response .set(Message::from_bytes(bytes)) @@ -449,7 +467,10 @@ async fn read_server_socket( } Value::String(string) => string, _ => { - log::debug!("response to no request {}", Value::Object(json)); + debug!( + message = serde_json::to_string(&json).unwrap(), + "response to no request", + ); // FIXME uncommenting this crashes rust-analyzer, presumably because the client // then sends a confusing response or something? i'm guessing that the response // id gets tagged with port but rust-analyzer expects to know the id because @@ -479,7 +500,7 @@ async fn read_server_socket( let (port, old_id) = match parse_tagged_id(tagged_id) { Ok(ok) => ok, Err(err) => { - log::warn!("invalid tagged id {err:?}"); + warn!(?err, "invalid tagged id"); continue; } }; @@ -491,7 +512,7 @@ async fn read_server_socket( // ignore closed channels let _ignore = sender.send(message).await; } else { - log::warn!("[{port}] no client"); + warn!("no client"); } } else { // notification messages without an id are sent to all clients