Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

identify: remove support for Identify Delta #1975

Merged
merged 4 commits into from
Jan 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) {
rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)
for _, id := range [...]protocol.ID{identify.ID, identify.IDDelta, identify.IDPush} {
for _, id := range [...]protocol.ID{identify.ID, identify.IDPush} {
config.AddProtocolLimit(
id,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},
Expand Down
2 changes: 0 additions & 2 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func TestHostProtoPreference(t *testing.T) {

// Prevent pushing identify information so this test works.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)

h2.SetStreamHandler(protoOld, handler)

Expand Down Expand Up @@ -362,7 +361,6 @@ func TestHostProtoPreknowledge(t *testing.T) {
h2.SetStreamHandler("/super", handler)
// Prevent pushing identify information so this test actually _uses_ the super protocol.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)

h2pi := h2.Peerstore().PeerInfo(h2.ID())
require.NoError(t, h1.Connect(ctx, h2pi))
Expand Down
73 changes: 51 additions & 22 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"

"github.com/libp2p/go-libp2p/p2p/host/eventbus"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"

Expand Down Expand Up @@ -115,7 +115,7 @@ type idService struct {
addPeerHandlerCh chan addPeerHandlerReq
rmPeerHandlerCh chan rmPeerHandlerReq

// pushSemaphore limits the push/delta concurrency to avoid storms
// pushSemaphore limits the push concurrency to avoid storms
// that clog the transient scope.
pushSemaphore chan struct{}
}
Expand Down Expand Up @@ -154,9 +154,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

// handle local protocol handler updates, and push deltas to peers.
var err error

observedAddrs, err := NewObservedAddrManager(h)
if err != nil {
return nil, fmt.Errorf("failed to create observed address manager: %s", err)
Expand All @@ -180,7 +177,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
}

// register protocols that do not depend on peer records.
h.SetStreamHandler(IDDelta, s.deltaHandler)
h.SetStreamHandler(ID, s.sendIdentifyResp)
h.SetStreamHandler(IDPush, s.pushHandler)

Expand Down Expand Up @@ -269,20 +265,18 @@ func (ids *idService) loop() {
select {
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping addr updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping addr updated message for %s as buffer full", pid)
}
}

case event.EvtLocalProtocolsUpdated:
for pid := range phs {
select {
case phs[pid].deltaCh <- struct{}{}:
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping protocol updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping protocol updated message for %s as buffer full", pid)
}
}
}

case <-ids.ctx.Done():
return
}
Expand Down Expand Up @@ -372,7 +366,7 @@ func (ids *idService) identifyConn(c network.Conn) error {
return err
}

return ids.handleIdentifyResponse(s)
return ids.handleIdentifyResponse(s, false)
}

func (ids *idService) sendIdentifyResp(s network.Stream) {
Expand Down Expand Up @@ -406,14 +400,11 @@ func (ids *idService) sendIdentifyResp(s network.Stream) {
return
}

ph.snapshotMu.RLock()
snapshot := ph.snapshot
ph.snapshotMu.RUnlock()
ids.writeChunkedIdentifyMsg(c, snapshot, s)
ids.writeChunkedIdentifyMsg(c, s)
log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
}

func (ids *idService) handleIdentifyResponse(s network.Stream) error {
func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) error {
if err := s.Scope().SetService(ServiceName); err != nil {
log.Warnf("error attaching stream to identify service: %s", err)
s.Reset()
Expand Down Expand Up @@ -444,7 +435,7 @@ func (ids *idService) handleIdentifyResponse(s network.Stream) error {

log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())

ids.consumeMessage(mes, c)
ids.consumeMessage(mes, c, isPush)

return nil
}
Expand Down Expand Up @@ -477,7 +468,8 @@ func (ids *idService) getSnapshot() *identifySnapshot {
return snapshot
}

func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error {
func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, s network.Stream) error {
snapshot := ids.getSnapshot()
mes := ids.createBaseIdentifyResponse(c, snapshot)
sr := ids.getSignedRecord(snapshot)
mes.SignedPeerRecord = sr
Expand Down Expand Up @@ -566,11 +558,49 @@ func (ids *idService) getSignedRecord(snapshot *identifySnapshot) []byte {
return recBytes
}

func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn) {
// diff takes two slices of strings (a and b) and computes which elements were added and removed in b
func diff(a, b []string) (added, removed []string) {
// This is O(n^2), but it's fine because the slices are small.
for _, x := range b {
var found bool
for _, y := range a {
if x == y {
found = true
break
}
}
if !found {
added = append(added, x)
}
}
for _, x := range a {
var found bool
for _, y := range b {
if x == y {
found = true
break
}
}
if !found {
removed = append(removed, x)
}
}
return
}

func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bool) {
p := c.RemotePeer()

// mes.Protocols
supported, _ := ids.Host.Peerstore().GetProtocols(p)
added, removed := diff(supported, mes.Protocols)
ids.Host.Peerstore().SetProtocols(p, mes.Protocols...)
if isPush {
ids.emitters.evtPeerProtocolsUpdated.Emit(event.EvtPeerProtocolsUpdated{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. We should have been doing this all along...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. A lot of libp2p users rely on this event, and Push is much more common than Delta.
My Kubo node receives 10x as many Push messages (see Grafana screenshot in #1974). I wonder how many subtle failures were caused by this...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out we even had an issue for this: #977.

Peer: p,
Added: protocol.ConvertFromStrings(added),
Removed: protocol.ConvertFromStrings(removed),
})
}

// mes.ObservedAddr
ids.consumeObservedAddress(mes.GetObservedAddr(), c)
Expand Down Expand Up @@ -598,7 +628,6 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn) {

// add certified addresses for the peer, if they sent us a signed peer record
// otherwise use the unsigned addresses.
var signedPeerRecord *record.Envelope
signedPeerRecord, err := signedPeerRecordFromMessage(mes)
if err != nil {
log.Errorf("error getting peer record from Identify message: %v", err)
Expand Down
82 changes: 0 additions & 82 deletions p2p/protocol/identify/id_delta.go

This file was deleted.

9 changes: 3 additions & 6 deletions p2p/protocol/identify/id_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import (
"github.com/libp2p/go-libp2p/core/network"
)

// IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing
// the current state of the peer.
//
// It is in the process of being replaced by identify delta, which sends only diffs for better
// resource utilisation.
// IDPush is the protocol.ID of the Identify push protocol.
// It sends full identify messages containing the current state of the peer.
const IDPush = "/ipfs/id/push/1.0.0"

// pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol.
func (ids *idService) pushHandler(s network.Stream) {
ids.handleIdentifyResponse(s)
ids.handleIdentifyResponse(s, true)
}
Loading