From 883bd8adffa2f5437c82fc76ba6047c85b8a5de8 Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Thu, 19 May 2022 15:27:44 -0400 Subject: [PATCH] feat: log when resource manager limits are exceeded (#8980) This periodically logs how many times Resource Manager limits were exceeded. If they aren't exceeded, then nothing is logged. The log levels are at ERROR log level so that they are shown by default. The motivation is so that users know when they have exceeded resource manager limits. To find what is exceeding the limits, they'll need to turn on debug logging and inspect the errors being logged. This could collect the specific limits being reached, but that's more complicated to implement and could result in much longer log messages. --- core/commands/swarm.go | 2 +- core/node/libp2p/rcmgr.go | 15 ++- core/node/libp2p/rcmgr_logging.go | 160 +++++++++++++++++++++++++ core/node/libp2p/rcmgr_logging_test.go | 58 +++++++++ go.mod | 7 +- 5 files changed, 236 insertions(+), 6 deletions(-) create mode 100644 core/node/libp2p/rcmgr_logging.go create mode 100644 core/node/libp2p/rcmgr_logging_test.go diff --git a/core/commands/swarm.go b/core/commands/swarm.go index d6a3e8d696d1..f904ed67e198 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -409,7 +409,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel return errors.New("expected a JSON file") } if err := json.NewDecoder(file).Decode(&newLimit); err != nil { - return errors.New("failed to decode JSON as ResourceMgrScopeConfig") + return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err) } return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit) } diff --git a/core/node/libp2p/rcmgr.go b/core/node/libp2p/rcmgr.go index 28d05a131b43..4d4b29a564db 100644 --- a/core/node/libp2p/rcmgr.go +++ b/core/node/libp2p/rcmgr.go @@ -7,9 +7,11 @@ import ( "path/filepath" "strings" + "github.com/benbjohnson/clock" config "github.com/ipfs/go-ipfs/config" + "github.com/ipfs/go-ipfs/core/node/helpers" "github.com/ipfs/go-ipfs/repo" - + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -24,8 +26,8 @@ const NetLimitTraceFilename = "rcmgr.json.gz" var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled") -func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) { - return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { +func ResourceManager(cfg config.SwarmConfig) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { var manager network.ResourceManager var opts Libp2pOpts @@ -72,6 +74,13 @@ func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (netw if err != nil { return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err) } + lrm := &loggingResourceManager{ + clock: clock.New(), + logger: &logging.Logger("resourcemanager").SugaredLogger, + delegate: manager, + } + lrm.start(helpers.LifecycleCtx(mctx, lc)) + manager = lrm } else { log.Debug("libp2p resource manager is disabled") manager = network.NullResourceManager diff --git a/core/node/libp2p/rcmgr_logging.go b/core/node/libp2p/rcmgr_logging.go new file mode 100644 index 000000000000..06d22c71b8cb --- /dev/null +++ b/core/node/libp2p/rcmgr_logging.go @@ -0,0 +1,160 @@ +package libp2p + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/benbjohnson/clock" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "go.uber.org/zap" +) + +type loggingResourceManager struct { + clock clock.Clock + logger *zap.SugaredLogger + delegate network.ResourceManager + logInterval time.Duration + + mut sync.Mutex + limitExceededErrs uint64 +} + +type loggingScope struct { + logger *zap.SugaredLogger + delegate network.ResourceScope + countErrs func(error) +} + +var _ network.ResourceManager = (*loggingResourceManager)(nil) + +func (n *loggingResourceManager) start(ctx context.Context) { + logInterval := n.logInterval + if logInterval == 0 { + logInterval = 10 * time.Second + } + ticker := n.clock.Ticker(logInterval) + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + n.mut.Lock() + errs := n.limitExceededErrs + n.limitExceededErrs = 0 + n.mut.Unlock() + if errs != 0 { + n.logger.Warnf("Resource limits were exceeded %d times, consider inspecting logs and raising the resource manager limits.", errs) + } + case <-ctx.Done(): + return + } + } + }() +} + +func (n *loggingResourceManager) countErrs(err error) { + if errors.Is(err, network.ErrResourceLimitExceeded) { + n.mut.Lock() + n.limitExceededErrs++ + n.mut.Unlock() + } +} + +func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error { + return n.delegate.ViewSystem(f) +} +func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error { + return n.delegate.ViewTransient(func(s network.ResourceScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error { + return n.delegate.ViewService(svc, func(s network.ServiceScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error { + return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error { + return n.delegate.ViewPeer(p, func(s network.PeerScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) { + connMgmtScope, err := n.delegate.OpenConnection(dir, usefd) + n.countErrs(err) + return connMgmtScope, err +} +func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) { + connMgmtScope, err := n.delegate.OpenStream(p, dir) + n.countErrs(err) + return connMgmtScope, err +} +func (n *loggingResourceManager) Close() error { + return n.delegate.Close() +} + +func (s *loggingScope) ReserveMemory(size int, prio uint8) error { + err := s.delegate.ReserveMemory(size, prio) + s.countErrs(err) + return err +} +func (s *loggingScope) ReleaseMemory(size int) { + s.delegate.ReleaseMemory(size) +} +func (s *loggingScope) Stat() network.ScopeStat { + return s.delegate.Stat() +} +func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) { + return s.delegate.BeginSpan() +} +func (s *loggingScope) Done() { + s.delegate.(network.ResourceScopeSpan).Done() +} +func (s *loggingScope) Name() string { + return s.delegate.(network.ServiceScope).Name() +} +func (s *loggingScope) Protocol() protocol.ID { + return s.delegate.(network.ProtocolScope).Protocol() +} +func (s *loggingScope) Peer() peer.ID { + return s.delegate.(network.PeerScope).Peer() +} +func (s *loggingScope) PeerScope() network.PeerScope { + return s.delegate.(network.PeerScope) +} +func (s *loggingScope) SetPeer(p peer.ID) error { + err := s.delegate.(network.ConnManagementScope).SetPeer(p) + s.countErrs(err) + return err +} +func (s *loggingScope) ProtocolScope() network.ProtocolScope { + return s.delegate.(network.ProtocolScope) +} +func (s *loggingScope) SetProtocol(proto protocol.ID) error { + err := s.delegate.(network.StreamManagementScope).SetProtocol(proto) + s.countErrs(err) + return err +} +func (s *loggingScope) ServiceScope() network.ServiceScope { + return s.delegate.(network.ServiceScope) +} +func (s *loggingScope) SetService(srv string) error { + err := s.delegate.(network.StreamManagementScope).SetService(srv) + s.countErrs(err) + return err +} +func (s *loggingScope) Limit() rcmgr.Limit { + return s.delegate.(rcmgr.ResourceScopeLimiter).Limit() +} +func (s *loggingScope) SetLimit(limit rcmgr.Limit) { + s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit) +} diff --git a/core/node/libp2p/rcmgr_logging_test.go b/core/node/libp2p/rcmgr_logging_test.go new file mode 100644 index 000000000000..72f34b80885c --- /dev/null +++ b/core/node/libp2p/rcmgr_logging_test.go @@ -0,0 +1,58 @@ +package libp2p + +import ( + "context" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/libp2p/go-libp2p-core/network" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +func TestLoggingResourceManager(t *testing.T) { + clock := clock.NewMock() + limiter := rcmgr.NewDefaultLimiter() + limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1, 1, 1) + rm, err := rcmgr.NewResourceManager(limiter) + if err != nil { + t.Fatal(err) + } + + oCore, oLogs := observer.New(zap.WarnLevel) + oLogger := zap.New(oCore) + lrm := &loggingResourceManager{ + clock: clock, + logger: oLogger.Sugar(), + delegate: rm, + logInterval: 1 * time.Second, + } + + // 2 of these should result in resource limit exceeded errors and subsequent log messages + for i := 0; i < 3; i++ { + _, _ = lrm.OpenConnection(network.DirInbound, false) + } + + // run the logger which will write an entry for those errors + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + lrm.start(ctx) + clock.Add(3 * time.Second) + + timer := time.NewTimer(1 * time.Second) + for { + select { + case <-timer.C: + t.Fatalf("expected logs never arrived") + default: + if oLogs.Len() == 0 { + continue + } + require.Equal(t, "Resource limits were exceeded 2 times, consider inspecting logs and raising the resource manager limits.", oLogs.All()[0].Message) + return + } + } +} diff --git a/go.mod b/go.mod index c2b77d793fe2..b7946841b272 100644 --- a/go.mod +++ b/go.mod @@ -125,13 +125,17 @@ require ( golang.org/x/sys v0.0.0-20220412211240-33da011f77ad ) +require ( + github.com/benbjohnson/clock v1.3.0 + github.com/ipfs/go-log/v2 v2.5.1 +) + require ( github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/Kubuxu/go-os-helper v0.0.1 // indirect github.com/Stebalien/go-bitfield v0.0.1 // indirect github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect - github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect @@ -172,7 +176,6 @@ require ( github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.2 // indirect - github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipfs/go-peertaskqueue v0.7.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/klauspost/compress v1.15.1 // indirect