Skip to content

Commit

Permalink
Merge pull request #11 from libp2p/feat/metrics
Browse files Browse the repository at this point in the history
metrics collection support
  • Loading branch information
vyzo authored Feb 14, 2022
2 parents 7c88598 + 5b7e876 commit 74c7296
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 50 deletions.
168 changes: 168 additions & 0 deletions p2p/host/resource-manager/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package rcmgr

import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

// MetricsReporter is an interface for collecting metrics from resource manager actions
type MetricsReporter interface {
// AllowConn is invoked when opening a connection is allowed
AllowConn(dir network.Direction, usefd bool)
// BlockConn is invoked when opening a connection is blocked
BlockConn(dir network.Direction, usefd bool)

// AllowStream is invoked when opening a stream is allowed
AllowStream(p peer.ID, dir network.Direction)
// BlockStream is invoked when opening a stream is blocked
BlockStream(p peer.ID, dir network.Direction)

// AllowPeer is invoked when attaching ac onnection to a peer is allowed
AllowPeer(p peer.ID)
// BlockPeer is invoked when attaching ac onnection to a peer is blocked
BlockPeer(p peer.ID)

// AllowProtocol is invoked when setting the protocol for a stream is allowed
AllowProtocol(proto protocol.ID)
// BlockProtocol is invoked when setting the protocol for a stream is blocked
BlockProtocol(proto protocol.ID)
// BlockedProtocolPeer is invoekd when setting the protocol for a stream is blocked at the per protocol peer scope
BlockProtocolPeer(proto protocol.ID, p peer.ID)

// AllowPService is invoked when setting the protocol for a stream is allowed
AllowService(svc string)
// BlockPService is invoked when setting the protocol for a stream is blocked
BlockService(svc string)
// BlockedServicePeer is invoked when setting the service for a stream is blocked at the per service peer scope
BlockServicePeer(svc string, p peer.ID)

// AllowMemory is invoked when a memory reservation is allowed
AllowMemory(size int)
// BlockMemory is invoked when a memory reservation is blocked
BlockMemory(size int)
}

type metrics struct {
reporter MetricsReporter
}

// WithMetrics is a resource manager option to enable metrics collection
func WithMetrics(reporter MetricsReporter) Option {
return func(r *resourceManager) error {
r.metrics = &metrics{reporter: reporter}
return nil
}
}

func (m *metrics) AllowConn(dir network.Direction, usefd bool) {
if m == nil {
return
}

m.reporter.AllowConn(dir, usefd)
}

func (m *metrics) BlockConn(dir network.Direction, usefd bool) {
if m == nil {
return
}

m.reporter.BlockConn(dir, usefd)
}

func (m *metrics) AllowStream(p peer.ID, dir network.Direction) {
if m == nil {
return
}

m.reporter.AllowStream(p, dir)
}

func (m *metrics) BlockStream(p peer.ID, dir network.Direction) {
if m == nil {
return
}

m.reporter.BlockStream(p, dir)
}

func (m *metrics) AllowPeer(p peer.ID) {
if m == nil {
return
}

m.reporter.AllowPeer(p)
}

func (m *metrics) BlockPeer(p peer.ID) {
if m == nil {
return
}

m.reporter.BlockPeer(p)
}

func (m *metrics) AllowProtocol(proto protocol.ID) {
if m == nil {
return
}

m.reporter.AllowProtocol(proto)
}

func (m *metrics) BlockProtocol(proto protocol.ID) {
if m == nil {
return
}

m.reporter.BlockProtocol(proto)
}

func (m *metrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
if m == nil {
return
}

m.reporter.BlockProtocolPeer(proto, p)
}

func (m *metrics) AllowService(svc string) {
if m == nil {
return
}

m.reporter.AllowService(svc)
}

func (m *metrics) BlockService(svc string) {
if m == nil {
return
}

m.reporter.BlockService(svc)
}

func (m *metrics) BlockServicePeer(svc string, p peer.ID) {
if m == nil {
return
}

m.reporter.BlockServicePeer(svc, p)
}

func (m *metrics) AllowMemory(size int) {
if m == nil {
return
}

m.reporter.AllowMemory(size)
}

func (m *metrics) BlockMemory(size int) {
if m == nil {
return
}

m.reporter.BlockMemory(size)
}
71 changes: 48 additions & 23 deletions p2p/host/resource-manager/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ var log = logging.Logger("rcmgr")
type resourceManager struct {
limits Limiter

trace *trace
trace *trace
metrics *metrics

system *systemScope
transient *transientScope
Expand Down Expand Up @@ -259,9 +260,11 @@ func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (net

if err := conn.AddConn(dir, usefd); err != nil {
conn.Done()
r.metrics.BlockConn(dir, usefd)
return nil, err
}

r.metrics.AllowConn(dir, usefd)
return conn, nil
}

Expand All @@ -273,9 +276,11 @@ func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.
err := stream.AddStream(dir)
if err != nil {
stream.Done()
r.metrics.BlockStream(p, dir)
return nil, err
}

r.metrics.AllowStream(p, dir)
return stream, nil
}

Expand Down Expand Up @@ -360,56 +365,68 @@ func (r *resourceManager) gc() {

func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope {
return &systemScope{
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace),
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace, rcmgr.metrics),
}
}

func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope {
return &transientScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, "transient", rcmgr.trace),
system: rcmgr.system,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
"transient", rcmgr.trace, rcmgr.metrics),
system: rcmgr.system,
}
}

func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope {
return &serviceScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service:%s", name), rcmgr.trace),
name: name,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("service:%s", name), rcmgr.trace, rcmgr.metrics),
name: name,
rcmgr: rcmgr,
}
}

func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope {
return &protocolScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol:%s", proto), rcmgr.trace),
proto: proto,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("protocol:%s", proto), rcmgr.trace, rcmgr.metrics),
proto: proto,
rcmgr: rcmgr,
}
}

func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
return &peerScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer:%s", p), rcmgr.trace),
peer: p,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("peer:%s", p), rcmgr.trace, rcmgr.metrics),
peer: p,
rcmgr: rcmgr,
}
}

func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager) *connectionScope {
return &connectionScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
}
}

func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope {
return &streamScope{
resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
resourceScope: newResourceScope(limit,
[]*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
}
}

Expand All @@ -433,7 +450,7 @@ func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}

ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps

ps.IncRef()
Expand All @@ -460,7 +477,7 @@ func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}

ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps

ps.IncRef()
Expand Down Expand Up @@ -497,6 +514,7 @@ func (s *connectionScope) SetPeer(p peer.ID) error {
if err := s.peer.ReserveForChild(stat); err != nil {
s.peer.DecRef()
s.peer = nil
s.rcmgr.metrics.BlockPeer(p)
return err
}

Expand All @@ -510,6 +528,7 @@ func (s *connectionScope) SetPeer(p peer.ID) error {
}
s.resourceScope.edges = edges

s.rcmgr.metrics.AllowPeer(p)
return nil
}

Expand Down Expand Up @@ -540,6 +559,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
if err := s.proto.ReserveForChild(stat); err != nil {
s.proto.DecRef()
s.proto = nil
s.rcmgr.metrics.BlockProtocol(proto)
return err
}

Expand All @@ -550,6 +570,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
s.proto = nil
s.peerProtoScope.DecRef()
s.peerProtoScope = nil
s.rcmgr.metrics.BlockProtocolPeer(proto, s.peer.peer)
return err
}

Expand All @@ -565,6 +586,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
}
s.resourceScope.edges = edges

s.rcmgr.metrics.AllowProtocol(proto)
return nil
}

Expand Down Expand Up @@ -598,6 +620,7 @@ func (s *streamScope) SetService(svc string) error {
if err := s.svc.ReserveForChild(stat); err != nil {
s.svc.DecRef()
s.svc = nil
s.rcmgr.metrics.BlockService(svc)
return err
}

Expand All @@ -609,6 +632,7 @@ func (s *streamScope) SetService(svc string) error {
s.svc = nil
s.peerSvcScope.DecRef()
s.peerSvcScope = nil
s.rcmgr.metrics.BlockServicePeer(svc, s.peer.peer)
return err
}

Expand All @@ -623,6 +647,7 @@ func (s *streamScope) SetService(svc string) error {
}
s.resourceScope.edges = edges

s.rcmgr.metrics.AllowService(svc)
return nil
}

Expand Down
Loading

0 comments on commit 74c7296

Please sign in to comment.