Skip to content

Commit

Permalink
Merge pull request #79 from hexedtech/feat/better-tracing
Browse files Browse the repository at this point in the history
feat: introduce tracing::instrument and more debug
  • Loading branch information
zaaarf authored Feb 15, 2025
2 parents 51fe6e4 + a4c2583 commit 253461b
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 18 deletions.
26 changes: 24 additions & 2 deletions src/api/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
/// `host`, `port` and `tls` affect all connections to all gRPC services; the
/// resulting endpoint is composed like this:
/// http{tls?'s':''}://{host}:{port}
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(feature = "py", pyo3::pyclass(get_all, set_all))]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
pub struct Config {
/// User identifier used to register, possibly your email.
pub username: String,
/// User password chosen upon registration.
pub password: String,
pub password: String, // must not leak this!
/// Address of server to connect to, default api.code.mp.
pub host: Option<String>,
/// Port to connect to, default 50053.
Expand Down Expand Up @@ -61,3 +61,25 @@ impl Config {
)
}
}

// manual impl: we want to obfuscate the password field!!
// TODO: can we just tag password to be obfuscated in debug print?
// reimplementing the whole Debug thing is pretty lame
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if f.alternate() {
write!(f,
r#"""Config {{
username: {},
password: ********,
host: {:#?},
port: {:#?},
tls: {:#?}
}}"""#,
self.username, self.host, self.port, self.tls
)
} else {
write!(f, "Config {{ username: {}, password: ********, host: {:?}, port: {:?}, tls: {:?} }}", self.username, self.host, self.port, self.tls)
}
}
}
21 changes: 17 additions & 4 deletions src/buffer/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::controller::{BufferController, BufferControllerInner};
struct BufferWorker {
agent_id: u32,
path: String,
workspace_id: String,
latest_version: watch::Sender<diamond_types::LocalVersion>,
local_version: watch::Sender<diamond_types::LocalVersion>,
ack_rx: mpsc::UnboundedReceiver<LocalVersion>,
Expand Down Expand Up @@ -75,6 +76,7 @@ impl BufferController {
let worker = BufferWorker {
agent_id,
path: path.to_string(),
workspace_id: workspace_id.to_string(),
latest_version: latest_version_tx,
local_version: my_version_tx,
ack_rx,
Expand All @@ -95,15 +97,16 @@ impl BufferController {
BufferController(controller)
}

#[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id, path = worker.path))]
async fn work(
mut worker: BufferWorker,
tx: mpsc::Sender<Operation>,
mut rx: Streaming<BufferEvent>,
) {
tracing::debug!("controller worker started");
tracing::debug!("buffer worker started");
loop {
if worker.controller.upgrade().is_none() {
break;
break tracing::debug!("buffer worker clean exit");
};

// block until one of these is ready
Expand All @@ -114,6 +117,7 @@ impl BufferController {
res = worker.ack_rx.recv() => match res {
None => break tracing::error!("ack channel closed"),
Some(v) => {
tracing::debug!("client acked change");
worker.branch.merge(&worker.oplog, &v);
worker.local_version.send(worker.branch.local_version())
.unwrap_or_warn("could not ack local version");
Expand Down Expand Up @@ -160,11 +164,12 @@ impl BufferController {
}
}

tracing::debug!("controller worker stopped");
tracing::debug!("buffer worker stopped");
}
}

impl BufferWorker {
#[tracing::instrument(skip(self, tx))]
async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender<Operation>) {
let last_ver = self.oplog.local_version();
// clip to buffer extents
Expand Down Expand Up @@ -205,11 +210,16 @@ impl BufferWorker {
}
}

#[tracing::instrument(skip(self))]
async fn handle_server_change(&mut self, change: BufferEvent) -> bool {
match self.controller.upgrade() {
None => true, // clean exit actually, just weird we caught it here
None => { // clean exit actually, just weird we caught it here
tracing::debug!("clean exit while handling server change");
true
},
Some(controller) => match self.oplog.decode_and_add(&change.op.data) {
Ok(local_version) => {
tracing::debug!("updating local version: {local_version:?}");
self.latest_version
.send(local_version)
.unwrap_or_warn("failed to update latest version!");
Expand All @@ -229,6 +239,7 @@ impl BufferWorker {
}
}

#[tracing::instrument(skip(self, tx))]
async fn handle_delta_request(&mut self, tx: oneshot::Sender<Option<BufferUpdate>>) {
let last_ver = self.branch.local_version();
if let Some((lv, Some(dtop))) = self
Expand Down Expand Up @@ -285,9 +296,11 @@ impl BufferWorker {
},
},
};
tracing::debug!("sending update {tc:?}");
tx.send(Some(tc))
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
tracing::debug!("no enqueued changes");
tx.send(None)
.unwrap_or_warn("could not update ops channel -- is controller dead?");
}
Expand Down
2 changes: 2 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ClientInner {

impl Client {
/// Connect to the server, authenticate and instantiate a new [`Client`].
#[tracing::instrument]
pub async fn connect(config: crate::api::Config) -> ConnectionResult<Self> {
// TODO move these two into network.rs
let channel = Endpoint::from_shared(config.endpoint())?.connect().await?;
Expand Down Expand Up @@ -157,6 +158,7 @@ impl Client {
}

/// Join and return a [`Workspace`].
#[tracing::instrument(skip(self, workspace), fields(ws = workspace.as_ref()))]
pub async fn attach_workspace(
&self,
workspace: impl AsRef<str>,
Expand Down
15 changes: 10 additions & 5 deletions src/cursor/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition};
use super::controller::{CursorController, CursorControllerInner};

struct CursorWorker {
workspace_id: String,
op: mpsc::UnboundedReceiver<CursorPosition>,
map: Arc<dashmap::DashMap<Uuid, User>>,
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
Expand All @@ -24,6 +25,7 @@ struct CursorWorker {
}

impl CursorWorker {
#[tracing::instrument(skip(self, tx))]
fn handle_recv(&mut self, tx: oneshot::Sender<Option<Cursor>>) {
tx.send(
self.store.pop_front().and_then(|event| {
Expand Down Expand Up @@ -71,6 +73,7 @@ impl CursorController {
let weak = Arc::downgrade(&controller);

let worker = CursorWorker {
workspace_id: workspace_id.to_string(),
op: op_rx,
map: user_map,
stream: stream_rx,
Expand All @@ -86,16 +89,17 @@ impl CursorController {
CursorController(controller)
}

#[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id))]
async fn work(
mut worker: CursorWorker,
tx: mpsc::Sender<CursorPosition>,
mut rx: Streaming<CursorEvent>,
) {
tracing::debug!("starting cursor worker");
loop {
tracing::debug!("cursor worker polling");
if worker.controller.upgrade().is_none() {
break;
}; // clean exit: all controllers dropped
break tracing::debug!("cursor worker clean exit");
};
tokio::select! {
biased;

Expand All @@ -110,7 +114,7 @@ impl CursorController {

// server sents us a cursor
Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() {
None => break, // clean exit, just weird that we got it here
None => break tracing::debug!("cursor worker clean (late) exit"), // clean exit, just weird that we got it here
Some(controller) => {
tracing::debug!("received cursor from server");
worker.store.push_back(cur);
Expand All @@ -127,8 +131,9 @@ impl CursorController {
// client wants to get next cursor event
Some(tx) = worker.stream.recv() => worker.handle_recv(tx),

else => break,
else => break tracing::debug!("cursor worker clean-ish exit"),
}
}
tracing::debug!("stopping cursor worker");
}
}
20 changes: 13 additions & 7 deletions src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl AsyncReceiver<Event> for Workspace {
}

impl Workspace {
#[tracing::instrument(skip(name, user, token, claims), fields(ws = name))]
pub(crate) async fn connect(
name: String,
user: Arc<User>,
Expand Down Expand Up @@ -165,6 +166,7 @@ impl Workspace {
}

/// Attach to a buffer and return a handle to it.
#[tracing::instrument(skip(self))]
pub async fn attach_buffer(&self, path: &str) -> ConnectionResult<buffer::Controller> {
let mut worskspace_client = self.0.services.ws();
let request = tonic::Request::new(BufferNode {
Expand Down Expand Up @@ -326,7 +328,7 @@ impl Workspace {
.0
.filetree
.iter()
.filter(|f| filter.map_or(true, |flt| f.starts_with(flt)))
.filter(|f| filter.is_none_or(|flt| f.starts_with(flt)))
.map(|f| f.clone())
.collect::<Vec<String>>();
tree.sort();
Expand All @@ -342,7 +344,8 @@ struct WorkspaceWorker {
}

impl WorkspaceWorker {
pub(crate) async fn work(mut self, name: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) {
#[tracing::instrument(skip(self, stream, weak))]
pub(crate) async fn work(mut self, ws: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) {
tracing::debug!("workspace worker starting");
loop {
tokio::select! {
Expand All @@ -352,13 +355,16 @@ impl WorkspaceWorker {
},

res = stream.message() => match res {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name),
Err(e) => break tracing::error!("workspace '{ws}' stream closed: {e}"),
Ok(None) => break tracing::info!("leaving workspace {ws}"),
Ok(Some(WorkspaceEvent { event: None })) => {
tracing::warn!("workspace {} received empty event", name)
tracing::warn!("workspace {ws} received empty event")
}
Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
let Some(inner) = weak.upgrade() else { break };
let Some(inner) = weak.upgrade() else {
break tracing::debug!("workspace worker clean exit");
};
tracing::debug!("received workspace event: {ev:?}");
let update = crate::api::Event::from(&ev);
match ev {
// user
Expand Down Expand Up @@ -391,7 +397,7 @@ impl WorkspaceWorker {
if let Some(ws) = weak.upgrade() {
cb.call(Workspace(ws));
} else {
break tracing::debug!("workspace worker clean exit");
break tracing::debug!("workspace worker clean (late) exit");
}
}
}
Expand Down

0 comments on commit 253461b

Please sign in to comment.