-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathclient.rs
419 lines (374 loc) · 14.2 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
use std::collections::BTreeMap;
use std::io::ErrorKind;
use std::sync::Arc;
use anyhow::{bail, ensure, Context, Result};
use percent_encoding::percent_decode_str;
use serde_json::Value;
use tokio::io::BufReader;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, Mutex};
use tokio::task;
use tracing::{debug, error, info, warn, Instrument};
use uriparse::URI;
use crate::instance::{self, Instance, InstanceKey, InstanceMap};
use crate::lsp::ext::{self, LspMuxOptions, Tag};
use crate::lsp::jsonrpc::{
self, Message, Request, RequestId, ResponseError, ResponseSuccess, Version,
};
use crate::lsp::transport::{LspReader, LspWriter};
use crate::lsp::InitializeParams;
use crate::socketwrapper::{OwnedReadHalf, OwnedWriteHalf, Stream};
/// Read first client message and dispatch lsp mux commands
pub async fn process(
socket: Stream,
client_id: usize,
instance_map: Arc<Mutex<InstanceMap>>,
) -> Result<()> {
let (socket_read, socket_write) = socket.into_split();
let mut reader = LspReader::new(BufReader::new(socket_read), "client");
let writer = LspWriter::new(socket_write, "client");
// Read the first client message, this must be `initialize` request.
let req = match reader
.read_message()
.await
.context("receive `initialize` request")?
.context("channel closed")?
{
Message::Request(req) if req.method == "initialize" => req,
_ => bail!("first client message was not `initialize` request"),
};
let mut init_params = serde_json::from_value::<InitializeParams>(req.params.clone())
.context("parse `initialize` request params")?;
// Remove `lspMux` from `initializationOptions`, it's ra-multiplex extension
// and we don't want to forward it to the real language server.
let options = init_params
.initialization_options
.as_mut()
.context("missing `initializationOptions` in `initialize` request")?
.lsp_mux
.take()
.context("missing `lspMux` in `initializationOptions` in `initialize` request")?;
ensure!(
options.version == LspMuxOptions::PROTOCOL_VERSION,
"unsupported protocol version {:?}, expected {:?}",
&options.version,
LspMuxOptions::PROTOCOL_VERSION,
);
debug!(?options, "lspmux initialization");
match options.method {
ext::Request::Connect {
server,
args,
env,
cwd,
} => {
connect(
client_id,
instance_map,
(server, args, env, cwd),
req,
init_params,
reader,
writer,
)
.await
}
ext::Request::Status {} => status(instance_map, writer).await,
ext::Request::Reload { cwd } => reload(cwd, instance_map, writer).await,
}
}
/// Handle to a connected client: its numeric id together with the sending
/// half of the message channel created in `Client::new`.
///
/// The type is `Clone`, so several holders can send to the same channel.
#[derive(Clone)]
pub struct Client {
// Identifier passed to `Client::new` by the caller.
id: usize,
// Sending half of the bounded channel; the receiving half is returned by
// `Client::new`.
sender: mpsc::Sender<Message>,
}
impl Client {
    /// Create a client handle together with the receiving end of its channel.
    ///
    /// The channel is bounded and buffers up to 16 messages.
    fn new(id: usize) -> (Client, mpsc::Receiver<Message>) {
        const QUEUE_DEPTH: usize = 16;

        let (tx, rx) = mpsc::channel(QUEUE_DEPTH);
        let client = Self { id, sender: tx };
        (client, rx)
    }

    /// Identifier this client was created with.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Queue `message` on the client channel.
    ///
    /// Returns the `SendError` produced by the underlying channel when the
    /// message could not be queued.
    pub async fn send_message(&self, message: Message) -> Result<(), SendError<Message>> {
        let Self { sender, .. } = self;
        sender.send(message).await
    }
}
/// Handle the `Status` command: collect the status of the instance map and
/// write it to the client as a successful response with request id `0`.
///
/// # Errors
///
/// Fails when the blocking task collecting the status does not complete,
/// when the status cannot be serialized to JSON, or when writing the
/// response fails. Earlier versions panicked in the first two cases.
async fn status(
    instance_map: Arc<Mutex<InstanceMap>>,
    mut writer: LspWriter<OwnedWriteHalf>,
) -> Result<()> {
    // The map is locked with `blocking_lock` on a blocking thread. A join
    // error (the task panicked or the runtime is shutting down) is reported
    // to the caller instead of panicking this task as well.
    let report = task::spawn_blocking(move || instance_map.blocking_lock().get_status())
        .await
        .context("collecting instance status")?;
    let result = serde_json::to_value(report).context("serializing status response")?;

    writer
        .write_message(&Message::ResponseSuccess(ResponseSuccess {
            jsonrpc: Version,
            result,
            id: RequestId::Number(0),
        }))
        .await
        .context("writing response")
}
/// Handle the `Reload` command for the instance that `get_by_cwd` returns
/// for `cwd`.
///
/// When an instance is found, a `rust-analyzer/reloadWorkspace` request whose
/// id is tagged with `Tag::Drop` is sent to it and the client receives a
/// `null` success response. Otherwise the client receives an error response
/// saying that no instance was found. All responses use request id `0`.
///
/// # Errors
///
/// Fails when the instance channel is closed or when writing to the client
/// fails.
async fn reload(
    cwd: String,
    instance_map: Arc<Mutex<InstanceMap>>,
    mut writer: LspWriter<OwnedWriteHalf>,
) -> Result<()> {
    // The lock guard is a temporary of the `if let` scrutinee, so the map
    // stays locked at least while the `Some` branch runs.
    if let Some(instance) = instance_map.lock().await.get_by_cwd(&cwd) {
        let reload_request = Request {
            jsonrpc: Version,
            method: "rust-analyzer/reloadWorkspace".into(),
            params: Value::Null,
            id: RequestId::Number(0).tag(Tag::Drop),
        };
        instance
            .send_message(Message::Request(reload_request))
            .await
            .ok()
            .context("instance closed")?;

        let ack = Message::ResponseSuccess(ResponseSuccess::null(RequestId::Number(0)));
        writer
            .write_message(&ack)
            .await
            .context("writing response")?;
    } else {
        let not_found = Message::ResponseError(ResponseError {
            jsonrpc: Version,
            error: jsonrpc::Error {
                code: 0,
                message: "no instance found".into(),
                data: None,
            },
            id: RequestId::Number(0),
        });
        writer
            .write_message(&not_found)
            .await
            .context("writing response")?;
        debug!(?cwd, "no instance found for path");
    }

    Ok(())
}
/// Attach a client to a language server instance, spawning one if needed.
///
/// Steps, in order:
/// 1. pick the workspace root and obtain an instance for the resulting key,
/// 2. answer the client's `initialize` request with the instance's stored
///    initialize result,
/// 3. wait for the client's `initialized` notification and discard it,
/// 4. spawn the input and output tasks and register the client with the
///    instance.
///
/// # Errors
///
/// Fails when no workspace root can be selected, the instance cannot be
/// obtained, the response cannot be written, or the second client message is
/// not the `initialized` notification.
async fn connect(
    client_id: usize,
    instance_map: Arc<Mutex<InstanceMap>>,
    (server, args, env, cwd): (
        String,
        Vec<String>,
        BTreeMap<String, String>,
        Option<String>,
    ),
    req: Request,
    init_params: InitializeParams,
    mut reader: LspReader<BufReader<OwnedReadHalf>>,
    mut writer: LspWriter<OwnedWriteHalf>,
) -> Result<()> {
    // The workspace root is part of the key that identifies an instance.
    let workspace_root = select_workspace_root(&init_params, cwd.as_deref())
        .context("could not get any workspace_root")?;
    let instance = instance::get_or_spawn(
        instance_map,
        InstanceKey {
            server,
            args,
            env,
            workspace_root,
        },
        init_params,
    )
    .await?;

    // The reply reuses the result recorded when this server instance was
    // first initialized. It may stem from a different client's request, but
    // should be close enough when it comes from the same kind of client.
    let init_result = serde_json::to_value(instance.initialize_result()).unwrap();
    let init_response = ResponseSuccess {
        jsonrpc: Version,
        result: init_result,
        id: req.id,
    };
    writer
        .write_message(&init_response.into())
        .await
        .context("send `initialize` request response")?;

    // The client now sends `initialized`. It is swallowed here because the
    // server expects exactly one and already got a fake one during its own
    // handshake.
    let second_message = reader
        .read_message()
        .await
        .context("receive `initialized` notification")?
        .context("channel closed")?;
    if !matches!(second_message, Message::Notification(notif) if notif.method == "initialized") {
        bail!("second client message was not `initialized` notification");
    }
    info!("initialized client");

    let (client, client_rx) = Client::new(client_id);
    task::spawn(input_task(client_rx, writer).in_current_span());
    instance.add_client(client.clone()).await;
    task::spawn(output_task(reader, client, instance).in_current_span());

    Ok(())
}
/// Turn an LSP `file://` URI into a plain file path string.
///
/// The URI path is normalized and percent-decoded. On Windows (and in test
/// builds) a leading `/` in front of a drive letter, as in `/c:/dir`, is
/// removed.
///
/// # Errors
///
/// Fails when `root_uri` is not a valid URI, uses a scheme other than
/// `file`, or does not decode to valid UTF-8.
fn parse_root_uri(root_uri: &str) -> Result<String> {
    let uri = URI::try_from(root_uri).context("failed to parse URI")?;
    let (scheme, _authority, mut path, _query, _fragment) = uri.into_parts();
    if scheme != uriparse::Scheme::File {
        bail!("only `file://` URIs are supported");
    }

    path.normalize(false);
    let encoded = path.to_string();
    let mut decoded = percent_decode_str(&encoded)
        .decode_utf8()
        .context("decoded URI was not valid utf-8")?
        .into_owned();

    // A Windows drive such as `C:/` is parsed as the first segment of an
    // absolute path, so the string form starts with an extra `/` that has to
    // go.
    let has_drive_prefix = cfg!(any(windows, test))
        && matches!(
            decoded.as_bytes(),
            [b'/', drive, b':', b'/', ..] if drive.is_ascii_alphabetic()
        );
    if has_drive_prefix {
        decoded.remove(0);
    }

    Ok(decoded)
}
#[cfg(test)]
#[test]
fn parsing_root_uris() {
    // Pairs of (input URI, expected path), covering unix-style paths and
    // drive-letter paths including the bare root of each.
    let cases = [
        ("file:///home/user/proj", "/home/user/proj"),
        ("file:///c:/dev/proj", "c:/dev/proj"),
        ("file:///proj", "/proj"),
        ("file:///d:/proj", "d:/proj"),
        ("file:///", "/"),
        ("file:///e:/", "e:/"),
    ];

    for &(uri, expected) in cases.iter() {
        assert_eq!(parse_root_uri(uri).unwrap(), expected);
    }
}
/// Pick the directory used as the workspace root for a client.
///
/// Candidates are tried in this order:
/// 1. the single entry of `workspace_folders`, when there is exactly one,
/// 2. the deprecated `root_uri` field,
/// 3. the deprecated `root_path` field,
/// 4. `proxy_cwd`, the working directory reported by the proxy.
///
/// A request with more than one workspace folder is not supported: it is
/// logged and then handled through the fallbacks 2-4. Earlier versions
/// panicked on such a request because of an `assert!` on the folder list.
///
/// # Errors
///
/// Fails when the selected URI cannot be parsed or when none of the
/// candidates is present.
fn select_workspace_root<'a>(
    init_params: &'a InitializeParams,
    proxy_cwd: Option<&'a str>,
) -> Result<String> {
    if init_params.workspace_folders.len() > 1 {
        // TODO Ideally we'd look up any server which has a superset of the
        // workspace folders active, possibly transforming the `initialize`
        // request into a few requests for adding workspace folders if the
        // server supports it. For now only single-folder workspaces are
        // supported, it's probably the most common use-case anyway.
        warn!("initialize request with multiple workspace folders isn't supported");
        debug!(workspace_folders = ?init_params.workspace_folders);
        // Deliberately no early return and no panic: the request comes from
        // the client, so it falls through to the fallbacks below.
    }
    if init_params.workspace_folders.len() == 1 {
        return parse_root_uri(&init_params.workspace_folders[0].uri)
            .context("parse initParams.workspaceFolders[0].uri");
    }

    // Using the deprecated LSP fields `rootPath` or `rootUri` as fallback
    if let Some(root_uri) = &init_params.root_uri {
        return parse_root_uri(root_uri).context("parse initParams.rootUri");
    }
    if let Some(root_path) = &init_params.root_path {
        return Ok(root_path.to_owned());
    }

    // Using the proxy `cwd` as fallback
    if let Some(proxy_cwd) = proxy_cwd {
        return Ok(proxy_cwd.to_owned());
    }

    bail!("could not determine a suitable workspace_root");
}
/// Receive messages from channel and write them to the client input socket
async fn input_task(mut rx: mpsc::Receiver<Message>, mut writer: LspWriter<OwnedWriteHalf>) {
// The other end of this channel is held by the `output_task` _and_ in the
// `Instance` itself, this task depends on the `output_task` to detect a
// client disconnect and call `Instance::cleanup_client`, otherwise we're
// going to hang forever here.
while let Some(message) = rx.recv().await {
if let Err(err) = writer.write_message(&message).await {
match err.kind() {
// ignore benign errors, treat as socket close
ErrorKind::BrokenPipe => {}
// report fatal errors
_ => error!(?err, "error writing client input: {err}"),
}
break; // break on any error
}
}
debug!("client input closed");
info!("client disconnected");
}
/// Read messages from client output socket and send them to the server channel
///
/// The loop ends when the client closes its side (`read_message` yields
/// `Ok(None)`), when the client sends a `shutdown` request, or when sending a
/// message to the instance fails. After the loop the client is passed to
/// `Instance::cleanup_client`.
async fn output_task(
mut reader: LspReader<BufReader<OwnedReadHalf>>,
client: Client,
instance: Arc<Instance>,
) {
loop {
let message = match reader.read_message().await {
Ok(Some(message)) => message,
Ok(None) => {
debug!("client output closed");
break;
}
Err(err) => {
// A failed read is logged and the next read is attempted.
// NOTE(review): if `read_message` keeps returning the same error (for
// example a persistent I/O error) this would spin; confirm that the
// reader's errors are recoverable.
error!(?err, "error reading client output");
continue;
}
};
// Called once for every message received from the client; presumably
// this postpones an idle shutdown of the instance (confirm in `Instance`).
instance.keep_alive();
// Arm order matters: the guarded arms must stay ahead of the catch-all arm
// for the same message variant.
match message {
Message::Request(req) if req.method == "shutdown" => {
// Client requested the server to shut down but other clients might still be connected.
// Instead we disconnect this client to prevent the editor hanging
// see <https://github.com/pr2502/ra-multiplex/issues/5>.
info!("client sent shutdown request, sending a response and closing connection");
// <https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#shutdown>
let res = ResponseSuccess::null(req.id);
// Ignoring error because we would've closed the connection regardless
let _ = client.send_message(res.into()).await;
break;
}
Message::Request(mut req) => {
// Tag the request id with this client's id before forwarding it to the
// instance.
req.id = req.id.tag(Tag::ClientId(client.id));
if instance.send_message(req.into()).await.is_err() {
break;
}
}
// Responses from the client are routed by the tag found on their id.
Message::ResponseSuccess(mut res) => match res.id.untag() {
(Some(Tag::Forward), id) => {
// Put the untagged id back and pass the response on to the instance.
res.id = id;
if instance.send_message(res.into()).await.is_err() {
break;
}
}
(Some(Tag::Drop), _) => {
// Drop the message
}
_ => {
// Any other tag, or no tag at all: only logged.
debug!(?res, "unexpected client response");
}
},
Message::ResponseError(res) => {
// Error responses are only logged, never sent to the instance.
// NOTE(review): a server request answered with an error by the client
// therefore gets no reply from here; confirm this is intended.
warn!(?res, "client responded with error");
}
// Open and close notifications go through dedicated `Instance` methods
// together with this client's id instead of being forwarded as-is.
Message::Notification(notif) if notif.method == "textDocument/didOpen" => {
if let Err(err) = instance.open_file(client.id, notif.params).await {
warn!(?err, "error opening file");
}
}
Message::Notification(notif) if notif.method == "textDocument/didClose" => {
if let Err(err) = instance.close_file(client.id, notif.params).await {
warn!(?err, "error closing file");
}
}
// Every other notification is forwarded unchanged.
Message::Notification(notif) => {
if instance.send_message(notif.into()).await.is_err() {
break;
}
}
}
}
// Reached through every `break` above.
if let Err(err) = instance.cleanup_client(client).await {
warn!(?err, "error cleaning up after a client");
}
}