Skip to content

Commit

Permalink
fix: try_recv() channel error : delta may be None
Browse files Browse the repository at this point in the history
basically if there was no change to report, the oneshot would not get
updated which is bad. so we put back the version we got and send a None
(the channel now has nullable TextChange). so basically its always a
try_recv, but its fine since recv is implemented with try_recv + poll
anyway
  • Loading branch information
alemidev committed Aug 16, 2024
1 parent fdcfc61 commit 0154e5a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
6 changes: 4 additions & 2 deletions src/buffer/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::api::TextChange;

use crate::ext::InternallyMutable;

use super::worker::DeltaRequest;

/// the buffer controller implementation
///
/// for each controller a worker exists, managing outgoing and inbound
Expand Down Expand Up @@ -52,7 +54,7 @@ pub(crate) struct BufferControllerInner {
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
}

Expand Down Expand Up @@ -85,7 +87,7 @@ impl Controller<TextChange> for BufferController {
self.0.delta_request.send((last_update, tx)).await?;
let (v, change) = rx.await?;
self.0.last_update.set(v);
Ok(Some(change))
Ok(change)
}

/// enqueue a text change for processing
Expand Down
9 changes: 7 additions & 2 deletions src/buffer/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ use codemp_proto::buffer::{BufferEvent, Operation};

use super::controller::{BufferController, BufferControllerInner};

pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>);
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);

pub(crate) struct BufferWorker {
user_id: Uuid,
latest_version: watch::Sender<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
delta_req: mpsc::Receiver<DeltaRequest>,
stop: mpsc::UnboundedReceiver<()>,
controller: BufferController,
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
Expand Down Expand Up @@ -181,7 +184,9 @@ impl ControllerWorker<TextChange> for BufferWorker {
}
}
};
tx.send((new_local_v, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?");
tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?");
}
},
},
Expand Down

0 comments on commit 0154e5a

Please sign in to comment.