Skip to content

Commit

Permalink
Fixing for inbound substream to close sooner following AddProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Feb 15, 2023
1 parent 5b1a59c commit 1f73380
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures = "0.3.26"
log = "0.4"
libp2p-core = { version = "0.39.0", path = "../../core" }
libp2p-swarm = { version = "0.42.0", path = "../../swarm" }
parking_lot = "0.12.0"
prost = "0.11"
rand = "0.8"
sha2 = "0.10.0"
Expand Down
4 changes: 2 additions & 2 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ where
&mut self,
key: record::Key,
provider: KadPeer,
guard: InboundStreamEventGuard,
guard: Arc<InboundStreamEventGuard>,
) {
if &provider.node_id != self.kbuckets.local_key().preimage() {
let record = ProviderRecord {
Expand Down Expand Up @@ -1777,7 +1777,7 @@ where
KademliaEvent::InboundRequest {
request: InboundRequest::AddProvider {
record: Some(record),
guard: Some(Arc::new(guard)),
guard: Some(guard),
},
},
));
Expand Down
54 changes: 31 additions & 23 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use libp2p_swarm::{
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::trace;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::{
error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration,
Expand Down Expand Up @@ -178,13 +179,14 @@ enum OutboundSubstreamState<TUserData> {
#[derive(Debug)]
pub struct InboundStreamEventGuard {
ready: Arc<AtomicBool>,
waker: Option<Waker>,
waker: Mutex<Option<Waker>>,
}

impl Drop for InboundStreamEventGuard {
fn drop(&mut self) {
self.ready.store(true, Ordering::Release);
self.waker
.lock()
.take()
.expect("Only called once in Drop impl")
.wake();
Expand All @@ -206,9 +208,9 @@ enum InboundSubstreamState<TUserData> {
KadInStreamSink<NegotiatedSubstream>,
Option<Waker>,
),
PendingStateTransition {
ready: Arc<AtomicBool>,
next_state: Box<InboundSubstreamState<TUserData>>,
PendingProcessing {
weak_guard: Weak<InboundStreamEventGuard>,
substream: KadInStreamSink<NegotiatedSubstream>,
},
/// Waiting to send an answer back to the remote.
PendingSend(
Expand Down Expand Up @@ -268,15 +270,12 @@ impl<TUserData> InboundSubstreamState<TUserData> {
) {
InboundSubstreamState::WaitingMessage { substream, .. }
| InboundSubstreamState::WaitingBehaviour(_, substream, _)
| InboundSubstreamState::PendingProcessing { substream, .. }
| InboundSubstreamState::PendingSend(_, substream, _)
| InboundSubstreamState::PendingFlush(_, substream)
| InboundSubstreamState::Closing(substream) => {
*self = InboundSubstreamState::Closing(substream);
}
InboundSubstreamState::PendingStateTransition { next_state, .. } => {
*self = *next_state;
self.close();
}
InboundSubstreamState::Cancelled => {
*self = InboundSubstreamState::Cancelled;
}
Expand Down Expand Up @@ -348,7 +347,7 @@ pub enum KademliaHandlerEvent<TUserData> {
/// The peer that is the provider of the value for `key`.
provider: KadPeer,
/// Guard corresponding to inbound stream that generated this event.
guard: InboundStreamEventGuard,
guard: Arc<InboundStreamEventGuard>,
},

/// Request to get a value from the dht records
Expand Down Expand Up @@ -1077,16 +1076,14 @@ where
}
Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))) => {
let ready = Arc::new(AtomicBool::new(false));
let guard = InboundStreamEventGuard {
ready: ready.clone(),
waker: Some(cx.waker().clone()),
};
let next_state = Box::new(InboundSubstreamState::WaitingMessage {
first: false,
connection_id,
substream,
let guard = Arc::new(InboundStreamEventGuard {
ready,
waker: Mutex::new(Some(cx.waker().clone())),
});
*this = InboundSubstreamState::PendingStateTransition { ready, next_state };
*this = InboundSubstreamState::PendingProcessing {
weak_guard: Arc::downgrade(&guard),
substream,
};

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
KademliaHandlerEvent::AddProvider {
Expand Down Expand Up @@ -1145,11 +1142,22 @@ where

return Poll::Pending;
}
InboundSubstreamState::PendingStateTransition { ready, next_state } => {
*this = if ready.load(Ordering::Acquire) {
*next_state
InboundSubstreamState::PendingProcessing {
weak_guard,
substream,
} => {
*this = if let Some(guard) = weak_guard.upgrade() {
let old_waker = guard.waker.lock().replace(cx.waker().clone());
if old_waker.is_none() || guard.ready.load(Ordering::Acquire) {
return Poll::Ready(None);
} else {
InboundSubstreamState::PendingProcessing {
weak_guard,
substream,
}
}
} else {
InboundSubstreamState::PendingStateTransition { ready, next_state }
return Poll::Ready(None);
};

return Poll::Pending;
Expand Down

0 comments on commit 1f73380

Please sign in to comment.