Data columns verifications: Batch
nalepae committed Oct 21, 2024
1 parent fe06732 commit aebc3a0
Showing 19 changed files with 1,303 additions and 776 deletions.
53 changes: 36 additions & 17 deletions beacon-chain/core/peerdas/helpers.go
@@ -405,34 +405,53 @@ func DataColumnSidecarsForReconstruct(
 	return sidecars, nil
 }
 
-// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
-// data column.
-func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
+// VerifyDataColumnsSidecarKZGProofs verifies the provided KZG proofs of data columns.
+func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) (bool, error) {
 	// Retrieve the number of columns.
 	numberOfColumns := params.BeaconConfig().NumberOfColumns
 
-	if sc.ColumnIndex >= numberOfColumns {
-		return false, errIndexTooLarge
+	// Compute the total count.
+	count := 0
+	for _, sidecar := range sidecars {
+		count += len(sidecar.DataColumn)
 	}
 
-	if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) {
-		return false, errMismatchLength
-	}
-
-	count := len(sc.DataColumn)
-
 	commitments := make([]kzg.Bytes48, 0, count)
 	indices := make([]uint64, 0, count)
 	cells := make([]kzg.Cell, 0, count)
 	proofs := make([]kzg.Bytes48, 0, count)
 
-	for i := range sc.DataColumn {
-		commitments = append(commitments, kzg.Bytes48(sc.KzgCommitments[i]))
-		indices = append(indices, sc.ColumnIndex)
-		cells = append(cells, kzg.Cell(sc.DataColumn[i]))
-		proofs = append(proofs, kzg.Bytes48(sc.KzgProof[i]))
+	for _, sidecar := range sidecars {
+		// Check if the column index is not too large.
+		if sidecar.ColumnIndex >= numberOfColumns {
+			return false, errIndexTooLarge
+		}
+
+		// Check if the KZG commitments size and data column size match.
+		if len(sidecar.DataColumn) != len(sidecar.KzgCommitments) {
+			return false, errMismatchLength
+		}
+
+		// Check if the KZG proofs size and data column size match.
+		if len(sidecar.DataColumn) != len(sidecar.KzgProof) {
+			return false, errMismatchLength
+		}
+
+		for i := range sidecar.DataColumn {
+			commitments = append(commitments, kzg.Bytes48(sidecar.KzgCommitments[i]))
+			indices = append(indices, sidecar.ColumnIndex)
+			cells = append(cells, kzg.Cell(sidecar.DataColumn[i]))
+			proofs = append(proofs, kzg.Bytes48(sidecar.KzgProof[i]))
+		}
	}
 
-	return kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
+	// Verify the whole batch at once.
+	verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
+	if err != nil {
+		return false, errors.Wrap(err, "verify cell KZG proof batch")
+	}
+
+	return verified, nil
 }
 
 // CustodySubnetCount returns the number of subnets the node should participate in for custody.
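To make the new entry point concrete, here is a minimal sketch of how a caller might use it; the wrapper function and the Prysm v5 import paths are illustrative assumptions, not part of this commit:

package example

import (
	"github.com/pkg/errors"

	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)

// verifySidecarBatch is an illustrative helper, not part of this commit.
// It verifies the cell proofs of every sidecar in a single batched call.
func verifySidecarBatch(sidecars []blocks.RODataColumn) error {
	verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
	if err != nil {
		return errors.Wrap(err, "verify data column sidecars KZG proofs")
	}
	if !verified {
		// The result is batch-level only: one invalid cell proof makes the
		// whole batch report false, without identifying the offending sidecar.
		return errors.New("invalid KZG proof in data column sidecar batch")
	}
	return nil
}

This is the usual batch-verification trade-off: a single call to kzg.VerifyCellKZGProofBatch amortizes pairing cost across all cells of all sidecars, at the price of losing per-sidecar attribution when the batch fails.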
2 changes: 1 addition & 1 deletion beacon-chain/core/peerdas/helpers_test.go
@@ -93,7 +93,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
 	for i, sidecar := range sCars {
 		roCol, err := blocks.NewRODataColumn(sidecar)
 		require.NoError(t, err)
-		verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roCol)
+		verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs([]blocks.RODataColumn{roCol})
 		require.NoError(t, err)
 		require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
 	}
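The updated test still verifies one sidecar per call by wrapping it in a single-element slice. A variant exercising the batched path across all sidecars at once could look like the following hypothetical test code (not in the commit):

	roCols := make([]blocks.RODataColumn, 0, len(sCars))
	for _, sidecar := range sCars {
		roCol, err := blocks.NewRODataColumn(sidecar)
		require.NoError(t, err)
		roCols = append(roCols, roCol)
	}

	// One call covers every sidecar's cell proofs.
	verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs(roCols)
	require.NoError(t, err)
	require.Equal(t, true, verified, "batch verification failed")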
14 changes: 8 additions & 6 deletions beacon-chain/sync/data_columns_sampling.go
@@ -60,7 +60,7 @@ type dataColumnSampler1D struct {
 	// peerFromColumn maps a column to the peer responsible for custody.
 	peerFromColumn map[uint64]map[peer.ID]bool
 	// columnVerifier verifies a column according to the specified requirements.
-	columnVerifier verification.NewColumnVerifier
+	columnVerifier verification.NewDataColumnsVerifier
 }
 
 // newDataColumnSampler1D creates a new 1D data column sampler.
@@ -69,7 +69,7 @@ func newDataColumnSampler1D(
 	clock *startup.Clock,
 	ctxMap ContextByteVersions,
 	stateNotifier statefeed.Notifier,
-	colVerifier verification.NewColumnVerifier,
+	colVerifier verification.NewDataColumnsVerifier,
 ) *dataColumnSampler1D {
 	numColumns := params.BeaconConfig().NumberOfColumns
 	peerFromColumn := make(map[uint64]map[peer.ID]bool, numColumns)
@@ -509,7 +509,7 @@ func verifyColumn(
 	root [32]byte,
 	pid peer.ID,
 	requestedColumns map[uint64]bool,
-	columnVerifier verification.NewColumnVerifier,
+	dataColumnsVerifier verification.NewDataColumnsVerifier,
 ) bool {
 	retrievedColumn := roDataColumn.ColumnIndex
 
@@ -538,9 +538,11 @@
 		return false
 	}
 
-	vf := columnVerifier(roDataColumn, verification.SamplingColumnSidecarRequirements)
+	// TODO: Once peer sampling is used, we should verify all sampled data columns in a single batch.
+	verifier := dataColumnsVerifier([]blocks.RODataColumn{roDataColumn}, verification.SamplingColumnSidecarRequirements)
 
 	// Filter out columns which did not pass the KZG inclusion proof verification.
-	if err := vf.SidecarInclusionProven(); err != nil {
+	if err := verifier.SidecarInclusionProven(); err != nil {
 		log.WithFields(logrus.Fields{
 			"peerID": pid,
 			"root":   fmt.Sprintf("%#x", root),
@@ -550,7 +552,7 @@
 	}
 
 	// Filter out columns which did not pass the KZG proof verification.
-	if err := vf.SidecarKzgProofVerified(); err != nil {
+	if err := verifier.SidecarKzgProofVerified(); err != nil {
 		log.WithFields(logrus.Fields{
 			"peerID": pid,
 			"root":   fmt.Sprintf("%#x", root),
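From the call sites above — a slice of columns plus a requirement list in, an object exposing SidecarInclusionProven and SidecarKzgProofVerified out — the renamed constructor type plausibly has the following shape. This is a sketch inferred from this diff, not the actual definition in the verification package:

package verification

import "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"

// NewDataColumnsVerifier builds a verifier over a whole slice of columns
// instead of a single one. The interface name and the []Requirement
// parameter type are assumptions inferred from usage.
type NewDataColumnsVerifier func(
	dataColumns []blocks.RODataColumn,
	reqs []Requirement,
) DataColumnsVerifier

type DataColumnsVerifier interface {
	// SidecarInclusionProven checks each column's KZG commitment
	// inclusion proof against its block.
	SidecarInclusionProven() error
	// SidecarKzgProofVerified batch-verifies each column's cell proofs.
	SidecarKzgProofVerified() error
}

The TODO in verifyColumn explains why the sampler still passes a single-element slice: peer sampling responses are verified column by column today, but the slice-based signature leaves room to batch them later.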
2 changes: 1 addition & 1 deletion beacon-chain/sync/data_columns_sampling_test.go
@@ -202,7 +202,7 @@ func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTes
 	iniWaiter := verification.NewInitializerWaiter(clockSync, nil, nil)
 	ini, err := iniWaiter.WaitForInitializer(context.Background())
 	require.NoError(t, err)
-	sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil, newColumnVerifierFromInitializer(ini))
+	sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil, newDataColumnsVerifierFromInitializer(ini))
 
 	return test, sampler
 }
117 changes: 66 additions & 51 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -82,7 +82,7 @@ type blocksFetcherConfig struct {
 	mode syncMode
 	bs   filesystem.BlobStorageSummarizer
 	bv   verification.NewBlobVerifier
-	cv   verification.NewColumnVerifier
+	cv   verification.NewDataColumnsVerifier
 }
 
 // blocksFetcher is a service to fetch chain data from peers.
@@ -100,7 +100,7 @@ type blocksFetcher struct {
 	db              db.ReadOnlyDatabase
 	bs              filesystem.BlobStorageSummarizer
 	bv              verification.NewBlobVerifier
-	cv              verification.NewColumnVerifier
+	cv              verification.NewDataColumnsVerifier
 	blocksPerPeriod uint64
 	rateLimiter     *leakybucket.Collector
 	peerLocks       map[peer.ID]*peerLock
@@ -1155,67 +1155,91 @@ func (f *blocksFetcher) waitForPeersForDataColumns(
 	return dataColumnsByAdmissiblePeer, nil
 }
 
-// processDataColumn mutates `bwbs` argument by adding the data column,
+// processDataColumns mutates the `bwbs` argument by adding the data columns,
 // and mutates `missingColumnsByRoot` by removing the data column if the
 // data column passes all the checks.
-func processDataColumn(
+func (f *blocksFetcher) processDataColumns(
 	wrappedBwbsMissingColumns *bwbsMissingColumns,
-	columnVerifier verification.NewColumnVerifier,
-	blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
+	blockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
 	indicesByRoot map[[fieldparams.RootLength]byte][]int,
-	dataColumn blocks.RODataColumn,
+	dataColumns []blocks.RODataColumn,
 ) bool {
-	// Extract the block root from the data column.
-	blockRoot := dataColumn.BlockRoot()
-
-	// Find the position of the block in `bwbs` that corresponds to this block root.
-	indices, ok := indicesByRoot[blockRoot]
-	if !ok {
-		// The peer returned a data column that we did not expect.
-		// This is among others possible when the peer is not on the same fork.
-		return false
-	}
-
-	// Extract the block from the block root.
-	block, ok := blocksByRoot[blockRoot]
-	if !ok {
-		// This should never happen.
-		log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Error("Fetch data columns from peers - block not found")
-		return false
-	}
-
-	// Verify the data column.
-	if err := verify.ColumnAlignsWithBlock(dataColumn, block, columnVerifier); err != nil {
-		log.WithError(err).WithFields(logrus.Fields{
-			"root":   fmt.Sprintf("%#x", blockRoot),
-			"slot":   block.Block().Slot(),
-			"column": dataColumn.ColumnIndex,
-		}).Warning("Fetch data columns from peers - fetched data column does not align with block")
+	// Filter out data columns:
+	// - that are not expected and,
+	// - which correspond to blocks before Deneb.
+	//
+	// Unexpected data columns are possible, among other reasons, when the peer
+	// is not on the same fork, due to the nature of data columns by range requests.
+	wrappedBlockDataColumns := make([]verify.WrappedBlockDataColumn, 0, len(dataColumns))
+	for _, dataColumn := range dataColumns {
+		// Extract the block root from the data column.
+		blockRoot := dataColumn.BlockRoot()
+
+		// Skip if the block root is not expected.
+		// This is possible when the peer is not on the same fork.
+		_, ok := indicesByRoot[blockRoot]
+		if !ok {
+			continue
+		}
+
+		// Retrieve the block from the block root.
+		block, ok := blockByRoot[blockRoot]
+		if !ok {
+			// This should never happen.
+			log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Error("Fetch data columns from peers - block not found for root")
+			return false
+		}
+
+		// Skip if the block is before Deneb.
+		if block.Version() < version.Deneb {
+			continue
+		}
+
+		wrappedBlockDataColumn := verify.WrappedBlockDataColumn{
+			ROBlock:    block,
+			DataColumn: dataColumn,
+		}
+
+		wrappedBlockDataColumns = append(wrappedBlockDataColumns, wrappedBlockDataColumn)
+	}
+
+	// Verify the data columns.
+	if err := verify.DataColumnsAlignWithBlock(wrappedBlockDataColumns, f.cv); err != nil {
+		// TODO: Should we downscore the peer for that?
 		return false
 	}
 
-	// Populate the corresponding items in `bwbs`.
-	func() {
-		wrappedBwbsMissingColumns.mu.Lock()
-		defer wrappedBwbsMissingColumns.mu.Unlock()
-
-		bwbs := wrappedBwbsMissingColumns.bwbs
-		missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot
+	mu := &wrappedBwbsMissingColumns.mu
+
+	bwbs := wrappedBwbsMissingColumns.bwbs
+	missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot
+
+	mu.Lock()
+	defer mu.Unlock()
+	for _, wrappedBlockDataColumn := range wrappedBlockDataColumns {
+		dataColumn := wrappedBlockDataColumn.DataColumn
+
+		// Extract the block root from the data column.
+		blockRoot := dataColumn.BlockRoot()
+
+		// Extract the indices in `bwbs` corresponding to the block root.
+		indices, ok := indicesByRoot[blockRoot]
+		if !ok {
+			// This should never happen.
+			log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Error("Fetch data columns from peers - indices not found for root")
+			return false
+		}
 
 		// Populate the corresponding items in `bwbs`.
 		for _, index := range indices {
 			bwbs[index].Columns = append(bwbs[index].Columns, dataColumn)
 		}
 
 		// Remove the column from the missing columns.
 		delete(missingColumnsByRoot[blockRoot], dataColumn.ColumnIndex)
 		if len(missingColumnsByRoot[blockRoot]) == 0 {
 			delete(missingColumnsByRoot, blockRoot)
 		}
-	}()
+	}
 
 	return true
 }
@@ -1299,17 +1323,8 @@ func (f *blocksFetcher) fetchDataColumnFromPeer(
 		return
 	}
 
-	globalSuccess := false
-
-	for _, dataColumn := range roDataColumns {
-		success := processDataColumn(wrappedBwbsMissingColumns, f.cv, blocksByRoot, indicesByRoot, dataColumn)
-		if success {
-			globalSuccess = true
-		}
-	}
-
-	if !globalSuccess {
-		log.Debug("Fetch data columns from peers - no valid data column returned")
+	if !f.processDataColumns(wrappedBwbsMissingColumns, blocksByRoot, indicesByRoot, roDataColumns) {
+		log.Warning("Fetch data columns from peers - at least one data column is invalid")
 		return
 	}
 
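Two names from the sync/verify package carry the new batching logic above. Their definitions are not part of this diff; inferred from the call sites, the pairing type plausibly looks like this sketch (anything beyond the two visible fields is an assumption):

package verify

import "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"

// WrappedBlockDataColumn pairs a data column with the block it claims to
// belong to, so that alignment and KZG checks can run over a whole batch.
// Sketch inferred from usage; the real struct may carry more fields.
type WrappedBlockDataColumn struct {
	ROBlock    blocks.ROBlock
	DataColumn blocks.RODataColumn
}

DataColumnsAlignWithBlock then takes the whole []WrappedBlockDataColumn plus the f.cv constructor and returns a single error for the batch. Note the semantic shift this introduces: the old per-column loop kept whatever columns passed and only logged when nothing did, while the batched check discards the entire peer response if any one column is invalid, as the Warning in fetchDataColumnFromPeer now states.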
10 changes: 3 additions & 7 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -414,6 +414,7 @@ func TestBlocksFetcher_scheduleRequest(t *testing.T) {
 			fetcher.scheduleRequest(context.Background(), 1, blockBatchLimit))
 	})
 }
+
 func TestBlocksFetcher_handleRequest(t *testing.T) {
 	blockBatchLimit := flags.Get().BlockBatchLimit
 	chainConfig := struct {
@@ -1988,14 +1989,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 					{slot: 38, columnIndex: 6, alterate: true},
 					{slot: 38, columnIndex: 70},
 				},
-			},
-			(&ethpb.DataColumnSidecarsByRangeRequest{
-				StartSlot: 38,
-				Count:     1,
-				Columns:   []uint64{6},
-			}).String(): {
 				{
 					{slot: 38, columnIndex: 6},
+					{slot: 38, columnIndex: 70},
 				},
 			},
@@ -2243,7 +2239,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 				ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
 				p2p:    p2pSvc,
 				bs:     blobStorageSummarizer,
-				cv:     newColumnVerifierFromInitializer(ini),
+				cv:     newDataColumnsVerifierFromInitializer(ini),
 			})
 
 			// Fetch the data columns from the peers.
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/blocks_queue.go
@@ -73,7 +73,7 @@ type blocksQueueConfig struct {
 	mode syncMode
 	bs   filesystem.BlobStorageSummarizer
 	bv   verification.NewBlobVerifier
-	cv   verification.NewColumnVerifier
+	cv   verification.NewDataColumnsVerifier
 }
 
 // blocksQueue is a priority queue that serves as an intermediary between block fetchers (producers)
6 changes: 3 additions & 3 deletions beacon-chain/sync/initial-sync/round_robin.go
@@ -89,7 +89,7 @@ func (s *Service) startBlocksQueue(ctx context.Context, highestSlot primitives.S
 		mode: mode,
 		bs:   summarizer,
 		bv:   s.newBlobVerifier,
-		cv:   s.newColumnVerifier,
+		cv:   s.newDataColumnsVerifier,
 	}
 	queue := newBlocksQueue(ctx, cfg)
 	if err := queue.start(); err != nil {
@@ -176,7 +176,7 @@ func (s *Service) processFetchedDataRegSync(
 		return
 	}
 	if coreTime.PeerDASIsActive(startSlot) {
-		bv := verification.NewDataColumnBatchVerifier(s.newColumnVerifier, verification.InitsyncColumnSidecarRequirements)
+		bv := verification.NewDataColumnBatchVerifier(s.newDataColumnsVerifier, verification.InitsyncColumnSidecarRequirements)
 		avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, bv, s.cfg.P2P.NodeID())
 		batchFields := logrus.Fields{
 			"firstSlot":   data.bwb[0].Block.Block().Slot(),
@@ -367,7 +367,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
 	}
 	var aStore das.AvailabilityStore
 	if coreTime.PeerDASIsActive(first.Block().Slot()) {
-		bv := verification.NewDataColumnBatchVerifier(s.newColumnVerifier, verification.InitsyncColumnSidecarRequirements)
+		bv := verification.NewDataColumnBatchVerifier(s.newDataColumnsVerifier, verification.InitsyncColumnSidecarRequirements)
 		avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, bv, s.cfg.P2P.NodeID())
 		s.logBatchSyncStatus(genesis, first, len(bwb))
 		for _, bb := range bwb {
(11 more changed files not shown.)
