Skip to content

Commit

Permalink
Port to event-listener v5.0
Browse files Browse the repository at this point in the history
cc smol-rs/event-listener#105

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Dec 29, 2023
1 parent 92adf87 commit a6d844b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 32 deletions.
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ name = "broadcast_bench"
[features]

[dependencies]
event-listener = "3"
event-listener-strategy = "0.1.0"
event-listener = "4.0.1"
event-listener-strategy = "0.4.0"
futures-core = "0.3.21"

[dev-dependencies]
Expand All @@ -32,3 +32,9 @@ doc-comment = "0.3.3"
easy-parallel = "3.2.0"
futures-lite = "1.11.3"
futures-util = "0.3.21"

[patch.crates-io]
event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" }
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" }


53 changes: 23 additions & 30 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ pub struct Receiver<T> {
pos: u64,

/// Listens for a send or close event to unblock this stream.
listener: Option<Pin<Box<EventListener>>>,
listener: Option<EventListener>,
}

impl<T> Receiver<T> {
Expand Down Expand Up @@ -1599,8 +1599,7 @@ easy_wrapper! {
#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
// TODO: Remove the Pin<Box<>> at the next breaking release and make this type !Unpin
listener: Option<Pin<Box<EventListener>>>,
listener: Option<EventListener>,
msg: Option<T>,
}

Expand All @@ -1610,7 +1609,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> {
type Output = Result<Option<T>, SendError<T>>;

fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand Down Expand Up @@ -1641,17 +1640,14 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> {
}

// Sending failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try sending again.
let inner = inner.write().unwrap();
this.listener = Some(inner.send_ops.listen());
}
Some(l) => {
// Wait for a notification.
ready!(strategy.poll(l.as_mut(), context));
this.listener = None;
}
if this.listener.is_none() {
// Start listening and then try sending again.
let inner = inner.write().unwrap();
this.listener = Some(inner.send_ops.listen());
} else {
// Wait for a notification.
ready!(strategy.poll(&mut this.listener, context));
this.listener = None;
}
}
}
Expand All @@ -1668,7 +1664,7 @@ easy_wrapper! {
#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a mut Receiver<T>,
listener: Option<Pin<Box<EventListener>>>,
listener: Option<EventListener>,
}

impl<'a, T> Unpin for RecvInner<'a, T> {}
Expand All @@ -1677,7 +1673,7 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> {
type Output = Result<T, RecvError>;

fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand All @@ -1695,19 +1691,16 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> {
}

// Receiving failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
this.listener = {
let inner = this.receiver.inner.write().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(l) => {
// Wait for a notification.
ready!(strategy.poll(l.as_mut(), context));
this.listener = None;
}
if this.listener.is_none() {
// Start listening and then try receiving again.
this.listener = {
let inner = this.receiver.inner.write().unwrap();
Some(inner.recv_ops.listen())
};
} else {
// Wait for a notification.
ready!(strategy.poll(&mut this.listener, context));
this.listener = None;
}
}
}
Expand Down

0 comments on commit a6d844b

Please sign in to comment.