
Document and/or improve swarm event handling #1876

Closed
marc-casperlabs opened this issue Dec 7, 2020 · 8 comments


@marc-casperlabs

Currently we have an issue with the Swarm API (more precisely, ExpandedSwarm): we want to run a swarm, process all of its events (not just behavior events but the full SwarmEvent), and still react to external stimuli, such as a user requesting that a new connection be dialed.

Our problem can thus be summarized as follows: given a swarm and a future or stream representing external user input that requires mutable access to the ExpandedSwarm instance, how can we react to both the swarm events and that input?

I did not find an obvious solution for this, so any advice is appreciated. Looking at the examples in the meantime, we did see some patterns, none of which seem applicable here:

future::select

This is similar to what was suggested in #682: call future::select(swarm.next(), some_other_action).await, then react to whichever future completes first.

If some_other_action finishes first and returns a T, this yields something close to Either::Right((T, impl Future<Output=Self::OutEvent>)). The second half of that tuple is the "leftover" future from calling swarm.next(), so my understanding is that swarm is still borrowed and we cannot call swarm.dial() right there.

We can of course drop the leftover future, but then we end up with the same issue as the tokio::select! approach described further below.
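A minimal std-only sketch of the future::select pattern, using hypothetical stand-ins (MockSwarm, a hand-rolled select and Either, poll_once) instead of the real libp2p or futures types. It only shows the borrow mechanics: once the leftover next_event future is dropped, the &mut borrow of the swarm ends and dial can be called. Whether dropping the real libp2p future is safe is exactly the open question and is not answered by this code.

```rust
use std::future::{poll_fn, ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `ExpandedSwarm`; NOT the libp2p type.
struct MockSwarm {
    pending_events: Vec<&'static str>,
    dialed: Vec<&'static str>,
}

impl MockSwarm {
    /// Like `next_event`, the returned future holds `&mut self` until it is dropped.
    fn next_event(&mut self) -> impl Future<Output = &'static str> + Unpin + '_ {
        poll_fn(move |_cx: &mut Context<'_>| match self.pending_events.pop() {
            Some(event) => Poll::Ready(event),
            None => Poll::Pending,
        })
    }

    fn dial(&mut self, addr: &'static str) {
        self.dialed.push(addr);
    }
}

enum Either<L, R> {
    Left(L),
    Right(R),
}

/// Minimal `select`: resolves with whichever future finishes first and hands back the other.
fn select<A, B>(a: A, b: B) -> impl Future<Output = Either<(A::Output, B), (B::Output, A)>> + Unpin
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    let mut pair = Some((a, b));
    poll_fn(move |cx: &mut Context<'_>| {
        let (a, b) = pair.as_mut().expect("select polled after completion");
        if let Poll::Ready(out) = Pin::new(a).poll(cx) {
            let (_, b) = pair.take().unwrap();
            return Poll::Ready(Either::Left((out, b)));
        }
        if let Poll::Ready(out) = Pin::new(b).poll(cx) {
            let (a, _) = pair.take().unwrap();
            return Poll::Ready(Either::Right((out, a)));
        }
        Poll::Pending
    })
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with a waker that does nothing (demo harness only).
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Returns (swarm event handled, addresses dialed).
fn run(events: Vec<&'static str>) -> (Option<&'static str>, Vec<&'static str>) {
    let mut swarm = MockSwarm { pending_events: events, dialed: Vec::new() };
    // Hypothetical external input that is already available.
    let user_request = ready("/ip4/127.0.0.1/tcp/4001");

    let (handled, to_dial) = {
        let mut both = select(swarm.next_event(), user_request);
        let outcome = match poll_once(&mut both) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => return (None, Vec::new()),
        };
        match outcome {
            Either::Left((event, _unused_request)) => (Some(event), None),
            Either::Right((addr, leftover_next_event)) => {
                // Dropping the leftover `next_event` future ends the `&mut swarm` borrow.
                drop(leftover_next_event);
                (None, Some(addr))
            }
        }
    }; // everything that borrowed `swarm` has been dropped by now

    if let Some(addr) = to_dial {
        swarm.dial(addr);
    }
    (handled, swarm.dialed)
}

fn main() {
    println!("{:?}", run(vec![]));
    println!("{:?}", run(vec!["ConnectionEstablished"]));
}
```

The scoped block is the important design choice: it guarantees that the select future and its leftover are gone before the swarm is used mutably again.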

Manual polling

From what I understand, the chat.rs example polls manually:

```rust
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
    // Drain everything stdin has ready; publishing needs `&mut swarm`.
    loop {
        match stdin.try_poll_next_unpin(cx)? {
            Poll::Ready(Some(line)) => swarm.floodsub.publish(floodsub_topic.clone(), line.as_bytes()),
            Poll::Ready(None) => panic!("Stdin closed"),
            Poll::Pending => break
        }
    }
    // Then drain the swarm's (behaviour) events.
    loop {
        match swarm.poll_next_unpin(cx) {
            Poll::Ready(Some(event)) => println!("{:?}", event),
            Poll::Ready(None) => return Poll::Ready(Ok(())),
            Poll::Pending => {
                if !listening {
                    for addr in Swarm::listeners(&swarm) {
                        println!("Listening on {:?}", addr);
                        listening = true;
                    }
                }
                break
            }
        }
    }
    Poll::Pending
}))
```

Both streams are polled manually here (try_poll_next_unpin for stdin, poll_next_unpin for the swarm), and this works because the swarm is no longer borrowed once each poll call returns. Unfortunately, this only lets us react to behavior events, because the Stream implementation discards all other events:

```rust
loop {
    let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
    if let SwarmEvent::Behaviour(event) = event {
        return Poll::Ready(Some(event));
    }
}
```

This makes it impossible to observe any SwarmEvent other than SwarmEvent::Behaviour.
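If poll_next_event were public and returned the full SwarmEvent, the same manual-polling approach could see every event. A std-only sketch under that assumption; MockSwarm, its two-variant SwarmEvent, run_manual_loop and behaviour_only are all hypothetical. Because the swarm is only borrowed for the duration of each poll call, one poll_fn closure can both call dial and observe every event, while poll_next_behaviour reproduces the filtering loop quoted above.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Cut-down, hypothetical event type; the real `SwarmEvent` has many more variants.
#[derive(Debug, Clone, PartialEq)]
enum SwarmEvent {
    Behaviour(&'static str),
    ConnectionEstablished(&'static str),
}

/// Hypothetical stand-in for `ExpandedSwarm`; not the libp2p type.
struct MockSwarm {
    queue: VecDeque<SwarmEvent>,
    dialed: Vec<&'static str>,
}

impl MockSwarm {
    /// Hypothetical *public* `poll_next_event`. A real one would register `cx.waker()`.
    fn poll_next_event(&mut self, _cx: &mut Context<'_>) -> Poll<SwarmEvent> {
        match self.queue.pop_front() {
            Some(event) => Poll::Ready(event),
            None => Poll::Pending,
        }
    }

    /// Mirrors the quoted `Stream` impl: every non-behaviour event is discarded.
    fn poll_next_behaviour(&mut self, cx: &mut Context<'_>) -> Poll<&'static str> {
        loop {
            if let SwarmEvent::Behaviour(event) = ready!(self.poll_next_event(cx)) {
                return Poll::Ready(event);
            }
        }
    }

    fn dial(&mut self, addr: &'static str) {
        self.dialed.push(addr);
        // Pretend the dial succeeds immediately and surfaces as a swarm event.
        self.queue.push_back(SwarmEvent::ConnectionEstablished(addr));
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn with_noop_cx<R>(f: impl FnOnce(&mut Context<'_>) -> R) -> R {
    let waker = Waker::from(Arc::new(NoopWaker));
    f(&mut Context::from_waker(&waker))
}

/// One pass of a manual event loop that sees user input *and* every SwarmEvent.
fn run_manual_loop(user_input: Vec<&'static str>) -> (Vec<&'static str>, Vec<SwarmEvent>) {
    let mut swarm = MockSwarm { queue: VecDeque::new(), dialed: Vec::new() };
    let mut input: VecDeque<&'static str> = user_input.into();
    let mut seen = Vec::new();
    {
        let mut event_loop = poll_fn(|cx: &mut Context<'_>| {
            // `swarm` is only borrowed during each call, so mutable calls are fine here.
            while let Some(addr) = input.pop_front() {
                swarm.dial(addr);
            }
            while let Poll::Ready(event) = swarm.poll_next_event(cx) {
                seen.push(event);
            }
            // A real loop would return `Poll::Pending` here and wait to be woken.
            Poll::Ready(())
        });
        let _ = with_noop_cx(|cx| Pin::new(&mut event_loop).poll(cx));
    }
    (swarm.dialed, seen)
}

/// The behaviour-only view that the current `Stream` impl offers.
fn behaviour_only(events: Vec<SwarmEvent>) -> Vec<&'static str> {
    let mut swarm = MockSwarm { queue: events.into(), dialed: Vec::new() };
    let mut out = Vec::new();
    with_noop_cx(|cx| {
        while let Poll::Ready(event) = swarm.poll_next_behaviour(cx) {
            out.push(event);
        }
    });
    out
}

fn main() {
    println!("{:?}", run_manual_loop(vec!["/ip4/127.0.0.1/tcp/4001"]));
    println!("{:?}", behaviour_only(vec![
        SwarmEvent::ConnectionEstablished("/ip4/127.0.0.1/tcp/4001"),
        SwarmEvent::Behaviour("ping"),
    ]));
}
```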

tokio::select!

Calling tokio::select! appears to work and is used in one of the examples:

```rust
tokio::select! {
    line = stdin.try_next() => Some((floodsub_topic.clone(), line?.expect("Stdin closed"))),
    event = swarm.next() => {
        println!("New Event: {:?}", event);
        None
    }
}
```

However, I am not even 100% sure that the example is correct: someone sending a very long line at a very slow rate might lose part of it if data has already been buffered when the other branch's event fires.

In general, using select! seems a bit iffy, since it requires all futures to be cancellation-correct; otherwise we run into issues, which are summarized well at https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8.

Even if we ensure that our own logic is cancellation-safe, there is no indication that libp2p's next_event future is. If it is, this should at least be documented, giving a modest guarantee that it will remain so in the future.
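To make the cancellation concern concrete, a std-only sketch with a hypothetical Source that delivers one line in chunks. The "unsafe" reader keeps the partial line inside the future, so dropping it (as select! does with a losing branch) throws that data away. The "safe" reader keeps the partial line in Source, so a fresh future resumes where the old one stopped. Neither function is libp2p or tokio code; they only illustrate the distinction.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Hypothetical slow input: each poll yields at most one chunk, `None` = "nothing yet".
struct Source {
    chunks: VecDeque<Option<&'static str>>,
    /// Partial line kept *outside* any future (used only by the safe variant).
    partial: String,
}

impl Source {
    fn poll_chunk(&mut self) -> Poll<&'static str> {
        match self.chunks.pop_front() {
            Some(Some(chunk)) => Poll::Ready(chunk),
            _ => Poll::Pending,
        }
    }
}

type LineFuture<'a> = Pin<Box<dyn Future<Output = String> + 'a>>;

/// NOT cancellation-safe: the partial line lives inside the future and dies with it.
fn read_line_unsafe(src: &mut Source) -> LineFuture<'_> {
    let mut buf = String::new();
    Box::pin(poll_fn(move |_cx: &mut Context<'_>| loop {
        let chunk = ready!(src.poll_chunk());
        buf.push_str(chunk);
        if buf.ends_with('\n') {
            return Poll::Ready(std::mem::take(&mut buf));
        }
    }))
}

/// Cancellation-safe: partial data is stored in `Source`, so dropping the future loses nothing.
fn read_line_safe(src: &mut Source) -> LineFuture<'_> {
    Box::pin(poll_fn(move |_cx: &mut Context<'_>| loop {
        let chunk = ready!(src.poll_chunk());
        src.partial.push_str(chunk);
        if src.partial.ends_with('\n') {
            return Poll::Ready(std::mem::take(&mut src.partial));
        }
    }))
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll once, drop the future (as `select!` does to a losing branch), then read again.
fn simulate(read_line: for<'a> fn(&'a mut Source) -> LineFuture<'a>) -> String {
    let mut src = Source {
        chunks: VecDeque::from(vec![Some("hel"), None, Some("lo\n")]),
        partial: String::new(),
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut first = read_line(&mut src);
    // Consumes "hel", then hits "nothing yet" and returns Pending.
    assert!(first.as_mut().poll(&mut cx).is_pending());
    drop(first);

    let mut second = read_line(&mut src);
    let result = second.as_mut().poll(&mut cx);
    match result {
        Poll::Ready(line) => line,
        Poll::Pending => String::new(),
    }
}

fn main() {
    println!("unsafe: {:?}", simulate(read_line_unsafe));
    println!("safe:   {:?}", simulate(read_line_safe));
}
```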

Some possible fixes?

If I understand the code correctly, these are possible fixes:

  • Document (and ensure) that next_event() is cancellation-correct, similar to how tokio does it. This would enable the use of tokio::select!.
  • Make ExpandedSwarm handle mutability/queuing internally, so that all methods take &self instead of &mut self. I don't know if this is in the cards at all, but I imagine it would solve a lot of issues for users.
  • Make the Stream implementation on ExpandedSwarm return SwarmEvents, or offer an equivalent stream. This would enable manual polling.
  • Make ExpandedSwarm::poll_next_event public (and document it). It is used internally, but it offers a way of polling without creating a new future that borrows the swarm, which would also enable manual polling.

Otherwise, we would be grateful for any advice or improvements to the documentation.

@tomaka
Member

tomaka commented Dec 7, 2020

Document (and ensure) that next_event() is cancellation-correct

While it's not documented, this is indeed the intention.

@marc-casperlabs
Author

While it's not documented, this is indeed the intention.

Thanks, I think that's good enough for us right now. It would still be nice to have it documented (I do realize the whole Rust ecosystem is lacking on that front, though).

@thomaseizinger
Contributor

thomaseizinger commented Dec 7, 2020

Make ExpandedSwarm handle mutability/queuing internally, so that all methods take &self instead of &mut self. I don't know if this is in the cards at all, but I imagine it would solve a lot of issues for users.

I don't think this is a good idea. If my understanding is correct, it is precisely because poll_next_event requires a mutable reference that the future returned from next_event is safe to drop: all state changes are applied directly to the underlying swarm, and the future itself is stateless.

I am not sure that would still be possible if Swarm only took &self.
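A std-only sketch of the argument above, with a hypothetical MockSwarm: poll_next_event records its progress in the swarm itself, and next_event is just a poll_fn wrapper holding &mut self. Creating and dropping a fresh future on every poll, as a select! loop does when another branch keeps winning, therefore loses nothing. This illustrates the reasoning only; it demonstrates nothing about the real libp2p implementation.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical swarm: all progress is recorded in the struct, never in a future.
struct MockSwarm {
    /// Steps of some multi-poll internal operation; purely illustrative.
    steps_done: u32,
}

impl MockSwarm {
    fn poll_next_event(&mut self, _cx: &mut Context<'_>) -> Poll<&'static str> {
        self.steps_done += 1;
        if self.steps_done >= 3 {
            Poll::Ready("ConnectionEstablished")
        } else {
            Poll::Pending
        }
    }

    /// The future holds nothing except `&mut self`, so dropping it discards no state.
    fn next_event(&mut self) -> impl Future<Output = &'static str> + Unpin + '_ {
        poll_fn(move |cx: &mut Context<'_>| self.poll_next_event(cx))
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Creates a fresh `next_event` future for every poll and drops it right away.
/// Returns the attempt on which the event arrived, if any.
fn poll_with_fresh_futures(swarm: &mut MockSwarm, max_attempts: u32) -> Option<(u32, &'static str)> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    for attempt in 1..=max_attempts {
        let mut fut = swarm.next_event();
        if let Poll::Ready(event) = Pin::new(&mut fut).poll(&mut cx) {
            return Some((attempt, event));
        }
        // `fut` is dropped here; the progress it made stays in `swarm`.
    }
    None
}

fn main() {
    let mut swarm = MockSwarm { steps_done: 0 };
    println!("{:?}", poll_with_fresh_futures(&mut swarm, 2));
    println!("{:?}", poll_with_fresh_futures(&mut swarm, 10));
}
```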

@elenaf9
Contributor

elenaf9 commented Jun 4, 2021

Make ExpandedSwarm::poll_next_event public (and document it). It is used internally, but it offers a way of polling without creating a new future that borrows the swarm, which would also enable manual polling.

@tomaka Would it be an acceptable change to make poll_next_event public, or did you have a specific reason for making it private? I would really appreciate having the option to manually poll for the next swarm event without blocking. My current use case is a swarm shared through a Mutex: I'd like to try to poll for SwarmEvents but release the lock if there are no events.

@mxinden
Member

mxinden commented Jun 4, 2021

Would swarm.next_event().now_or_never() conflict with the Mutex guard, @elenaf9?
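For reference, a std-only sketch of that approach. The now_or_never below is a hand-written stand-in for futures::FutureExt::now_or_never (poll once with a no-op waker), and MockSwarm is hypothetical. The lock is held for a single poll only. Since the waker does nothing, nothing will wake the caller later; it has to retry on its own schedule.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Std-only stand-in for `futures::FutureExt::now_or_never`: poll once, give up if pending.
fn now_or_never<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}

/// Hypothetical stand-in for the swarm; not the libp2p type.
struct MockSwarm {
    events: VecDeque<&'static str>,
}

impl MockSwarm {
    fn next_event(&mut self) -> impl Future<Output = &'static str> + '_ {
        poll_fn(move |_cx: &mut Context<'_>| match self.events.pop_front() {
            Some(event) => Poll::Ready(event),
            None => Poll::Pending,
        })
    }
}

/// Lock, try once for an event, and always release the lock before returning.
fn try_next_event(shared: &Mutex<MockSwarm>) -> Option<&'static str> {
    let mut swarm = shared.lock().unwrap();
    let event = now_or_never(swarm.next_event());
    drop(swarm); // release the lock whether or not an event was ready
    event
}

fn main() {
    let shared = Mutex::new(MockSwarm { events: VecDeque::from(vec!["Dialing"]) });
    println!("{:?}", try_next_event(&shared));
    println!("{:?}", try_next_event(&shared));
}
```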

To reduce the complexity of Swarm, I would even go as far as to:

  1. Remove next_event and next.
  2. Move logic from poll_next_event into the Swarm Stream implementation, where the Stream Item is a SwarmEvent and not a TBehaviour::OutEvent.

The former next_event could then be done via StreamExt::next. The former next could then be done with a simple StreamExt::filter.
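A rough std-only sketch of this proposal. SimpleStream is a simplified, hypothetical stand-in for futures::Stream (no pinning), and SwarmEvent is cut down to three made-up variants. The swarm's stream yields every SwarmEvent, and the old behaviour-only view becomes a small filtering adapter. With the real futures crate, the adapter would presumably be a StreamExt::filter or filter_map call rather than a hand-written type.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Simplified stand-in for `futures::Stream` (no `Pin`), only to keep the sketch std-only.
trait SimpleStream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Cut-down, hypothetical `SwarmEvent`; the real enum has more variants and fields.
#[derive(Debug, PartialEq)]
enum SwarmEvent<B> {
    Behaviour(B),
    NewListenAddr(&'static str),
    ConnectionClosed(&'static str),
}

struct MockSwarm {
    events: VecDeque<SwarmEvent<&'static str>>,
}

/// Under the proposal, the swarm's stream yields *every* `SwarmEvent`.
impl SimpleStream for MockSwarm {
    type Item = SwarmEvent<&'static str>;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The mock ends the stream when its queue is empty, so the demo terminates.
        Poll::Ready(self.events.pop_front())
    }
}

/// The former behaviour-only `next` becomes a filter over the full stream.
struct BehaviourOnly<S>(S);

impl<S, B> SimpleStream for BehaviourOnly<S>
where
    S: SimpleStream<Item = SwarmEvent<B>>,
{
    type Item = B;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<B>> {
        loop {
            match ready!(self.0.poll_next(cx)) {
                Some(SwarmEvent::Behaviour(event)) => return Poll::Ready(Some(event)),
                Some(_) => continue, // connection and listener events are skipped
                None => return Poll::Ready(None),
            }
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn collect_all<S: SimpleStream>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.poll_next(&mut cx) {
        out.push(item);
    }
    out
}

fn sample_swarm() -> MockSwarm {
    MockSwarm {
        events: VecDeque::from(vec![
            SwarmEvent::NewListenAddr("/ip4/127.0.0.1/tcp/4001"),
            SwarmEvent::Behaviour("ping"),
            SwarmEvent::ConnectionClosed("peer-a"),
        ]),
    }
}

fn main() {
    println!("all events: {:?}", collect_all(sample_swarm()));
    println!("behaviour only: {:?}", collect_all(BehaviourOnly(sample_swarm())));
}
```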

@elenaf9
Contributor

elenaf9 commented Jun 7, 2021

@mxinden yes, now_or_never() solves my case, thanks; somehow I didn't think of that before.

I like your suggested changes for ExpandedSwarm, since I was also always a bit confused about why Stream was implemented to return only the events from the network behaviour instead of all events in the swarm.
If this is something that you'd like to have changed, I could draft a PR?

@mxinden
Member

mxinden commented Jun 7, 2021

If this is something that you'd like to have changed, I could draft a PR?

That would be very much appreciated @elenaf9!

@Looooong

Looooong commented Jul 1, 2021

I'm integrating libp2p into bevy and having some trouble with it. Because bevy uses ECS, I have a system that polls swarm events every frame. However, it doesn't connect when I try to dial another swarm on localhost. I managed to reproduce the issue without bevy by polling swarm events in a loop like this:

  • libp2p 0.36.0 (compatible with bevy):

```rust
loop {
    match swarm.next_event().now_or_never() {
        Some(event) => println!("Event: {:?}", event),
        None => (),
    }

    thread::sleep(Duration::from_secs_f32(0.01));
}
```

  • libp2p 0.38.0 (latest version at the time of writing):

```rust
loop {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);

    match swarm.poll_next_unpin(&mut cx) {
        Poll::Ready(Some(event)) => println!("Event: {:?}", event),
        _ => (),
    }

    thread::sleep(Duration::from_secs_f32(0.01));
}
```

I'm using thread::sleep() here to stand in for the execution of other systems in bevy. With thread::sleep(), I get an UnknownPeerUnreachableAddr event; without it, the connection is established properly.

Is this an issue or am I doing something wrong with the code?
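Not an answer to the UnknownPeerUnreachableAddr question, but one difference that may be worth ruling out: both loops above handle at most one event per 10 ms iteration, so a backlog of swarm events can build up while other systems run. A std-only sketch (MockSwarm, drain_ready_events and frames_needed are hypothetical) of draining every ready event per frame instead; whether this changes the dialing behaviour of a real swarm is untested.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for the swarm's stream; events can pile up between frames.
struct MockSwarm {
    queue: VecDeque<&'static str>,
}

impl MockSwarm {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<&'static str>> {
        match self.queue.pop_front() {
            Some(event) => Poll::Ready(Some(event)),
            None => Poll::Pending,
        }
    }
}

/// Per-frame system body: handle *every* event that is ready now, not just one.
fn drain_ready_events(swarm: &mut MockSwarm, cx: &mut Context<'_>, handled: &mut Vec<&'static str>) {
    while let Poll::Ready(Some(event)) = swarm.poll_next(cx) {
        handled.push(event);
    }
}

/// Counts the frames needed to handle an initial backlog of `events`.
fn frames_needed(events: &[&'static str], drain_all: bool) -> usize {
    let mut swarm = MockSwarm { queue: events.iter().copied().collect() };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut handled = Vec::new();
    let mut frames = 0;
    while handled.len() < events.len() {
        frames += 1;
        if drain_all {
            drain_ready_events(&mut swarm, &mut cx, &mut handled);
        } else if let Poll::Ready(Some(event)) = swarm.poll_next(&mut cx) {
            handled.push(event); // the quoted loops: at most one event per frame
        }
    }
    frames
}

fn main() {
    let backlog = ["Dialing", "ConnectionEstablished", "Behaviour(..)"];
    println!("one per frame: {} frames", frames_needed(&backlog, false));
    println!("drain per frame: {} frames", frames_needed(&backlog, true));
}
```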

@thomaseizinger closed this as not planned on Mar 29, 2023.