Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update go-libp2p-core, remove stream methods from network.Notifiee #1521

Merged
merged 1 commit into from
May 25, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 28 additions & 31 deletions go.mod
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ require (
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p-asn-util v0.2.0
github.com/libp2p/go-libp2p-circuit v0.6.0
github.com/libp2p/go-libp2p-core v0.15.1
github.com/libp2p/go-libp2p-core v0.16.1
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-resource-manager v0.3.0
github.com/libp2p/go-libp2p-testing v0.9.2
@@ -55,70 +55,67 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheekybits/genny v1.0.0 // indirect
github.com/containerd/cgroups v1.0.3 // indirect
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/coreos/go-systemd/v22 v22.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/elastic/gosigar v0.12.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/godbus/dbus/v5 v5.0.3 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/huin/goupnp v1.0.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/koron/go-ssdp v0.0.3 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.0.3 // indirect
github.com/libp2p/go-libp2p-blankhost v0.3.0 // indirect
github.com/libp2p/go-libp2p-quic-transport v0.17.0 // indirect
github.com/libp2p/go-libp2p-swarm v0.10.2 // indirect
github.com/libp2p/go-libp2p-tls v0.4.1 // indirect
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 // indirect
github.com/libp2p/go-libp2p-yamux v0.9.1 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
github.com/libp2p/go-tcp-transport v0.5.1 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/miekg/dns v1.1.49 // indirect
github.com/miekg/dns v1.1.43 // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.0.4 // indirect
github.com/multiformats/go-base32 v0.0.3 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multicodec v0.4.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.34.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/raulk/clock v1.1.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
google.golang.org/grpc v1.45.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/tools v0.1.5 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.31.1 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
lukechampine.com/blake3 v1.1.7 // indirect
lukechampine.com/blake3 v1.1.6 // indirect
)
181 changes: 43 additions & 138 deletions go.sum

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions p2p/host/autonat/notify.go
Original file line number Diff line number Diff line change
@@ -15,12 +15,6 @@ func (as *AmbientAutoNAT) Listen(net network.Network, a ma.Multiaddr) {}
// ListenClose is part of the network.Notifiee interface
func (as *AmbientAutoNAT) ListenClose(net network.Network, a ma.Multiaddr) {}

// OpenedStream is part of the network.Notifiee interface
func (as *AmbientAutoNAT) OpenedStream(net network.Network, s network.Stream) {}

// ClosedStream is part of the network.Notifiee interface
func (as *AmbientAutoNAT) ClosedStream(net network.Network, s network.Stream) {}

// Connected is part of the network.Notifiee interface
func (as *AmbientAutoNAT) Connected(net network.Network, c network.Conn) {
if c.Stat().Direction == network.DirInbound &&
6 changes: 2 additions & 4 deletions p2p/host/basic/natmgr.go
Original file line number Diff line number Diff line change
@@ -228,7 +228,5 @@ func (nn *nmgrNetNotifiee) ListenClose(n network.Network, addr ma.Multiaddr) {
nn.natManager().sync()
}

func (nn *nmgrNetNotifiee) Connected(network.Network, network.Conn) {}
func (nn *nmgrNetNotifiee) Disconnected(network.Network, network.Conn) {}
func (nn *nmgrNetNotifiee) OpenedStream(network.Network, network.Stream) {}
func (nn *nmgrNetNotifiee) ClosedStream(network.Network, network.Stream) {}
func (nn *nmgrNetNotifiee) Connected(network.Network, network.Conn) {}
func (nn *nmgrNetNotifiee) Disconnected(network.Network, network.Conn) {}
6 changes: 2 additions & 4 deletions p2p/host/basic/peer_connectedness.go
Original file line number Diff line number Diff line change
@@ -26,10 +26,8 @@ func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
}
}

func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}

func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
6 changes: 2 additions & 4 deletions p2p/host/blank/peer_connectedness.go
Original file line number Diff line number Diff line change
@@ -26,10 +26,8 @@ func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
}
}

func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}

func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
6 changes: 0 additions & 6 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
@@ -686,9 +686,3 @@ func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}

// ListenClose is no-op in this implementation.
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}

// OpenedStream is no-op in this implementation.
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}

// ClosedStream is no-op in this implementation.
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}
19 changes: 3 additions & 16 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
@@ -97,33 +97,20 @@ func (c *conn) teardown() error {

func (c *conn) addStream(s *stream) {
c.Lock()
defer c.Unlock()
s.conn = c
c.streams.PushBack(s)
s.notifLk.Lock()
defer s.notifLk.Unlock()
c.Unlock()
c.net.notifyAll(func(n network.Notifiee) {
n.OpenedStream(c.net, s)
})
}

func (c *conn) removeStream(s *stream) {
c.Lock()
defer c.Unlock()
for e := c.streams.Front(); e != nil; e = e.Next() {
if s == e.Value {
c.streams.Remove(e)
break
return
}
}
c.Unlock()

go func() {
s.notifLk.Lock()
defer s.notifLk.Unlock()
s.conn.net.notifyAll(func(n network.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
}()
}

func (c *conn) allStreams() []network.Stream {
25 changes: 0 additions & 25 deletions p2p/net/mock/mock_notif_test.go
Original file line number Diff line number Diff line change
@@ -203,28 +203,3 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
nn.disconnected <- v
}
func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {
nn.streamState.Lock()
defer nn.streamState.Unlock()
_, ok := nn.streamState.m[s]
if ok {
nn.t.Error("duplicate stream open")
return
}
nn.streamState.m[s] = make(chan struct{})
}
func (nn *netNotifiee) ClosedStream(n network.Network, s network.Stream) {
nn.streamState.Lock()
defer nn.streamState.Unlock()
ch, ok := nn.streamState.m[s]
if !ok {
nn.t.Error("saw close event but no open event")
return
}
select {
case <-ch:
nn.t.Error("duplicate close event")
default:
close(ch)
}
}
5 changes: 1 addition & 4 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ import (
"io"
"net"
"strconv"
"sync"
"sync/atomic"
"time"

@@ -18,8 +17,6 @@ var streamCounter int64

// stream implements network.Stream
type stream struct {
notifLk sync.Mutex

rstream *stream
conn *conn
id int64
@@ -38,7 +35,7 @@ type stream struct {
stat network.Stats
}

var ErrClosed error = errors.New("stream closed")
var ErrClosed = errors.New("stream closed")

type transportObject struct {
msg []byte
10 changes: 0 additions & 10 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
@@ -233,17 +233,7 @@ func (c *Conn) addStream(ts network.MuxedStream, dir network.Direction, scope ne
// firing (in Swarm.remove).
c.swarm.refs.Add(1)

// Take the notification lock before releasing the streams lock to block
// StreamClose notifications until after the StreamOpen notifications
// done.
s.notifyLk.Lock()
c.streams.Unlock()

c.swarm.notifyAll(func(f network.Notifiee) {
f.OpenedStream(c.swarm, s)
})
s.notifyLk.Unlock()

return s, nil
}

59 changes: 0 additions & 59 deletions p2p/net/swarm/swarm_notif_test.go
Original file line number Diff line number Diff line change
@@ -107,55 +107,6 @@ func TestNotifications(t *testing.T) {
return nil, nil, nil
}

testOCStream := func(n *netNotifiee, s network.Stream) {
var s2 network.Stream
select {
case s2 = <-n.openedStream:
t.Log("got notif for opened stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != s2 {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
}

select {
case s2 = <-n.closedStream:
t.Log("got notif for closed stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if s != s2 {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
}
}

streams := make(chan network.Stream)
for _, s := range swarms {
s.SetStreamHandler(func(s network.Stream) {
streams <- s
s.Reset()
})
}

// open a streams in each conn
for i, s := range swarms {
for _, c := range s.Conns() {
_, n2, _ := complement(c)

st1, err := c.NewStream(context.Background())
if err != nil {
t.Error(err)
} else {
st1.Write([]byte("hello"))
st1.Reset()
testOCStream(notifiees[i], st1)
st2 := <-streams
testOCStream(n2, st2)
}
}
}

// close conns
for i, s := range swarms {
n := notifiees[i]
@@ -191,8 +142,6 @@ type netNotifiee struct {
listenClose chan ma.Multiaddr
connected chan network.Conn
disconnected chan network.Conn
openedStream chan network.Stream
closedStream chan network.Stream
}

func newNetNotifiee(buffer int) *netNotifiee {
@@ -201,8 +150,6 @@ func newNetNotifiee(buffer int) *netNotifiee {
listenClose: make(chan ma.Multiaddr, buffer),
connected: make(chan network.Conn, buffer),
disconnected: make(chan network.Conn, buffer),
openedStream: make(chan network.Stream, buffer),
closedStream: make(chan network.Stream, buffer),
}
}

@@ -218,9 +165,3 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
nn.disconnected <- v
}
func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {
nn.openedStream <- v
}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {
nn.closedStream <- v
}
19 changes: 3 additions & 16 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
@@ -24,8 +24,6 @@ type Stream struct {

closeOnce sync.Once

notifyLk sync.Mutex

protocol atomic.Value

stat network.Stats
@@ -90,33 +88,22 @@ func (s *Stream) Reset() error {
return err
}

// Close closes the stream for writing, flushing all data and sending an EOF.
// CloseWrite closes the stream for writing, flushing all data and sending an EOF.
// This function does not free resources, call Close or Reset when done with the
// stream.
func (s *Stream) CloseWrite() error {
return s.stream.CloseWrite()
}

// Close closes the stream for reading. This function does not free resources,
// CloseRead closes the stream for reading. This function does not free resources,
// call Close or Reset when done with the stream.
func (s *Stream) CloseRead() error {
return s.stream.CloseRead()
}

func (s *Stream) remove() {
s.conn.removeStream(s)

// We *must* do this in a goroutine. This can be called during a
// an open notification and will block until that notification is done.
go func() {
s.notifyLk.Lock()
defer s.notifyLk.Unlock()

s.conn.swarm.notifyAll(func(f network.Notifiee) {
f.ClosedStream(s.conn.swarm, s)
})
s.conn.swarm.refs.Done()
}()
s.conn.swarm.refs.Done()
}

// Protocol returns the protocol negotiated on this stream (if set).
8 changes: 3 additions & 5 deletions p2p/protocol/holepunch/holepuncher.go
Original file line number Diff line number Diff line change
@@ -268,8 +268,6 @@ func (nn *netNotifiee) Connected(_ network.Network, conn network.Conn) {
}
}

func (nn *netNotifiee) Disconnected(_ network.Network, v network.Conn) {}
func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) Disconnected(_ network.Network, v network.Conn) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}
6 changes: 2 additions & 4 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
@@ -814,7 +814,5 @@ func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
}
}

func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}
2 changes: 0 additions & 2 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
@@ -588,5 +588,3 @@ func (on *obsAddrNotifiee) Connected(n network.Network, v network.Conn) {}
func (on *obsAddrNotifiee) Disconnected(n network.Network, v network.Conn) {
(*ObservedAddrManager)(on).removeConn(v)
}
func (on *obsAddrNotifiee) OpenedStream(n network.Network, s network.Stream) {}
func (on *obsAddrNotifiee) ClosedStream(n network.Network, s network.Stream) {}