Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(p2p): Avoid infinite loop during transport/listener Accept #3662

Merged
merged 12 commits into from
Feb 5, 2025
73 changes: 37 additions & 36 deletions tm2/pkg/p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"context"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -622,50 +623,50 @@
// and persisting them
func (sw *MultiplexSwitch) runAcceptLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
sw.Logger.Debug("switch context close received")
p, err := sw.transport.Accept(ctx, sw.peerBehavior)

return
switch {
case err == nil: // ok
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
// Upper context has been canceled or has timed out
sw.Logger.Debug("switch context close received")
return // exit
case errors.As(err, &errTransportClosed):
// Underlying transport has been closed
sw.Logger.Warn("cannot accept connection on closed transport, exiting")
return // exit
default:
p, err := sw.transport.Accept(ctx, sw.peerBehavior)
if err != nil {
sw.Logger.Error(
"error encountered during peer connection accept",
"err", err,
)
// An error occurred during accept, report and continue
sw.Logger.Error("error encountered during peer connection accept", "err", err)
continue

Check warning on line 641 in tm2/pkg/p2p/switch.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/p2p/switch.go#L639-L641

Added lines #L639 - L641 were not covered by tests
}

continue
}
// Ignore connection if we already have enough peers.
if in := sw.Peers().NumInbound(); in >= sw.maxInboundPeers {
sw.Logger.Info(
"Ignoring inbound connection: already have enough inbound peers",
"address", p.SocketAddr(),
"have", in,
"max", sw.maxInboundPeers,
)

// Ignore connection if we already have enough peers.
if in := sw.Peers().NumInbound(); in >= sw.maxInboundPeers {
sw.Logger.Info(
"Ignoring inbound connection: already have enough inbound peers",
"address", p.SocketAddr(),
"have", in,
"max", sw.maxInboundPeers,
)
sw.transport.Remove(p)
continue
}

sw.transport.Remove(p)
// There are open peer slots, add peers
if err := sw.addPeer(p); err != nil {
sw.transport.Remove(p)

continue
if p.IsRunning() {
_ = p.Stop()
}

// There are open peer slots, add peers
if err := sw.addPeer(p); err != nil {
sw.transport.Remove(p)

if p.IsRunning() {
_ = p.Stop()
}

sw.Logger.Info(
"Ignoring inbound connection: error while adding peer",
"err", err,
"id", p.ID(),
)
}
sw.Logger.Info(
"Ignoring inbound connection: error while adding peer",
"err", err,
"id", p.ID(),
)
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions tm2/pkg/p2p/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,3 +890,34 @@ func TestCalculateBackoff(t *testing.T) {
}
})
}

func TestSwitchAcceptLoopTransportClosed(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Flag flipped by the mocked Accept right before it reports a closed
	// transport; read only after the loop goroutine has signaled completion,
	// so the channel close provides the necessary happens-before ordering.
	var acceptCalled bool

	transport := &mockTransport{
		acceptFn: func(context.Context, PeerBehavior) (PeerConn, error) {
			acceptCalled = true

			return nil, errTransportClosed
		},
	}

	sw := NewMultiplexSwitch(transport)

	// Run the accept loop in the background,
	// signaling once it has returned
	stopped := make(chan struct{})

	go func() {
		defer close(stopped) // signal that the accept loop has ended

		sw.runAcceptLoop(ctx)
	}()

	select {
	case <-stopped:
		// The loop must exit (not spin) once the transport reports closure
		assert.True(t, acceptCalled)
	case <-time.After(time.Second * 2):
		require.FailNow(t, "timeout while waiting for running loop to stop")
	}
}
102 changes: 44 additions & 58 deletions tm2/pkg/p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
goerrors "errors"
"fmt"
"io"
"log/slog"
Expand All @@ -22,7 +23,6 @@

var (
errTransportClosed = errors.New("transport is closed")
errTransportInactive = errors.New("transport is inactive")
errDuplicateConnection = errors.New("duplicate peer connection")
errPeerIDNodeInfoMismatch = errors.New("connection ID does not match node info ID")
errPeerIDDialMismatch = errors.New("connection ID does not match dialed ID")
Expand Down Expand Up @@ -75,7 +75,10 @@
mConfig conn.MConnConfig,
logger *slog.Logger,
) *MultiplexTransport {
ctx, cancel := context.WithCancel(context.Background())
gfanton marked this conversation as resolved.
Show resolved Hide resolved
return &MultiplexTransport{
ctx: ctx,
cancelFn: cancel,
peerCh: make(chan peerInfo, 1),
mConfig: mConfig,
nodeInfo: nodeInfo,
Expand All @@ -92,12 +95,6 @@

// Accept waits for a verified inbound Peer to connect, and returns it [BLOCKING]
func (mt *MultiplexTransport) Accept(ctx context.Context, behavior PeerBehavior) (PeerConn, error) {
// Sanity check, no need to wait
// on an inactive transport
if mt.listener == nil {
return nil, errTransportInactive
}

select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -142,44 +139,35 @@
}

mt.cancelFn()

return mt.listener.Close()
}

// Listen starts an active process of listening for incoming connections [NON-BLOCKING]
func (mt *MultiplexTransport) Listen(addr types.NetAddress) (rerr error) {
func (mt *MultiplexTransport) Listen(addr types.NetAddress) error {
// Reserve a port, and start listening
ln, err := net.Listen("tcp", addr.DialString())
if err != nil {
return fmt.Errorf("unable to listen on address, %w", err)
}

defer func() {
if rerr != nil {
ln.Close()
}
}()

if addr.Port == 0 {
// net.Listen on port 0 means the kernel will auto-allocate a port
// - find out which one has been given to us.
tcpAddr, ok := ln.Addr().(*net.TCPAddr)
if !ok {
ln.Close()

Check warning on line 158 in tm2/pkg/p2p/transport.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/p2p/transport.go#L158

Added line #L158 was not covered by tests
gfanton marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("error finding port (after listening on port 0): %w", err)
}

addr.Port = uint16(tcpAddr.Port)
}

// Set up the context
mt.ctx, mt.cancelFn = context.WithCancel(context.Background())

mt.netAddr = addr
mt.listener = ln

// Run the routine for accepting
// incoming peer connections
go mt.runAcceptLoop()
go mt.runAcceptLoop(mt.ctx)

return nil
}
Expand All @@ -189,60 +177,58 @@
// 1. accepted by the transport
// 2. filtered
// 3. upgraded (handshaked + verified)
func (mt *MultiplexTransport) runAcceptLoop() {
func (mt *MultiplexTransport) runAcceptLoop(ctx context.Context) {
var wg sync.WaitGroup

defer func() {
wg.Wait() // Wait for all process routines

close(mt.peerCh)
}()

for {
select {
case <-mt.ctx.Done():
mt.logger.Debug("transport accept context closed")
ctx, cancel := context.WithCancel(ctx)
defer cancel() // cancel sub-connection process

return
for {
// Accept an incoming peer connection
c, err := mt.listener.Accept()

switch {
case err == nil: // ok
case goerrors.Is(err, net.ErrClosed):
// Listener has been closed, this is not recoverable.
mt.logger.Debug("listener has been closed")
return // exit
default:
// Accept an incoming peer connection
c, err := mt.listener.Accept()
// An error occurred during accept, report and continue
mt.logger.Warn("accept p2p connection error", "err", err)
continue

Check warning on line 203 in tm2/pkg/p2p/transport.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/p2p/transport.go#L201-L203

Added lines #L201 - L203 were not covered by tests
}

// Process the new connection asynchronously
wg.Add(1)

go func(c net.Conn) {
defer wg.Done()

info, err := mt.processConn(c, "")
if err != nil {
mt.logger.Error(
"unable to accept p2p connection",
"unable to process p2p connection",
"err", err,
)

continue
}

// Process the new connection asynchronously
wg.Add(1)
// Close the connection
_ = c.Close()

go func(c net.Conn) {
defer wg.Done()

info, err := mt.processConn(c, "")
if err != nil {
mt.logger.Error(
"unable to process p2p connection",
"err", err,
)

// Close the connection
_ = c.Close()

return
}
return
}

select {
case mt.peerCh <- info:
case <-mt.ctx.Done():
// Give up if the transport was closed.
_ = c.Close()
}
}(c)
}
select {
case mt.peerCh <- info:
case <-ctx.Done():
// Give up if the transport was closed.
_ = c.Close()

Check warning on line 229 in tm2/pkg/p2p/transport.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/p2p/transport.go#L227-L229

Added lines #L227 - L229 were not covered by tests
}
}(c)
}
}

Expand Down
10 changes: 4 additions & 6 deletions tm2/pkg/p2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,12 @@ func TestMultiplexTransport_Accept(t *testing.T) {

transport := NewMultiplexTransport(ni, nk, mCfg, logger)

p, err := transport.Accept(context.Background(), nil)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

p, err := transport.Accept(ctx, nil)
assert.Nil(t, p)
assert.ErrorIs(
t,
err,
errTransportInactive,
)
assert.ErrorIs(t, err, context.DeadlineExceeded)
gfanton marked this conversation as resolved.
Show resolved Hide resolved
})

t.Run("transport closed", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tm2/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (bs *BaseService) OnStart() error { return nil }
func (bs *BaseService) Stop() error {
if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
if atomic.LoadUint32(&bs.started) == 0 {
bs.Logger.Error(fmt.Sprintf("Not stopping %v -- have not been started yet", bs.name), "impl", bs.impl)
bs.Logger.Warn(fmt.Sprintf("Not stopping %v -- have not been started yet", bs.name), "impl", bs.impl)
// revert flag
atomic.StoreUint32(&bs.stopped, 0)
return ErrNotStarted
Expand Down
Loading