Skip to content

Commit

Permalink
Caplin: Avoid republishing in GossipManager (#13314)
Browse files Browse the repository at this point in the history
Basically libp2p handles that automatically and I did not know
  • Loading branch information
Giulio2002 authored Jan 8, 2025
1 parent 05d72f0 commit d81cd6e
Show file tree
Hide file tree
Showing 30 changed files with 249 additions and 257 deletions.
100 changes: 64 additions & 36 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,8 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
attestationWithGossipData := &services.AttestationWithGossipData{
Attestation: attestation,
GossipData: &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixBeaconAttestation,
SubnetId: &subnet,
},
attestationWithGossipData := &services.AttestationForGossip{
Attestation: attestation,
ImmediateProcess: true, // we want to process attestation immediately
}

Expand All @@ -144,6 +139,16 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
})
continue
}
if a.sentinel != nil {
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixBeaconAttestation,
SubnetId: &subnet,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
if len(failures) > 0 {
errResp := poolingError{
Expand Down Expand Up @@ -173,19 +178,23 @@ func (a *ApiHandler) PostEthV1BeaconPoolVoluntaryExits(w http.ResponseWriter, r
return
}

if err := a.voluntaryExitService.ProcessMessage(r.Context(), nil, &cltypes.SignedVoluntaryExitWithGossipData{
GossipData: &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameVoluntaryExit,
},
if err := a.voluntaryExitService.ProcessMessage(r.Context(), nil, &services.SignedVoluntaryExitForGossip{
SignedVoluntaryExit: &req,
ImmediateVerification: true,
}); err != nil && !errors.Is(err, services.ErrIgnore) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
a.operationsPool.VoluntaryExitsPool.Insert(req.VoluntaryExit.ValidatorIndex, &req)

if a.sentinel != nil {
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameVoluntaryExit,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
// Only write 200
w.WriteHeader(http.StatusOK)
}
Expand Down Expand Up @@ -275,16 +284,21 @@ func (a *ApiHandler) PostEthV1BeaconPoolBlsToExecutionChanges(w http.ResponseWri
return
}

if err := a.blsToExecutionChangeService.ProcessMessage(r.Context(), nil, &cltypes.SignedBLSToExecutionChangeWithGossipData{
if err := a.blsToExecutionChangeService.ProcessMessage(r.Context(), nil, &services.SignedBLSToExecutionChangeForGossip{
SignedBLSToExecutionChange: v,
GossipData: &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBlsToExecutionChange,
},
}); err != nil && !errors.Is(err, services.ErrIgnore) {
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
continue
}
if a.sentinel != nil {
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBlsToExecutionChange,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}

if len(failures) > 0 {
Expand Down Expand Up @@ -312,21 +326,26 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter,
log.Warn("[Beacon REST] failed to encode aggregate and proof", "err", err)
return
}
gossipData := &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBeaconAggregateAndProof,
}

// for this service we are not publishing gossipData as the service does it internally, we just pass that data as a parameter.
if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, &cltypes.SignedAggregateAndProofData{
if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, &services.SignedAggregateAndProofForGossip{
SignedAggregateAndProof: v,
GossipData: gossipData,
ImmediateProcess: true, // we want to process aggregate and proof immediately
}); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process bls-change", "err", err)
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
continue
}

if a.sentinel != nil {
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBeaconAggregateAndProof,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
}

Expand Down Expand Up @@ -356,7 +375,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r

for _, subnet := range publishingSubnets {

var syncCommitteeMessageWithGossipData cltypes.SyncCommitteeMessageWithGossipData
var syncCommitteeMessageWithGossipData services.SyncCommitteeMessageForGossip
syncCommitteeMessageWithGossipData.SyncCommitteeMessage = v
syncCommitteeMessageWithGossipData.ImmediateVerification = true

Expand All @@ -367,17 +386,22 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r
}

subnetId := subnet
syncCommitteeMessageWithGossipData.GossipData = &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixSyncCommittee,
SubnetId: &subnetId,
}

if err = a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, &syncCommitteeMessageWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process attestation in syncCommittee service", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
break
}
if a.sentinel != nil {
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixSyncCommittee,
SubnetId: &subnetId,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
}
if len(failures) > 0 {
Expand All @@ -403,7 +427,7 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
continue // skip empty contributions
}

var signedContributionAndProofWithGossipData cltypes.SignedContributionAndProofWithGossipData
var signedContributionAndProofWithGossipData services.SignedContributionAndProofForGossip
signedContributionAndProofWithGossipData.SignedContributionAndProof = v
signedContributionAndProofWithGossipData.ImmediateVerification = true

Expand All @@ -414,16 +438,20 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
return
}

signedContributionAndProofWithGossipData.GossipData = &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameSyncCommitteeContributionAndProof,
}

if err = a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, &signedContributionAndProofWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process sync contribution", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
continue
}
if a.sentinel != nil {
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameSyncCommitteeContributionAndProof,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}

if len(failures) > 0 {
Expand Down
11 changes: 6 additions & 5 deletions cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/erigontech/erigon/cl/persistence/state/historical_states_reader"
"github.com/erigontech/erigon/cl/phase1/core/state"
mock_services2 "github.com/erigontech/erigon/cl/phase1/forkchoice/mock_services"
"github.com/erigontech/erigon/cl/phase1/network/services"
"github.com/erigontech/erigon/cl/phase1/network/services/mock_services"
"github.com/erigontech/erigon/cl/pool"
"github.com/erigontech/erigon/cl/utils/eth_clock"
Expand Down Expand Up @@ -119,22 +120,22 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
mockValidatorMonitor := mockMonitor.NewMockValidatorMonitor(ctrl)

// ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessage) error
syncCommitteeMessagesService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessageWithGossipData) error {
syncCommitteeMessagesService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SyncCommitteeMessageForGossip) error {
return h.syncMessagePool.AddSyncCommitteeMessage(postState, *subnetID, msg.SyncCommitteeMessage)
}).AnyTimes()

syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedContributionAndProofWithGossipData) error {
syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedContributionAndProofForGossip) error {
return h.syncMessagePool.AddSyncContribution(postState, msg.SignedContributionAndProof.Message.Contribution)
}).AnyTimes()
aggregateAndProofsService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedAggregateAndProofData) error {
aggregateAndProofsService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedAggregateAndProofForGossip) error {
opPool.AttestationsPool.Insert(msg.SignedAggregateAndProof.Message.Aggregate.Signature, msg.SignedAggregateAndProof.Message.Aggregate)
return nil
}).AnyTimes()
voluntaryExitService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedVoluntaryExitWithGossipData) error {
voluntaryExitService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedVoluntaryExitForGossip) error {
opPool.VoluntaryExitsPool.Insert(msg.SignedVoluntaryExit.VoluntaryExit.ValidatorIndex, msg.SignedVoluntaryExit)
return nil
}).AnyTimes()
blsToExecutionChangeService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedBLSToExecutionChangeWithGossipData) error {
blsToExecutionChangeService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedBLSToExecutionChangeForGossip) error {
opPool.BLSToExecutionChangesPool.Insert(msg.SignedBLSToExecutionChange.Signature, msg.SignedBLSToExecutionChange)
return nil
}).AnyTimes()
Expand Down
12 changes: 0 additions & 12 deletions cl/cltypes/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cltypes

import (
libcommon "github.com/erigontech/erigon-lib/common"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/merkle_tree"
ssz2 "github.com/erigontech/erigon/cl/ssz"
Expand Down Expand Up @@ -55,17 +54,6 @@ func (a *AggregateAndProof) HashSSZ() ([32]byte, error) {
return merkle_tree.HashTreeRoot(a.AggregatorIndex, a.Aggregate, a.SelectionProof[:])
}

// SignedAggregateAndProofData is passed to SignedAggregateAndProof service. The service does the signature verification
// asynchronously. That's why we cannot wait for its ProcessMessage call to finish to check error. The service
// will do re-publishing of the gossip or banning the peer in case of invalid signature by itself.
// that's why we are passing sentinel.SentinelClient and *sentinel.GossipData to enable the service
// to do all of that by itself.
type SignedAggregateAndProofData struct {
SignedAggregateAndProof *SignedAggregateAndProof
GossipData *sentinel.GossipData
ImmediateProcess bool
}

type SignedAggregateAndProof struct {
Message *AggregateAndProof `json:"message"`
Signature libcommon.Bytes96 `json:"signature"`
Expand Down
8 changes: 0 additions & 8 deletions cl/cltypes/bls_to_execution_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

libcommon "github.com/erigontech/erigon-lib/common"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/types/ssz"
"github.com/erigontech/erigon/cl/merkle_tree"
ssz2 "github.com/erigontech/erigon/cl/ssz"
Expand Down Expand Up @@ -59,13 +58,6 @@ func (*BLSToExecutionChange) Static() bool {
return true
}

// SignedBLSToExecutionChangeWithGossipData type represents SignedBLSToExecutionChange with the gossip data where it's coming from.
type SignedBLSToExecutionChangeWithGossipData struct {
SignedBLSToExecutionChange *SignedBLSToExecutionChange
GossipData *sentinel.GossipData
ImmediateVerification bool
}

type SignedBLSToExecutionChange struct {
Message *BLSToExecutionChange `json:"message"`
Signature libcommon.Bytes96 `json:"signature"`
Expand Down
14 changes: 0 additions & 14 deletions cl/cltypes/contribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/common/length"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/types/clonable"
"github.com/erigontech/erigon/cl/merkle_tree"
ssz2 "github.com/erigontech/erigon/cl/ssz"
Expand Down Expand Up @@ -60,13 +59,6 @@ func (a *ContributionAndProof) HashSSZ() ([32]byte, error) {
return merkle_tree.HashTreeRoot(a.AggregatorIndex, a.Contribution, a.SelectionProof[:])
}

// SignedContributionAndProofWithGossipData type represents SignedContributionAndProof with the gossip data where it's coming from.
type SignedContributionAndProofWithGossipData struct {
SignedContributionAndProof *SignedContributionAndProof
GossipData *sentinel.GossipData
ImmediateVerification bool
}

type SignedContributionAndProof struct {
Message *ContributionAndProof `json:"message"`
Signature libcommon.Bytes96 `json:"signature"`
Expand Down Expand Up @@ -187,12 +179,6 @@ func (agg *SyncContribution) HashSSZ() ([32]byte, error) {

}

type SyncCommitteeMessageWithGossipData struct {
SyncCommitteeMessage *SyncCommitteeMessage
GossipData *sentinel.GossipData
ImmediateVerification bool
}

type SyncCommitteeMessage struct {
Slot uint64 `json:"slot,string"`
BeaconBlockRoot libcommon.Hash `json:"beacon_block_root"`
Expand Down
8 changes: 0 additions & 8 deletions cl/cltypes/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"

libcommon "github.com/erigontech/erigon-lib/common"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/types/clonable"
"github.com/erigontech/erigon-lib/types/ssz"

Expand Down Expand Up @@ -129,13 +128,6 @@ func (*VoluntaryExit) EncodingSizeSSZ() int {
return 16
}

// SignedVoluntaryExitWithGossipData type represents SignedVoluntaryExit with the gossip data where it's coming from.
type SignedVoluntaryExitWithGossipData struct {
SignedVoluntaryExit *SignedVoluntaryExit
GossipData *sentinel.GossipData
ImmediateVerification bool
}

type SignedVoluntaryExit struct {
VoluntaryExit *VoluntaryExit `json:"message"`
Signature libcommon.Bytes96 `json:"signature"`
Expand Down
Loading

0 comments on commit d81cd6e

Please sign in to comment.