Skip to content

Commit

Permalink
Check opened streams periodically
Browse files Browse the repository at this point in the history
Fixes issue #10

Pull request libp2p/go-libp2p-core#250 removed the OpenStream notification. Punchr uses the notification here: https://github.com/dennis-tra/punchr/blob/c4b2fcf93e63ae4a8656864a399638c0c9641372/cmd/client/host.go#L372
  • Loading branch information
dennis-tra committed Jun 14, 2022
1 parent d029589 commit 832cf47
Showing 1 changed file with 37 additions and 67 deletions.
104 changes: 37 additions & 67 deletions cmd/client/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

var (
Version = "0.2.0"
CommunicationTimeout = 15 * time.Second
RetryCount = 3
)
Expand All @@ -33,21 +34,19 @@ type Host struct {
host.Host

holePunchEventsPeers sync.Map
streamOpenPeers sync.Map
}

func InitHost(ctx context.Context, privKey crypto.PrivKey) (*Host, error) {
log.Info("Starting libp2p host...")

h := &Host{
holePunchEventsPeers: sync.Map{},
streamOpenPeers: sync.Map{},
}

// Configure new libp2p host
libp2pHost, err := libp2p.New(
libp2p.Identity(privKey),
libp2p.UserAgent("punchr/go-client/0.1.0"),
libp2p.UserAgent("punchr/go-client/"+Version),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic"),
libp2p.ListenAddrStrings("/ip6/::/tcp/0"),
Expand Down Expand Up @@ -184,13 +183,13 @@ func (h *Host) HolePunch(ctx context.Context, addrInfo peer.AddrInfo) *HolePunch
for i := 0; i < RetryCount; i++ {
// wait for the DCUtR stream to be opened
select {
case <-h.WaitForDCUtRStream(addrInfo.ID):
// pass
case <-time.After(CommunicationTimeout):
// Stream was not opened in time by the remote.
hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_NO_STREAM
hpState.Error = "/libp2p/dcutr stream was not opened after " + CommunicationTimeout.String()
return hpState
case _, ok := <-h.WaitForDCUtRStream(addrInfo.ID):
if !ok {
// Stream was not opened in time by the remote.
hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_NO_STREAM
hpState.Error = "/libp2p/dcutr stream was not opened after " + CommunicationTimeout.String()
return hpState
}
case <-ctx.Done():
hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_CANCELLED
hpState.Error = ctx.Err().Error()
Expand Down Expand Up @@ -274,25 +273,39 @@ func (hps HolePunchState) TrackHolePunch(ctx context.Context, remoteID peer.ID,
}

func (h *Host) WaitForDCUtRStream(pid peer.ID) <-chan struct{} {
evtChan := make(chan struct{})
h.streamOpenPeers.Store(pid, evtChan)
dcutrOpenedChan := make(chan struct{})

// Exit early if the DCUtR stream is already open
for _, conn := range h.Network().ConnsToPeer(pid) {
for _, stream := range conn.GetStreams() {
if stream.Protocol() == holepunch.Protocol {
// If not found, it was already closed by the open stream handler
if _, found := h.streamOpenPeers.LoadAndDelete(pid); !found {
return evtChan
// The following go routine is a hack. We want to be notified as soon as the remote peer has opened a dcutr stream.
// go-libp2p v0.20.0 has removed the OpenedStream notification (which didn't really work anyway). Now we check
// every 10 ms all streams on all connections for the /libp2p/dcutr stream. If
go func() {
timeout := time.After(CommunicationTimeout)
timer := time.NewTimer(0)
for {
select {
case <-timeout:
close(dcutrOpenedChan)
return
case <-timer.C:
}

for _, conn := range h.Network().ConnsToPeer(pid) {
for _, stream := range conn.GetStreams() {
if stream.Protocol() != holepunch.Protocol {
continue
}
h.logEntry(pid).Debugln("/libp2p/dcutr stream opened!")
dcutrOpenedChan <- struct{}{}
close(dcutrOpenedChan)
return
}
close(evtChan)
return evtChan
}
timer.Reset(10 * time.Millisecond)
}
}
}()

h.logEntry(pid).Infoln("Waiting for /libp2p/dcutr stream...")
return evtChan
return dcutrOpenedChan
}

func (h *Host) RegisterPeerToTrace(pid peer.ID) <-chan *holepunch.Event {
Expand All @@ -312,7 +325,7 @@ func (h *Host) UnregisterPeerToTrace(pid peer.ID) {
for {
select {
case evt := <-evtChan:
h.logEntry(pid).WithField("evtType", evt.Type).Infoln("Draining event channel")
h.logEntry(pid).WithField("evtType", evt.Type).Warnln("Draining event channel")
default:
close(evtChan)
return
Expand Down Expand Up @@ -362,46 +375,3 @@ func (h *Host) Listen(network.Network, multiaddr.Multiaddr) {}
func (h *Host) ListenClose(network.Network, multiaddr.Multiaddr) {}
func (h *Host) Connected(network.Network, network.Conn) {}
func (h *Host) Disconnected(network.Network, network.Conn) {}
func (h *Host) ClosedStream(_ network.Network, stream network.Stream) {
if stream.Protocol() != holepunch.Protocol {
return
}
h.logEntry(stream.Conn().RemotePeer()).Debugln("/libp2p/dcutr stream closed!")
}

func (h *Host) OpenedStream(_ network.Network, stream network.Stream) {
// The following is a hack. `stream` does not have the `Protocol` field set yet. So we just check
// every 5 ms for a total of 15 s.
go func() {
timeout := time.After(CommunicationTimeout)
timer := time.NewTimer(0)
for {

select {
case <-timeout:
return
case <-timer.C:
}

if stream.Protocol() == "" {
timer.Reset(5 * time.Millisecond)
continue
}

if stream.Protocol() != holepunch.Protocol {
return
}

break

}

val, found := h.streamOpenPeers.LoadAndDelete(stream.Conn().RemotePeer())
if !found {
return
}

h.logEntry(stream.Conn().RemotePeer()).Debugln("/libp2p/dcutr stream opened!")
close(val.(chan struct{}))
}()
}

0 comments on commit 832cf47

Please sign in to comment.