Skip to content

Commit

Permalink
PeerDAS: Batch columns verifications (#14559)
Browse files Browse the repository at this point in the history
* `ColumnAlignsWithBlock`: Split lines.

* Data columns verifications: Batch

* Remove completely `DataColumnBatchVerifier`.

Only `DataColumnsVerifier` (with `s`) on columns remains.
It is the responsability of the function which receive the data column
(either by gossip, by range request or by root request) to verify the
data column wrt. corresponding checks.

* Fix Nishant's comment.
  • Loading branch information
nalepae committed Oct 23, 2024
1 parent f2fe91e commit 0672fb9
Show file tree
Hide file tree
Showing 24 changed files with 1,359 additions and 893 deletions.
53 changes: 36 additions & 17 deletions beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,34 +405,53 @@ func DataColumnSidecarsForReconstruct(
return sidecars, nil
}

// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
// data column.
func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
// VerifyDataColumnsSidecarKZGProofs verifies the provided KZG Proofs of data columns.
func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) (bool, error) {
// Retrieve the number of columns.
numberOfColumns := params.BeaconConfig().NumberOfColumns

if sc.ColumnIndex >= numberOfColumns {
return false, errIndexTooLarge
// Compute the total count.
count := 0
for _, sidecar := range sidecars {
count += len(sidecar.DataColumn)
}

if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) {
return false, errMismatchLength
}

count := len(sc.DataColumn)

commitments := make([]kzg.Bytes48, 0, count)
indices := make([]uint64, 0, count)
cells := make([]kzg.Cell, 0, count)
proofs := make([]kzg.Bytes48, 0, count)

for i := range sc.DataColumn {
commitments = append(commitments, kzg.Bytes48(sc.KzgCommitments[i]))
indices = append(indices, sc.ColumnIndex)
cells = append(cells, kzg.Cell(sc.DataColumn[i]))
proofs = append(proofs, kzg.Bytes48(sc.KzgProof[i]))
for _, sidecar := range sidecars {
// Check if the columns index is not too large
if sidecar.ColumnIndex >= numberOfColumns {
return false, errIndexTooLarge
}

// Check if the KZG commitments size and data column size match.
if len(sidecar.DataColumn) != len(sidecar.KzgCommitments) {
return false, errMismatchLength
}

// Check if the KZG proofs size and data column size match.
if len(sidecar.DataColumn) != len(sidecar.KzgProof) {
return false, errMismatchLength
}

for i := range sidecar.DataColumn {
commitments = append(commitments, kzg.Bytes48(sidecar.KzgCommitments[i]))
indices = append(indices, sidecar.ColumnIndex)
cells = append(cells, kzg.Cell(sidecar.DataColumn[i]))
proofs = append(proofs, kzg.Bytes48(sidecar.KzgProof[i]))
}
}

// Verify all the batch at once.
verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
if err != nil {
return false, errors.Wrap(err, "verify cell KZG proof batch")
}

return kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
return verified, nil
}

// CustodySubnetCount returns the number of subnets the node should participate in for custody.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
for i, sidecar := range sCars {
roCol, err := blocks.NewRODataColumn(sidecar)
require.NoError(t, err)
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roCol)
verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs([]blocks.RODataColumn{roCol})
require.NoError(t, err)
require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
}
Expand Down
49 changes: 15 additions & 34 deletions beacon-chain/das/availability_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand All @@ -21,22 +20,14 @@ import (
// This implementation will hold any blobs passed to Persist until the IsDataAvailable is called for their
// block, at which time they will undergo full verification and be saved to the disk.
type LazilyPersistentStoreColumn struct {
store *filesystem.BlobStorage
cache *cache
verifier ColumnBatchVerifier
nodeID enode.ID
store *filesystem.BlobStorage
cache *cache
}

type ColumnBatchVerifier interface {
VerifiedRODataColumns(ctx context.Context, blk blocks.ROBlock, sc []blocks.RODataColumn) ([]blocks.VerifiedRODataColumn, error)
}

func NewLazilyPersistentStoreColumn(store *filesystem.BlobStorage, verifier ColumnBatchVerifier, id enode.ID) *LazilyPersistentStoreColumn {
func NewLazilyPersistentStoreColumn(store *filesystem.BlobStorage) *LazilyPersistentStoreColumn {
return &LazilyPersistentStoreColumn{
store: store,
cache: newCache(),
verifier: verifier,
nodeID: id,
store: store,
cache: newCache(),
}
}

Expand Down Expand Up @@ -120,33 +111,23 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(
// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
sidecars, err := entry.filterColumns(blockRoot, blockCommitments)
roDataColumns, err := entry.filterColumns(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
}

// Do thorough verifications of each RODataColumns for the block.
// Same as above, we don't save DataColumnsSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, block, sidecars)
if err != nil {
var me verification.VerificationMultiError
ok := errors.As(err, &me)
if ok {
fails := me.Failures()
lf := make(log.Fields, len(fails))
for i := range fails {
lf[fmt.Sprintf("fail_%d", i)] = fails[i].Error()
}
log.WithFields(lf).
Debug("invalid ColumnSidecars received")
}
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", blockRoot)
// Create verified RO data columns from RO data columns.
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roDataColumns))

for _, roDataColumn := range roDataColumns {
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
}

// Ensure that each column sidecar is written to disk.
for i := range vscs {
if err := s.store.SaveDataColumn(vscs[i]); err != nil {
return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", vscs[i].ColumnIndex, blockRoot)
for _, verifiedRODataColumn := range verifiedRODataColumns {
if err := s.store.SaveDataColumn(verifiedRODataColumn); err != nil {
return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", verifiedRODataColumn.ColumnIndex, blockRoot)
}
}

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ go_test(
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
Expand Down
76 changes: 38 additions & 38 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -60,7 +61,7 @@ type dataColumnSampler1D struct {
// peerFromColumn maps a column to the peer responsible for custody.
peerFromColumn map[uint64]map[peer.ID]bool
// columnVerifier verifies a column according to the specified requirements.
columnVerifier verification.NewColumnVerifier
columnVerifier verification.NewDataColumnsVerifier
}

// newDataColumnSampler1D creates a new 1D data column sampler.
Expand All @@ -69,7 +70,7 @@ func newDataColumnSampler1D(
clock *startup.Clock,
ctxMap ContextByteVersions,
stateNotifier statefeed.Notifier,
colVerifier verification.NewColumnVerifier,
colVerifier verification.NewDataColumnsVerifier,
) *dataColumnSampler1D {
numColumns := params.BeaconConfig().NumberOfColumns
peerFromColumn := make(map[uint64]map[peer.ID]bool, numColumns)
Expand Down Expand Up @@ -265,7 +266,7 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2)

// TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
_, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
_, _, err = d.incrementalDAS(ctx, data, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Failed to run incremental DAS")
}
Expand All @@ -276,21 +277,22 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
// According to https://github.com/ethereum/consensus-specs/issues/3825, we're going to select query samples exclusively from the non custody columns.
func (d *dataColumnSampler1D) incrementalDAS(
ctx context.Context,
root [fieldparams.RootLength]byte,
blockProcessedData *statefeed.BlockProcessedData,
columns []uint64,
sampleCount uint64,
) (bool, []roundSummary, error) {
allowedFailures := uint64(0)
firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
blockRoot := blockProcessedData.BlockRoot

start := time.Now()

for round := 1; ; /*No exit condition */ round++ {
if extendedSampleCount > uint64(len(columns)) {
// We already tried to sample all possible columns, this is the unhappy path.
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", blockRoot),
"round": round - 1,
}).Warning("Some columns are still missing after trying to sample all possible columns")
return false, roundSummaries, nil
Expand All @@ -301,13 +303,13 @@ func (d *dataColumnSampler1D) incrementalDAS(
columnsToSampleCount := extendedSampleCount - firstColumnToSample

log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", blockRoot),
"columns": columnsToSample,
"round": round,
}).Debug("Start data columns sampling")

// Sample data columns from peers in parallel.
retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample)
retrievedSamples := d.sampleDataColumns(ctx, blockProcessedData, columnsToSample)

missingSamples := make(map[uint64]bool)
for _, column := range columnsToSample {
Expand All @@ -325,7 +327,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
if retrievedSampleCount == columnsToSampleCount {
// All columns were correctly sampled, this is the happy path.
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", blockRoot),
"neededRounds": round,
"duration": time.Since(start),
}).Debug("All columns were successfully sampled")
Expand All @@ -344,7 +346,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, allowedFailures)

log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", blockRoot),
"round": round,
"missingColumnsCount": allowedFailures,
"currentSampleIndex": oldExtendedSampleCount,
Expand All @@ -355,7 +357,7 @@ func (d *dataColumnSampler1D) incrementalDAS(

func (d *dataColumnSampler1D) sampleDataColumns(
ctx context.Context,
root [fieldparams.RootLength]byte,
blockProcessedData *statefeed.BlockProcessedData,
columns []uint64,
) map[uint64]bool {
// distribute samples to peer
Expand All @@ -365,10 +367,12 @@ func (d *dataColumnSampler1D) sampleDataColumns(
mu sync.Mutex
wg sync.WaitGroup
)

res := make(map[uint64]bool)

sampleFromPeer := func(pid peer.ID, cols map[uint64]bool) {
defer wg.Done()
retrieved := d.sampleDataColumnsFromPeer(ctx, pid, root, cols)
retrieved := d.sampleDataColumnsFromPeer(ctx, pid, blockProcessedData, cols)

mu.Lock()
for col := range retrieved {
Expand Down Expand Up @@ -414,15 +418,15 @@ func (d *dataColumnSampler1D) distributeSamplesToPeer(
func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
ctx context.Context,
pid peer.ID,
root [fieldparams.RootLength]byte,
blockProcessedData *statefeed.BlockProcessedData,
requestedColumns map[uint64]bool,
) map[uint64]bool {
retrievedColumns := make(map[uint64]bool)

req := make(types.DataColumnSidecarsByRootReq, 0)
for col := range requestedColumns {
req = append(req, &eth.DataColumnIdentifier{
BlockRoot: root[:],
BlockRoot: blockProcessedData.BlockRoot[:],
ColumnIndex: col,
})
}
Expand All @@ -434,22 +438,23 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
return nil
}

// TODO: Once peer sampling is used, we should verify all sampled data columns in a single batch instead of looping over columns.
for _, roDataColumn := range roDataColumns {
if verifyColumn(roDataColumn, root, pid, requestedColumns, d.columnVerifier) {
if verifyColumn(roDataColumn, blockProcessedData, pid, requestedColumns, d.columnVerifier) {
retrievedColumns[roDataColumn.ColumnIndex] = true
}
}

if len(retrievedColumns) == len(requestedColumns) {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
"requestedColumns": sortedSliceFromMap(requestedColumns),
}).Debug("Sampled columns from peer successfully")
} else {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
"requestedColumns": sortedSliceFromMap(requestedColumns),
"retrievedColumns": sortedSliceFromMap(retrievedColumns),
}).Debug("Sampled columns from peer with some errors")
Expand Down Expand Up @@ -506,20 +511,22 @@ func selectRandomPeer(peers map[peer.ID]bool) peer.ID {
// the KZG inclusion and the KZG proof.
func verifyColumn(
roDataColumn blocks.RODataColumn,
root [32]byte,
blockProcessedData *statefeed.BlockProcessedData,
pid peer.ID,
requestedColumns map[uint64]bool,
columnVerifier verification.NewColumnVerifier,
dataColumnsVerifier verification.NewDataColumnsVerifier,
) bool {
retrievedColumn := roDataColumn.ColumnIndex

// Filter out columns with incorrect root.
actualRoot := roDataColumn.BlockRoot()
if actualRoot != root {
columnRoot := roDataColumn.BlockRoot()
blockRoot := blockProcessedData.BlockRoot

if columnRoot != blockRoot {
log.WithFields(logrus.Fields{
"peerID": pid,
"requestedRoot": fmt.Sprintf("%#x", root),
"actualRoot": fmt.Sprintf("%#x", actualRoot),
"requestedRoot": fmt.Sprintf("%#x", blockRoot),
"columnRoot": fmt.Sprintf("%#x", columnRoot),
}).Debug("Retrieved root does not match requested root")

return false
Expand All @@ -538,25 +545,18 @@ func verifyColumn(
return false
}

vf := columnVerifier(roDataColumn, verification.SamplingColumnSidecarRequirements)
// Filter out columns which did not pass the KZG inclusion proof verification.
if err := vf.SidecarInclusionProven(); err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).WithError(err).Debug("Failed to verify KZG inclusion proof for retrieved column")
return false
roBlock := blockProcessedData.SignedBlock.Block()

wrappedBlockDataColumns := []verify.WrappedBlockDataColumn{
{
ROBlock: roBlock,
RODataColumn: roDataColumn,
},
}

// Filter out columns which did not pass the KZG proof verification.
if err := vf.SidecarKzgProofVerified(); err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).WithError(err).Debug("Failed to verify KZG proof for retrieved column")
if err := verify.DataColumnsAlignWithBlock(wrappedBlockDataColumns, dataColumnsVerifier); err != nil {
return false
}

return true
}
Loading

0 comments on commit 0672fb9

Please sign in to comment.