Skip to content

Commit

Permalink
Banning peers sending bad gossip (#7134)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored and Alexey Sharp committed Mar 28, 2023
1 parent 5d0f778 commit 8ad4870
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/lightclient/lightclient/lightclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (l *LightClient) Start() {
for _, update := range updates {
err := l.processLightClientUpdate(update)
if err != nil {
log.Warn("Could not validate update", "err", err)
log.Debug("Could not validate update", "err", err)
updates = []*cltypes.LightClientUpdate{}
break
}
Expand Down
41 changes: 34 additions & 7 deletions cmd/lightclient/lightclient/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
Expand Down Expand Up @@ -46,24 +48,40 @@ func (c *ChainTipSubscriber) StartLoop() {
}
log.Info("[LightClient Gossip] Started Gossip")
c.started = true

Retry:
for {
if err := c.subscribeGossip(); err != nil {
if errors.Is(err, context.Canceled) {
return
}
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue Retry
}

log.Debug("[Lightclient] could not read gossip :/", "reason", err)
time.Sleep(time.Second)
}
}
}

func (c *ChainTipSubscriber) subscribeGossip() error {
stream, err := c.sentinel.SubscribeGossip(c.ctx, &sentinel.EmptyMessage{})
if err != nil {
log.Warn("could not start lightclient", "reason", err)
return
return err
}
defer stream.CloseSend()

for {
data, err := stream.Recv()
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Debug("[Lightclient] could not read gossip :/", "reason", err)
}
continue
return err
}
if err := c.handleGossipData(data); err != nil {
log.Warn("could not process new gossip",
log.Debug("could not process new gossip",
"gossipType", data.Type, "reason", err)

}
}
}
Expand All @@ -79,6 +97,9 @@ func (c *ChainTipSubscriber) handleGossipData(data *sentinel.GossipData) error {
case sentinel.GossipType_BeaconBlockGossipType:
block := &cltypes.SignedBeaconBlock{}
if err := block.DecodeSSZWithVersion(data.Data, int(version)); err != nil {
if _, err := c.sentinel.BanPeer(c.ctx, data.Peer); err != nil {
return err
}
return fmt.Errorf("could not unmarshall block: %s", err)
}

Expand All @@ -87,6 +108,9 @@ func (c *ChainTipSubscriber) handleGossipData(data *sentinel.GossipData) error {
case sentinel.GossipType_LightClientFinalityUpdateGossipType:
finalityUpdate := &cltypes.LightClientFinalityUpdate{}
if err := finalityUpdate.DecodeSSZWithVersion(data.Data, int(version)); err != nil {
if _, err := c.sentinel.BanPeer(c.ctx, data.Peer); err != nil {
return err
}
return fmt.Errorf("could not unmarshall finality update: %s", err)
}
c.lastUpdate = &cltypes.LightClientUpdate{
Expand All @@ -106,6 +130,9 @@ func (c *ChainTipSubscriber) handleGossipData(data *sentinel.GossipData) error {

optimisticUpdate := &cltypes.LightClientOptimisticUpdate{}
if err := optimisticUpdate.DecodeSSZWithVersion(data.Data, int(version)); err != nil {
if _, err := c.sentinel.BanPeer(c.ctx, data.Peer); err != nil {
return err
}
return fmt.Errorf("could not unmarshall optimistic update: %s", err)
}
c.lastUpdate = &cltypes.LightClientUpdate{
Expand Down
2 changes: 1 addition & 1 deletion cmd/sentinel/sentinel/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

const (
maxBadPeers = 50
maxBadPeers = 50000
maxPeerRecordSize = 1000
DefaultMaxPeers = 33
MaxBadResponses = 50
Expand Down
4 changes: 3 additions & 1 deletion cmd/sentinel/sentinel/service/notifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
type gossipObject struct {
data []byte // gossip data
t sentinel.GossipType // determine which gossip message we are notifying of
pid string
}

type gossipNotifier struct {
Expand All @@ -28,14 +29,15 @@ func newGossipNotifier() *gossipNotifier {
}
}

func (g *gossipNotifier) notify(t sentinel.GossipType, data []byte) {
func (g *gossipNotifier) notify(t sentinel.GossipType, data []byte, pid string) {
g.mu.Lock()
defer g.mu.Unlock()

for _, ch := range g.notifiers {
ch <- gossipObject{
data: data,
t: t,
pid: pid,
}
}
}
Expand Down
34 changes: 27 additions & 7 deletions cmd/sentinel/sentinel/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/communication"
"github.com/ledgerwatch/log/v3"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)

type SentinelServer struct {
Expand All @@ -32,6 +33,17 @@ func NewSentinelServer(ctx context.Context, sentinel *sentinel.Sentinel) *Sentin
}
}

//BanPeer(context.Context, *Peer) (*EmptyMessage, error)

func (s *SentinelServer) BanPeer(_ context.Context, p *sentinelrpc.Peer) (*sentinelrpc.EmptyMessage, error) {
var pid peer.ID
if err := pid.UnmarshalText([]byte(p.Pid)); err != nil {
return nil, err
}
s.sentinel.Peers().BanBadPeer(pid)
return &sentinelrpc.EmptyMessage{}, nil
}

func (s *SentinelServer) SubscribeGossip(_ *sentinelrpc.EmptyMessage, stream sentinelrpc.Sentinel_SubscribeGossipServer) error {
// first of all subscribe
ch, subId, err := s.gossipNotifier.addSubscriber()
Expand All @@ -49,6 +61,9 @@ func (s *SentinelServer) SubscribeGossip(_ *sentinelrpc.EmptyMessage, stream sen
if err := stream.Send(&sentinelrpc.GossipData{
Data: packet.data,
Type: packet.t,
Peer: &sentinelrpc.Peer{
Pid: packet.pid,
},
}); err != nil {
log.Warn("[Sentinel] Could not relay gossip packet", "reason", err)
}
Expand Down Expand Up @@ -131,28 +146,33 @@ func (s *SentinelServer) handleGossipPacket(pkt *pubsub.Message) error {
var err error
log.Trace("[Sentinel Gossip] Received Packet", "topic", pkt.Topic)
data := pkt.GetData()

// If we use snappy codec then decompress it accordingly.
if strings.Contains(*pkt.Topic, sentinel.SSZSnappyCodec) {
data, err = utils.DecompressSnappy(data)
if err != nil {
return err
}
}
textPid, err := pkt.ReceivedFrom.MarshalText()
if err != nil {
return err
}
// Check to which gossip it belongs to.
if strings.Contains(*pkt.Topic, string(sentinel.BeaconBlockTopic)) {
s.gossipNotifier.notify(sentinelrpc.GossipType_BeaconBlockGossipType, data)
s.gossipNotifier.notify(sentinelrpc.GossipType_BeaconBlockGossipType, data, string(textPid))
} else if strings.Contains(*pkt.Topic, string(sentinel.BeaconAggregateAndProofTopic)) {
s.gossipNotifier.notify(sentinelrpc.GossipType_AggregateAndProofGossipType, data)
s.gossipNotifier.notify(sentinelrpc.GossipType_AggregateAndProofGossipType, data, string(textPid))
} else if strings.Contains(*pkt.Topic, string(sentinel.VoluntaryExitTopic)) {
s.gossipNotifier.notify(sentinelrpc.GossipType_VoluntaryExitGossipType, data)
s.gossipNotifier.notify(sentinelrpc.GossipType_VoluntaryExitGossipType, data, string(textPid))
} else if strings.Contains(*pkt.Topic, string(sentinel.ProposerSlashingTopic)) {
s.gossipNotifier.notify(sentinelrpc.GossipType_ProposerSlashingGossipType, data)
s.gossipNotifier.notify(sentinelrpc.GossipType_ProposerSlashingGossipType, data, string(textPid))
} else if strings.Contains(*pkt.Topic, string(sentinel.AttesterSlashingTopic)) {
s.gossipNotifier.notify(sentinelrpc.GossipType_AttesterSlashingGossipType, data)
s.gossipNotifier.notify(sentinelrpc.GossipType_AttesterSlashingGossipType, data, string(textPid))
} else if strings.Contains(*pkt.Topic, string(sentinel.LightClientFinalityUpdateTopic)) {
s.gossipNotifier.notify(sentinelrpc.GossipType_LightClientFinalityUpdateGossipType, data)
s.gossipNotifier.notify(sentinelrpc.GossipType_LightClientFinalityUpdateGossipType, data, string(textPid))
} else if strings.Contains(*pkt.Topic, string(sentinel.LightClientOptimisticUpdateTopic)) {
s.gossipNotifier.notify(sentinelrpc.GossipType_LightClientOptimisticUpdateGossipType, data)
s.gossipNotifier.notify(sentinelrpc.GossipType_LightClientOptimisticUpdateGossipType, data, string(textPid))
}
return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18

require (
github.com/ledgerwatch/erigon-lib v0.0.0-20230306114514-2c4c92fd1fce
github.com/ledgerwatch/erigon-lib v0.0.0-20230328191829-416af23d9dcd
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230306083105-1391330d62a3
github.com/ledgerwatch/log/v3 v3.7.0
github.com/ledgerwatch/secp256k1 v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20230306114514-2c4c92fd1fce h1:KsoGX2RLGvqTc97Et7wAwaRNk1/RDAUbe20B9TM7/70=
github.com/ledgerwatch/erigon-lib v0.0.0-20230306114514-2c4c92fd1fce/go.mod h1:/xzmS14QeWZFjRXDiTdeqcSW4IrrUSYkCXZRfsZ5XbI=
github.com/ledgerwatch/erigon-lib v0.0.0-20230328191829-416af23d9dcd h1:VUp9woJj6sVoZO6lIwOOm4qA5Qe0LRiOpUn84cy3ohw=
github.com/ledgerwatch/erigon-lib v0.0.0-20230328191829-416af23d9dcd/go.mod h1:nyJqfX9uPm1P/poZB1211DFe5DnAKOhYqvkEPyW7dXM=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230306083105-1391330d62a3 h1:tfzawK1gIIgRjVZeANXOr0Ziu+kqCIBuKMe0TXfl5Aw=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230306083105-1391330d62a3/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.7.0 h1:aFPEZdwZx4jzA3+/Pf8wNDN5tCI0cIolq/kfvgcM+og=
Expand Down

0 comments on commit 8ad4870

Please sign in to comment.