Skip to content

Commit

Permalink
Add Data Column Gossip Handlers (#13894)
Browse files Browse the repository at this point in the history
* Add Data Column Subscriber

* Add Data Column Vaidator

* Wire all Handlers In

* Fix Build

* Fix Test

* Fix IP in Test

* Fix IP in Test
  • Loading branch information
nisdas authored and nalepae committed Apr 24, 2024
1 parent e498183 commit 92de4a1
Show file tree
Hide file tree
Showing 15 changed files with 348 additions and 11 deletions.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"receive_attestation.go",
"receive_blob.go",
"receive_block.go",
"receive_sidecar.go",
"service.go",
"tracked_proposer.go",
"weak_subjectivity_checks.go",
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type BlobReceiver interface {
ReceiveBlob(context.Context, blocks.VerifiedROBlob) error
}

// DataColumnReceiver interface defines the methods of chain service for receiving new
// data columns
type DataColumnReceiver interface {
ReceiveDataColumn(context.Context, *ethpb.DataColumnSidecar) error
}

// SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire.
type SlashingReceiver interface {
ReceiveAttesterSlashing(ctx context.Context, slashings *ethpb.AttesterSlashing)
Expand Down
12 changes: 12 additions & 0 deletions beacon-chain/blockchain/receive_sidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package blockchain

import (
"context"

ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)

func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error {
// TODO
return nil
}
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e
return nil
}

// ReceiveDataColumn implements the same method in chain service
func (c *ChainService) ReceiveDataColumn(_ context.Context, _ *ethpb.DataColumnSidecar) error {
return nil
}

// TargetRootForEpoch mocks the same method in the chain service
func (c *ChainService) TargetRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) {
return c.TargetRoot, nil
Expand Down
18 changes: 18 additions & 0 deletions beacon-chain/core/blocks/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ func VerifyBlockHeaderSignature(beaconState state.BeaconState, header *ethpb.Sig
return signing.VerifyBlockHeaderSigningRoot(header.Header, proposerPubKey, header.Signature, domain)
}

func VerifyBlockHeaderSignatureUsingCurrentFork(beaconState state.BeaconState, header *ethpb.SignedBeaconBlockHeader) error {
currentEpoch := slots.ToEpoch(header.Header.Slot)
fork, err := forks.Fork(currentEpoch)
if err != nil {
return err
}
domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return err
}
proposer, err := beaconState.ValidatorAtIndex(header.Header.ProposerIndex)
if err != nil {
return err
}
proposerPubKey := proposer.PublicKey
return signing.VerifyBlockHeaderSigningRoot(header.Header, proposerPubKey, header.Signature, domain)
}

// VerifyBlockSignatureUsingCurrentFork verifies the proposer signature of a beacon block. This differs
// from the above method by not using fork data from the state and instead retrieving it
// via the respective epoch.
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/core/feed/operation/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (

// AttesterSlashingReceived is sent after an attester slashing is received from gossip or rpc
AttesterSlashingReceived = 8

// DataColumnSidecarReceived is sent after a data column sidecar is received from gossip or rpc.
DataColumnSidecarReceived = 9
)

// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
Expand Down Expand Up @@ -77,3 +80,7 @@ type ProposerSlashingReceivedData struct {
type AttesterSlashingReceivedData struct {
AttesterSlashing *ethpb.AttesterSlashing
}

type DataColumnSidecarReceivedData struct {
DataColumn *ethpb.DataColumnSidecar
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var gossipTopicMappings = map[string]proto.Message{
SyncCommitteeSubnetTopicFormat: &ethpb.SyncCommitteeMessage{},
BlsToExecutionChangeSubnetTopicFormat: &ethpb.SignedBLSToExecutionChange{},
BlobSubnetTopicFormat: &ethpb.BlobSidecar{},
DataColumnSubnetTopicFormat: &ethpb.DataColumnSidecar{},
}

// GossipTopicMappings is a function to return the assigned data type
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/pubsub_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestService_CanSubscribe(t *testing.T) {
formatting := []interface{}{digest}

// Special case for attestation subnets which have a second formatting placeholder.
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat {
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat || topic == DataColumnSubnetTopicFormat {
formatting = append(formatting, 0 /* some subnet ID */)
}

Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/p2p/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
GossipBlsToExecutionChangeMessage = "bls_to_execution_change"
// GossipBlobSidecarMessage is the name for the blob sidecar message type.
GossipBlobSidecarMessage = "blob_sidecar"
// GossipDataColumnSidecarMessage is the name for the data column sidecar message type.
GossipDataColumnSidecarMessage = "data_column_sidecar"

// Topic Formats
//
// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
Expand All @@ -52,4 +55,6 @@ const (
BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage
// BlobSubnetTopicFormat is the topic format for the blob subnet.
BlobSubnetTopicFormat = GossipProtocolAndDigest + GossipBlobSidecarMessage + "_%d"
// DataColumnSubnetTopicFormat is the topic format for the data column subnet.
DataColumnSubnetTopicFormat = GossipProtocolAndDigest + GossipDataColumnSidecarMessage + "_%d"
)
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"subscriber_beacon_blocks.go",
"subscriber_blob_sidecar.go",
"subscriber_bls_to_execution_change.go",
"subscriber_data_column_sidecar.go",
"subscriber_handlers.go",
"subscriber_sync_committee_message.go",
"subscriber_sync_contribution_proof.go",
Expand All @@ -47,6 +48,7 @@ go_library(
"validate_beacon_blocks.go",
"validate_blob.go",
"validate_bls_to_execution_change.go",
"validate_data_column_sidecar.go",
"validate_proposer_slashing.go",
"validate_sync_committee_message.go",
"validate_sync_contribution_proof.go",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/decode_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.BlobSidecar{})]
case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage):
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.DataColumnSidecar{})]
}

base := p2p.GossipTopicMappings(topic, 0)
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type config struct {
type blockchainService interface {
blockchain.BlockReceiver
blockchain.BlobReceiver
blockchain.DataColumnReceiver
blockchain.HeadFetcher
blockchain.FinalizationFetcher
blockchain.ForkFetcher
Expand Down
125 changes: 115 additions & 10 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,32 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {

// New Gossip Topic in Deneb
if epoch >= params.BeaconConfig().DenebForkEpoch {
s.subscribeStaticWithSubnets(
p2p.BlobSubnetTopicFormat,
s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */
digest,
params.BeaconConfig().BlobsidecarSubnetCount,
)
}
if features.Get().EnablePeerDAS {
// TODO: Subscribe to persistent column subnets here
if features.Get().EnablePeerDAS {
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn, /* validator */
s.dataColumnSubscriber, /* message handler */
digest,
params.BeaconConfig().DataColumnSidecarSubnetCount,
)
} else {
s.subscribeDynamicWithColumnSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn, /* validator */
s.dataColumnSubscriber, /* message handler */
digest,
)
}
} else {
s.subscribeStaticWithSubnets(
p2p.BlobSubnetTopicFormat,
s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */
digest,
params.BeaconConfig().BlobsidecarSubnetCount,
)
}
}
}

Expand Down Expand Up @@ -649,6 +665,87 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
}()
}

// subscribe missing subnets for our persistent columns.
func (s *Service) subscribeColumnSubnet(
subscriptions map[uint64]*pubsub.Subscription,
idx uint64,
digest [4]byte,
validate wrappedVal,
handle subHandler,
) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.DataColumnSidecar{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
// check if subscription exists and if not subscribe the relevant subnet.
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
}
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to column gossip subnet with "+
"column index %d. Searching network for peers subscribed to the subnet.", idx)
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}

func (s *Service) subscribeDynamicWithColumnSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
if err != nil {
panic(err)
}
base := p2p.GossipTopicMappings(topicFormat, e)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().DataColumnSidecarSubnetCount)
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)

go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Column subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
ticker.Done()
return
}

wantedSubs := s.retrieveActiveColumnSubnets()
// Resize as appropriate.
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

// subscribe desired column subnets.
for _, idx := range wantedSubs {
s.subscribeColumnSubnet(subscriptions, idx, digest, validate, handle)
}
}
}
}()
}

// lookup peers for attester specific subnets.
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
Expand Down Expand Up @@ -700,6 +797,14 @@ func (*Service) retrieveActiveSyncSubnets(currEpoch primitives.Epoch) []uint64 {
return slice.SetUint64(subs)
}

func (*Service) retrieveActiveColumnSubnets() []uint64 {
subs, ok, _ := cache.ColumnSubnetIDs.GetColumnSubnets()
if !ok {
return nil
}
return subs
}

// filters out required peers for the node to function, not
// pruning peers who are in our attestation subnets.
func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
Expand Down
34 changes: 34 additions & 0 deletions beacon-chain/sync/subscriber_data_column_sidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sync

import (
"context"
"fmt"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
)

func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) error {
b, ok := msg.(*ethpb.DataColumnSidecar)
if !ok {
return fmt.Errorf("message was not type DataColumnSidecar, type=%T", msg)
}

// TODO:Change to new one for data columns
s.setSeenBlobIndex(b.SignedBlockHeader.Header.Slot, b.SignedBlockHeader.Header.ProposerIndex, b.ColumnIndex)

if err := s.cfg.chain.ReceiveDataColumn(ctx, b); err != nil {
return err
}

s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.DataColumnSidecarReceived,
Data: &opfeed.DataColumnSidecarReceivedData{
DataColumn: b,
},
})

return nil
}
Loading

0 comments on commit 92de4a1

Please sign in to comment.