Skip to content

Commit

Permalink
explicitly parse json-rpc 2.0
Browse files Browse the repository at this point in the history
this is a prerequisite for implementing #22 and possibly #26 in a way
i'd be happy merging. the advantage is that we'll be able to do more
manipulation of LSP when we understand it, the disadvantage is possibly
breaking some language servers with slightly broken LSP implementation.
  • Loading branch information
pr2502 committed Oct 17, 2023
1 parent 38baefa commit 23548aa
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 203 deletions.
155 changes: 77 additions & 78 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::ErrorKind;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use serde_json::{json, Map, Value};
use serde_json::Value;
use tokio::io::BufReader;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
Expand All @@ -13,7 +13,8 @@ use tracing::{debug, error, info, trace, Instrument};
use crate::instance::{
InitializeCache, InstanceKey, InstanceRegistry, RaInstance, INIT_REQUEST_ID,
};
use crate::lsp::transport::{LspReader, LspWriter, Message};
use crate::lsp::jsonrpc::{Message, ResponseSuccess, Version};
use crate::lsp::transport::{LspReader, LspWriter};
use crate::proto;

pub struct Client {
Expand Down Expand Up @@ -52,9 +53,7 @@ impl Client {
client.spawn_input_task(client_rx, close_rx, socket_write);
client.spawn_output_task(socket_read, close_tx);

client
.wait_for_initialize_response(client_tx, &mut buffer)
.await?;
client.wait_for_initialize_response(client_tx).await?;
Ok(())
}

Expand All @@ -63,30 +62,34 @@ impl Client {
socket_read: &mut BufReader<OwnedReadHalf>,
) -> Result<()> {
let mut reader = LspReader::new(socket_read);
let mut buffer = Vec::new();
let (mut json, _bytes) = reader.read_message().await?.context("channel closed")?;
if !matches!(json.get("method"), Some(Value::String(method)) if method == "initialize") {
bail!("first client message was not InitializeRequest");
}
debug!("recv InitializeRequest");

let message = reader.read_message().await?.context("channel closed")?;
trace!(?message, "<- client");

let mut req = match message {
Message::Request(req) if req.method == "initialize" => {
debug!(message = ?Message::from(req.clone()), "recv InitializeRequest");
req
}
_ => bail!("first client message was not InitializeRequest"),
};

// this is an initialize request, it's special because it cannot be sent twice or
// rust-analyzer will crash.

// we save the request id so we can later use it for the response
self.initialize_request_id = Some(
json.remove("id")
.context("InitializeRequest is missing an `id`")?,
);
self.initialize_request_id = Some(req.id.clone());

if self.instance.init_cache.attempt_send_request() {
// it haven't been sent yet, we can send it.
//
// instead of tagging the original id we replace it with a custom id that only
// the `initialize` uses
json.insert("id".to_owned(), Value::String(INIT_REQUEST_ID.to_owned()));
req.id = Value::String(INIT_REQUEST_ID.to_owned());

self.instance
.message_writer
.send(Message::from_json(&json, &mut buffer))
.send(req.into())
.await
.context("forward client request")?;
} else {
Expand All @@ -95,24 +98,17 @@ impl Client {
Ok(())
}

async fn wait_for_initialize_response(
&self,
tx: mpsc::Sender<Message>,
buffer: &mut Vec<u8>,
) -> Result<()> {
async fn wait_for_initialize_response(&self, tx: mpsc::Sender<Message>) -> Result<()> {
// parse the cached message and restore the `id` to the value this client expects
let response = self.instance.init_cache.response.get().await;
let mut json: Map<String, Value> = serde_json::from_slice(response.as_bytes())
.expect("BUG: cached initialize response was invalid");
json.insert(
"id".to_owned(),
self.initialize_request_id
.clone()
.expect("BUG: need to wait_for_initialize_request first"),
);
let message = Message::from_json(&json, buffer);
debug!("send response to InitializeRequest");
tx.send(message).await.context("send initialize response")?;
let mut res = self.instance.init_cache.response.get().await;
res.id = self
.initialize_request_id
.clone()
.expect("BUG: need to wait_for_initialize_request first");
debug!(message = ?Message::from(res.clone()), "send response to InitializeRequest");
tx.send(res.into())
.await
.context("send initialize response")?;
Ok(())
}

Expand Down Expand Up @@ -151,7 +147,8 @@ impl Client {
message = close_rx.recv() => message,
message = rx.recv() => message,
} {
if let Err(err) = writer.write_message(message).await {
trace!(?message, "-> client");
if let Err(err) = writer.write_message(&message).await {
match err.kind() {
// ignore benign errors, treat as socket close
ErrorKind::BrokenPipe => {}
Expand Down Expand Up @@ -198,8 +195,8 @@ impl Client {

fn tag_id(port: u16, id: &Value) -> Result<String> {
match id {
Value::Number(number) => Ok(format!("{port:04x}:n:{number}")),
Value::String(string) => Ok(format!("{port:04x}:s:{string}")),
Value::Number(number) => Ok(format!("{port}:n:{number}")),
Value::String(string) => Ok(format!("{port}:s:{string}")),
_ => bail!("unexpected message id type {id:?}"),
}
}
Expand All @@ -214,56 +211,58 @@ async fn read_client_socket(
init_cache: &InitializeCache,
) -> Result<()> {
let mut reader = LspReader::new(socket_read);
let mut buffer = Vec::new();

while let Some((mut json, bytes)) = reader.read_message().await? {
trace!(message = serde_json::to_string(&json).unwrap(), "client");
if matches!(json.get("method"), Some(Value::String(method)) if method == "initialized") {
// initialized notification can only be sent once per server
if init_cache.attempt_send_notif() {
debug!("send InitializedNotification");
} else {
// we're not the first, skip processing the message further
debug!("skip InitializedNotification");
continue;
while let Some(message) = reader.read_message().await? {
trace!(?message, "<- client");

match message {
Message::Request(mut req) if req.method == "initialized" => {
// initialized notification can only be sent once per server
if init_cache.attempt_send_notif() {
debug!("send InitializedNotification");

req.id = tag_id(port, &req.id)?.into();
if tx.send(req.into()).await.is_err() {
break;
}
} else {
// we're not the first, skip processing the message further
debug!("skip InitializedNotification");
continue;
}
}
}
if matches!(json.get("method"), Some(Value::String(method)) if method == "shutdown") {
// client requested the server to shut down but other clients might still be connected.
// instead we disconnect this client to prevent the editor hanging
// see <https://github.com/pr2502/ra-multiplex/issues/5>
if let Some(shutdown_request_id) = json.get("id") {

Message::Request(req) if req.method == "shutdown" => {
// client requested the server to shut down but other clients might still be connected.
// instead we disconnect this client to prevent the editor hanging
// see <https://github.com/pr2502/ra-multiplex/issues/5>
info!("client sent shutdown request, sending a response and closing connection");
// <https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#shutdown>
let message = Message::from_json(
&json!({
"id": shutdown_request_id,
"jsonrpc": "2.0",
"result": null,
}),
&mut buffer,
);
let message = Message::ResponseSuccess(ResponseSuccess {
jsonrpc: Version,
result: Value::Null,
id: req.id,
});
// ignoring error because we would've closed the connection regardless
let _ = close_tx.send(message).await;
break;
}
break;
}
if let Some(id) = json.get("id") {
// messages containing an id need the id modified so we can discern which client to send
// the response to
let tagged_id = tag_id(port, id)?;
json.insert("id".to_owned(), Value::String(tagged_id));

let message = Message::from_json(&json, &mut buffer);
if tx.send(message).await.is_err() {
break;
Message::Request(mut req) => {
req.id = tag_id(port, &req.id)?.into();
if tx.send(req.into()).await.is_err() {
break;
}
}
} else {
// notification messages without an id don't need any modification and can be forwarded
// to rust-analyzer as is
let message = Message::from_bytes(bytes);
if tx.send(message).await.is_err() {
break;

Message::ResponseSuccess(_) | Message::ResponseError(_) => {
debug!(?message, "client response");
}

Message::Notification(notif) => {
if tx.send(notif.into()).await.is_err() {
break;
}
}
}
}
Expand Down
Loading

0 comments on commit 23548aa

Please sign in to comment.