Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move go-libp2p-transport-upgrader here #1463

Merged
merged 85 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
ede46e0
initial commit
Stebalien Jan 19, 2018
4822176
fix nil dereference
Stebalien Jan 20, 2018
cf38072
nicer stringers
Stebalien Jan 20, 2018
61cd627
keep accepting and negotiating connections until we have 16 ready and…
Stebalien Jan 22, 2018
a950fa7
fix comment on why we don't need to wait on the context
Stebalien Jan 22, 2018
cec0d9f
remove unused count function from threshold
Stebalien Jan 22, 2018
6b92f28
Merge pull request #2 from libp2p/feat/accept-queue
Stebalien Jan 22, 2018
be4a5f4
fix off-by-one error in the threshold
marten-seemann Jan 23, 2018
90789da
add tests for the listener
marten-seemann Jan 23, 2018
516978c
wait after accepting a new connection if the queue is full
marten-seemann Jan 23, 2018
15e5327
Merge pull request #6 from marten-seemann/better-threshold-usage
Stebalien Jan 24, 2018
0212611
Merge pull request #3 from marten-seemann/fix-threshold-off-by-one-error
Stebalien Jan 26, 2018
1317fb9
complete rename
Stebalien Jan 29, 2018
a08397a
Merge pull request #5 from marten-seemann/listener-tests
Stebalien Jan 29, 2018
dc4b4ae
fix tests for rename
Stebalien Jan 29, 2018
0db3c0a
fix for interface move
Stebalien Feb 15, 2018
6055448
fix for moved insecure security transport
Stebalien Feb 15, 2018
6fbbb67
check if connection is closed before returning it from Accept
Stebalien Feb 16, 2018
07e87ba
ensure we force use of the libp2p protector if ForcePrivateNetwork is…
Stebalien Mar 9, 2018
32b0643
better document handleIncoming
Stebalien Mar 9, 2018
dbf3f83
remove dependency on the tcp transport
Stebalien Mar 10, 2018
66b1a47
use mplex for tests instead of yamux
Stebalien Mar 10, 2018
8ceb856
annotate errors
Stebalien Jan 9, 2019
5fc9b71
Merge pull request #11 from libp2p/fix/better-negotiation-errors
Stebalien Jan 10, 2019
e87a4e3
improve correctness of closing connections on failure
Stebalien Apr 26, 2019
f418d2c
test: cleanup after tests
Stebalien Apr 26, 2019
dc54d8a
Merge pull request #19 from libp2p/fix/close-on-err
Stebalien Apr 26, 2019
67b7b27
dep: import go-libp2p-mplex into the libp2p org
Stebalien May 22, 2019
9667b21
Merge pull request #21 from libp2p/dep/import-smux
Stebalien May 22, 2019
57d79cc
Consolidate abstractions and core types into go-libp2p-core (#28)
Stebalien May 24, 2019
b1dd953
Merge pull request #22 from libp2p/feat/consolidate-abstractions
Stebalien May 24, 2019
9bcb09e
complete the migration to consolidated types. (#23)
raulk May 26, 2019
f081071
fix an incorrect error message
Stebalien Sep 7, 2019
aabc5e4
Merge pull request #27 from libp2p/fix/error-message
Stebalien Sep 7, 2019
1c35066
use the ipnet.PSK instead of the ipnet.Protector for private networks
marten-seemann Feb 20, 2020
2267728
Merge pull request #45 from libp2p/generalize-private-network
Stebalien Mar 7, 2020
b19703d
simplify tests by removing ginkgo/gomega. (#60)
raulk May 13, 2020
59da2c1
remove leftover ginkgo test file. (#61)
raulk May 13, 2020
bf33d1e
call the connection gater when accepting connections and after crypto…
aarshkshah1992 May 15, 2020
d365120
fix int to string conversion in tests, update Go version on CI
marten-seemann Dec 17, 2020
44b29c4
Merge pull request #69 from libp2p/fix-int-to-string-conversion
Stebalien Dec 17, 2020
508be7e
pass contexts to OpenStream in tests
marten-seemann Dec 17, 2020
ebab543
Merge pull request #70 from libp2p/open-stream-context
marten-seemann Dec 19, 2020
4453d7a
Implement support for simultaneous open (#25)
vyzo Feb 17, 2021
2397d98
thread underlying conn Stat to allow transports to propagate stat inf…
vyzo Feb 4, 2021
cffe493
Merge pull request #71 from libp2p/feat/conn-stat
vyzo Feb 17, 2021
b743906
stop using the deprecated go-multiaddr-net
marten-seemann Feb 25, 2021
3a37a04
Merge pull request #72 from libp2p/dont-use-go-multiaddr-net
marten-seemann Feb 25, 2021
76f4936
don't listen on all interfaces in tests
marten-seemann Feb 25, 2021
bda39a2
Merge pull request #73 from libp2p/dont-listen-on-all-interfaces
Stebalien Mar 12, 2021
8739d56
fix staticcheck
marten-seemann Apr 23, 2021
8acc055
Merge pull request #74 from libp2p/fix-staticcheck
Stebalien Apr 23, 2021
3ac4a61
fix typo in error message
marten-seemann Jul 2, 2021
efd8047
Merge pull request #77 from libp2p/fix-typo
marten-seemann Jul 24, 2021
fc8779c
chore: update deps
marten-seemann Jul 26, 2021
8d16690
Merge pull request #78 from libp2p/update-deps
marten-seemann Jul 27, 2021
e267d49
add the peer ID to SecureInbound
marten-seemann Sep 5, 2021
e2284ac
Merge pull request #83 from libp2p/check-peer-id-on-inbound
marten-seemann Sep 8, 2021
289fcae
increase timeout in TestConnectionsClosedIfNotAccepted on CI
marten-seemann Sep 8, 2021
403b19e
Merge pull request #85 from libp2p/fix-flaky-accept-test
marten-seemann Sep 8, 2021
893c9a4
chore: update go-log
marten-seemann Sep 22, 2021
e53323d
Merge pull request #88 from libp2p/update-go-log
marten-seemann Sep 26, 2021
85de7f0
use the new network.ConnStats
marten-seemann Dec 10, 2021
aa4826e
Merge pull request #92 from libp2p/conn-stats
marten-seemann Dec 12, 2021
fe89a27
fix flaky TestAcceptQueueBacklogged test
marten-seemann Dec 12, 2021
e9a4b83
Merge pull request #96 from libp2p/fix-flaky-accept-queue-backlogged
marten-seemann Dec 12, 2021
f06d0df
reset the temporary error catcher delay after successful accept
marten-seemann Dec 20, 2021
b16a446
make the accept timeout configurable, stop using transport.AcceptTimeout
marten-seemann Dec 20, 2021
20281dd
Merge pull request #98 from libp2p/configurable-accept-timeout
marten-seemann Dec 21, 2021
c622cb0
Merge pull request #97 from libp2p/reset-temp-err-catcher
marten-seemann Dec 21, 2021
a3f424b
use the new transport.Upgrader interface
marten-seemann Jan 2, 2022
5a79888
rename the constructor from NewUpgrader to New
marten-seemann Jan 2, 2022
e8056e8
remove UpgradeInbound and UpgradeOutbound
marten-seemann Jan 2, 2022
17c6e5e
Merge pull request #100 from libp2p/upgrader-interface
marten-seemann Jan 4, 2022
8f6a3dc
rename the package to upgrader
marten-seemann Jan 8, 2022
873efba
Merge pull request #101 from libp2p/rename-package
marten-seemann Jan 8, 2022
ccf4315
use the Resource Manager
marten-seemann Dec 22, 2021
8d193b3
Merge pull request #99 from libp2p/rcmgr
marten-seemann Jan 18, 2022
67fe765
always set the peer if the peer scope is null
vyzo Feb 2, 2022
191278d
fix tests
vyzo Feb 2, 2022
f75dfe1
more robust nil check
vyzo Feb 2, 2022
8350e75
avoid reflection
vyzo Feb 3, 2022
b6118ea
Merge pull request #104 from libp2p/fix/nil-peer-scope
vyzo Feb 3, 2022
981c9cf
move go-libp2p-transport-upgrader here
marten-seemann Apr 26, 2022
9dc18ed
switch from github.com/libp2p/go-libp2p-transport-upgrader to p2p/net…
marten-seemann Apr 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

tptu "github.com/libp2p/go-libp2p-transport-upgrader"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log/v2 v2.5.1
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/klauspost/compress v1.15.1
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1
Expand All @@ -22,10 +23,10 @@ require (
github.com/libp2p/go-libp2p-nat v0.1.0
github.com/libp2p/go-libp2p-noise v0.4.0
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-pnet v0.2.0
github.com/libp2p/go-libp2p-resource-manager v0.2.1
github.com/libp2p/go-libp2p-testing v0.9.2
github.com/libp2p/go-libp2p-tls v0.4.1
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1
github.com/libp2p/go-mplex v0.7.0
github.com/libp2p/go-msgio v0.2.0
github.com/libp2p/go-netroute v0.2.0
Expand Down Expand Up @@ -74,16 +75,15 @@ require (
github.com/google/uuid v1.3.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.0.3 // indirect
github.com/libp2p/go-libp2p-blankhost v0.3.0 // indirect
github.com/libp2p/go-libp2p-pnet v0.2.0 // indirect
github.com/libp2p/go-libp2p-quic-transport v0.17.0 // indirect
github.com/libp2p/go-libp2p-swarm v0.10.2 // indirect
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 // indirect
github.com/libp2p/go-libp2p-yamux v0.9.1 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ github.com/libp2p/go-libp2p-core v0.14.0/go.mod h1:tLasfcVdTXnixsLB0QYaT1syJOhsb
github.com/libp2p/go-libp2p-core v0.15.1 h1:0RY+Mi/ARK9DgG1g9xVQLb8dDaaU8tCePMtGALEfBnM=
github.com/libp2p/go-libp2p-core v0.15.1/go.mod h1:agSaboYM4hzB1cWekgVReqV5M4g5M+2eNNejV+1EEhs=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-mplex v0.5.0 h1:vt3k4E4HSND9XH4Z8rUpacPJFSAgLOv6HDvG8W9Ks9E=
github.com/libp2p/go-libp2p-mplex v0.5.0/go.mod h1:eLImPJLkj3iG5t5lq68w3Vm5NAQ5BcKwrrb2VmOYb3M=
github.com/libp2p/go-libp2p-nat v0.1.0 h1:vigUi2MEN+fwghe5ijpScxtbbDz+L/6y8XwlzYOJgSY=
github.com/libp2p/go-libp2p-nat v0.1.0/go.mod h1:DQzAG+QbDYjN1/C3B6vXucLtz3u9rEonLVPtZVzQqks=
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

Expand All @@ -19,7 +20,6 @@ import (

"github.com/libp2p/go-libp2p-peerstore/pstoremem"
tnet "github.com/libp2p/go-libp2p-testing/net"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
msmux "github.com/libp2p/go-stream-muxer-multistream"
ma "github.com/multiformats/go-multiaddr"

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

Expand All @@ -22,7 +23,6 @@ import (

"github.com/libp2p/go-libp2p-peerstore/pstoremem"
tnet "github.com/libp2p/go-libp2p-testing/net"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
msmux "github.com/libp2p/go-stream-muxer-multistream"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
Expand Down
52 changes: 52 additions & 0 deletions p2p/net/upgrader/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package upgrader

import (
"fmt"

"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/transport"
)

type transportConn struct {
mux.MuxedConn
network.ConnMultiaddrs
network.ConnSecurity
transport transport.Transport
scope network.ConnManagementScope
stat network.ConnStats
}

var _ transport.CapableConn = &transportConn{}

func (t *transportConn) Transport() transport.Transport {
return t.transport
}

func (t *transportConn) String() string {
ts := ""
if s, ok := t.transport.(fmt.Stringer); ok {
ts = "[" + s.String() + "]"
}
return fmt.Sprintf(
"<stream.Conn%s %s (%s) <-> %s (%s)>",
ts,
t.LocalMultiaddr(),
t.LocalPeer(),
t.RemoteMultiaddr(),
t.RemotePeer(),
)
}

func (t *transportConn) Stat() network.ConnStats {
return t.stat
}

func (t *transportConn) Scope() network.ConnScope {
return t.scope
}

func (t *transportConn) Close() error {
defer t.scope.Done()
return t.MuxedConn.Close()
}
60 changes: 60 additions & 0 deletions p2p/net/upgrader/gater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package upgrader_test

import (
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
)

type testGater struct {
sync.Mutex

blockAccept, blockSecured bool
}

var _ connmgr.ConnectionGater = (*testGater)(nil)

func (t *testGater) BlockAccept(block bool) {
t.Lock()
defer t.Unlock()

t.blockAccept = block
}

func (t *testGater) BlockSecured(block bool) {
t.Lock()
defer t.Unlock()

t.blockSecured = block
}

func (t *testGater) InterceptPeerDial(p peer.ID) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAccept(multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockAccept
}

func (t *testGater) InterceptSecured(direction network.Direction, id peer.ID, multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockSecured
}

func (t *testGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
panic("not implemented")
}
178 changes: 178 additions & 0 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package upgrader

import (
"context"
"fmt"
"sync"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/transport"

logging "github.com/ipfs/go-log/v2"
tec "github.com/jbenet/go-temp-err-catcher"
manet "github.com/multiformats/go-multiaddr/net"
)

var log = logging.Logger("upgrader")

type listener struct {
manet.Listener

transport transport.Transport
upgrader *upgrader
rcmgr network.ResourceManager

incoming chan transport.CapableConn
err error

// Used for backpressure
threshold *threshold

// Canceling this context isn't sufficient to tear down the listener.
// Call close.
ctx context.Context
cancel func()
}

// Close closes the listener.
func (l *listener) Close() error {
// Do this first to try to get any relevent errors.
err := l.Listener.Close()

l.cancel()
// Drain and wait.
for c := range l.incoming {
c.Close()
}
return err
}

// handles inbound connections.
//
// This function does a few interesting things that should be noted:
//
// 1. It logs and discards temporary/transient errors (errors with a Temporary()
// function that returns true).
// 2. It stops accepting new connections once AcceptQueueLength connections have
// been fully negotiated but not accepted. This gives us a basic backpressure
// mechanism while still allowing us to negotiate connections in parallel.
func (l *listener) handleIncoming() {
var wg sync.WaitGroup
defer func() {
// make sure we're closed
l.Listener.Close()
if l.err == nil {
l.err = fmt.Errorf("listener closed")
}

wg.Wait()
close(l.incoming)
}()

var catcher tec.TempErrCatcher
for l.ctx.Err() == nil {
maconn, err := l.Listener.Accept()
if err != nil {
// Note: function may pause the accept loop.
if catcher.IsTemporary(err) {
log.Infof("temporary accept error: %s", err)
continue
}
l.err = err
return
}
catcher.Reset()

// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true)
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by resource manager: %s", err)
}
continue
}

// The go routine below calls Release when the context is
// canceled so there's no need to wait on it here.
l.threshold.Wait()

log.Debugf("listener %s got connection: %s <---> %s",
l,
maconn.LocalMultiaddr(),
maconn.RemoteMultiaddr())

wg.Add(1)
go func() {
defer wg.Done()

ctx, cancel := context.WithTimeout(l.ctx, l.upgrader.acceptTimeout)
defer cancel()

conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope)
if err != nil {
// Don't bother bubbling this up. We just failed
// to completely negotiate the connection.
log.Debugf("accept upgrade error: %s (%s <--> %s)",
err,
maconn.LocalMultiaddr(),
maconn.RemoteMultiaddr())
connScope.Done()
return
}

log.Debugf("listener %s accepted connection: %s", l, conn)

// This records the fact that the connection has been
// setup and is waiting to be accepted. This call
// *never* blocks, even if we go over the threshold. It
// simply ensures that calls to Wait block while we're
// over the threshold.
l.threshold.Acquire()
defer l.threshold.Release()

select {
case l.incoming <- conn:
case <-ctx.Done():
if l.ctx.Err() == nil {
// Listener *not* closed but the accept timeout expired.
log.Warn("listener dropped connection due to slow accept")
}
// Wait on the context with a timeout. This way,
// if we stop accepting connections for some reason,
// we'll eventually close all the open ones
// instead of hanging onto them.
conn.Close()
}
}()
}
}

// Accept accepts a connection.
func (l *listener) Accept() (transport.CapableConn, error) {
for c := range l.incoming {
// Could have been sitting there for a while.
if !c.IsClosed() {
return c, nil
}
}
return nil, l.err
}

func (l *listener) String() string {
if s, ok := l.transport.(fmt.Stringer); ok {
return fmt.Sprintf("<stream.Listener[%s] %s>", s, l.Multiaddr())
}
return fmt.Sprintf("<stream.Listener %s>", l.Multiaddr())
}

var _ transport.Listener = (*listener)(nil)
Loading