Skip to content

Commit

Permalink
Save data columns when initial syncing.
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed Oct 3, 2024
1 parent 4497d34 commit 5038db1
Show file tree
Hide file tree
Showing 14 changed files with 151 additions and 56 deletions.
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return err
}
}
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil {

nodeID := s.cfg.P2P.NodeID()
if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), b); err != nil {
return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot())
}
args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(),
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ func (s *Service) handleDA(
if err != nil {
return 0, err
}
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), rob); err != nil {

nodeID := s.cfg.P2P.NodeID()
if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), rob); err != nil {
return 0, errors.Wrap(err, "could not validate blob data availability (AvailabilityStore.IsDataAvailable)")
}
} else {
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/das/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/das",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
Expand Down Expand Up @@ -46,6 +47,7 @@ go_test(
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)
3 changes: 2 additions & 1 deletion beacon-chain/das/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *LazilyPersistentStore) Persist(current primitives.Slot, sc ...blocks.RO

// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, _ enode.ID, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := commitmentsToCheck(b, current)
if err != nil {
return errors.Wrapf(err, "could check data availability for block %#x", b.Root())
Expand Down
105 changes: 77 additions & 28 deletions beacon-chain/das/availability_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
Expand Down Expand Up @@ -75,39 +76,58 @@ func (s *LazilyPersistentStoreColumn) PersistColumns(current primitives.Slot, sc

// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := fullCommitmentsToCheck(b, current)
func (s *LazilyPersistentStoreColumn) IsDataAvailable(
ctx context.Context,
nodeID enode.ID,
currentSlot primitives.Slot,
block blocks.ROBlock,
) error {
blockCommitments, err := fullCommitmentsToCheck(nodeID, block, currentSlot)
if err != nil {
return errors.Wrapf(err, "could check data availability for block %#x", b.Root())
return errors.Wrapf(err, "full commitments to check with block root `%#x` and current slot `%d`", block.Root(), currentSlot)
}
// Return early for blocks that are pre-deneb or which do not have any commitments.

// Return early for blocks that do not have any commitments.
if blockCommitments.count() == 0 {
return nil
}

key := keyFromBlock(b)
// Build the cache key for the block.
key := keyFromBlock(block)

// Retrieve the cache entry for the block, or create an empty one if it doesn't exist.
entry := s.cache.ensure(key)

// Delete the cache entry for the block at the end.
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)

// Get the root of the block.
blockRoot := block.Root()

// Wait for the summarizer to be ready before proceeding.
summarizer, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
log.
WithField("root", fmt.Sprintf("%#x", blockRoot)).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
// Get the summary for the block, and set it in the cache entry.
summary := summarizer.Summary(blockRoot)
entry.setDiskSummary(summary)
}

// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
sidecars, err := entry.filterColumns(root, &blockCommitments)
sidecars, err := entry.filterColumns(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
}
// Do thorough verifications of each BlobSidecar for the block.
// Same as above, we don't save BlobSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, b, sidecars)

// Do thorough verifications of each RODataColumns for the block.
// Same as above, we don't save DataColumnsSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, block, sidecars)
if err != nil {
var me verification.VerificationMultiError
ok := errors.As(err, &me)
Expand All @@ -120,33 +140,62 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, curre
log.WithFields(lf).
Debug("invalid ColumnSidecars received")
}
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", root)
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", blockRoot)
}

// Ensure that each column sidecar is written to disk.
for i := range vscs {
if err := s.store.SaveDataColumn(vscs[i]); err != nil {
return errors.Wrapf(err, "failed to save ColumnSidecar index %d for block %#x", vscs[i].ColumnIndex, root)
return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", vscs[i].ColumnIndex, blockRoot)
}
}
// All ColumnSidecars are persisted - da check succeeds.

// All ColumnSidecars are persisted - data availability check succeeds.
return nil
}

func fullCommitmentsToCheck(b blocks.ROBlock, current primitives.Slot) (safeCommitmentsArray, error) {
var ar safeCommitmentsArray
if b.Version() < version.Deneb {
return ar, nil
// fullCommitmentsToCheck returns the commitments to check for a given block.
func fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot primitives.Slot) (*safeCommitmentsArray, error) {
// Return early for blocks that are pre-deneb.
if block.Version() < version.Deneb {
return &safeCommitmentsArray{}, nil
}
// We are only required to check within MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return ar, nil

// Compute the block epoch.
blockSlot := block.Block().Slot()
blockEpoch := slots.ToEpoch(blockSlot)

// Compute the current spoch.
currentEpoch := slots.ToEpoch(currentSlot)

// Return early if the request is out of the MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS window.
if !params.WithinDAPeriod(blockEpoch, currentEpoch) {
return &safeCommitmentsArray{}, nil
}
kc, err := b.Block().Body().BlobKzgCommitments()

// Retrieve the KZG commitments for the block.
kzgCommitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return ar, err
return nil, errors.Wrap(err, "blob KZG commitments")
}
for i := range ar {
ar[i] = kc

// Return early if there are no commitments in the block.
if len(kzgCommitments) == 0 {
return &safeCommitmentsArray{}, nil
}
return ar, nil

// Retrieve the custody columns.
custodySubnetCount := peerdas.CustodySubnetCount()
custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}

// Create a safe commitments array for the custody columns.
commitmentsArray := &safeCommitmentsArray{}
for column := range custodyColumns {
commitmentsArray[column] = kzgCommitments
}

return commitmentsArray, nil
}
3 changes: 2 additions & 1 deletion beacon-chain/das/availability_columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package das
import (
"testing"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestFullCommitmentsToCheck(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := c.block(t)
co, err := fullCommitmentsToCheck(b, c.slot)
co, err := fullCommitmentsToCheck(enode.ID{}, b, c.slot)
if c.err != nil {
require.ErrorIs(t, err, c.err)
} else {
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/das/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"testing"

"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
Expand Down Expand Up @@ -124,18 +125,18 @@ func TestLazilyPersistent_Missing(t *testing.T) {

// Only one commitment persisted, should return error with other indices
require.NoError(t, as.Persist(1, scs[2]))
err := as.IsDataAvailable(ctx, 1, blk)
err := as.IsDataAvailable(ctx, enode.ID{}, 1, blk)
require.ErrorIs(t, err, errMissingSidecar)

// All but one persisted, return missing idx
require.NoError(t, as.Persist(1, scs[0]))
err = as.IsDataAvailable(ctx, 1, blk)
err = as.IsDataAvailable(ctx, enode.ID{}, 1, blk)
require.ErrorIs(t, err, errMissingSidecar)

// All persisted, return nil
require.NoError(t, as.Persist(1, scs...))

require.NoError(t, as.IsDataAvailable(ctx, 1, blk))
require.NoError(t, as.IsDataAvailable(ctx, enode.ID{}, 1, blk))
}

func TestLazilyPersistent_Mismatch(t *testing.T) {
Expand All @@ -150,7 +151,7 @@ func TestLazilyPersistent_Mismatch(t *testing.T) {

// Only one commitment persisted, should return error with other indices
require.NoError(t, as.Persist(1, scs[0]))
err := as.IsDataAvailable(ctx, 1, blk)
err := as.IsDataAvailable(ctx, enode.ID{}, 1, blk)
require.NotNil(t, err)
require.ErrorIs(t, err, errCommitmentMismatch)
}
Expand Down
53 changes: 38 additions & 15 deletions beacon-chain/das/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,33 +134,37 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
return scs, nil
}

func (e *cacheEntry) filterColumns(root [32]byte, kc *safeCommitmentsArray) ([]blocks.RODataColumn, error) {
if e.diskSummary.AllAvailable(kc.count()) {
func (e *cacheEntry) filterColumns(root [32]byte, commitmentsArray *safeCommitmentsArray) ([]blocks.RODataColumn, error) {
nonEmptyIndices := commitmentsArray.nonEmptyIndices()
if e.diskSummary.AllDataColumnsAvailable(nonEmptyIndices) {
return nil, nil
}
scs := make([]blocks.RODataColumn, 0, kc.count())

commitmentsCount := commitmentsArray.count()
sidecars := make([]blocks.RODataColumn, 0, commitmentsCount)

for i := uint64(0); i < fieldparams.NumberOfColumns; i++ {
// We already have this blob, we don't need to write it or validate it.
// Skip if we arleady store this data column.
if e.diskSummary.HasIndex(i) {
continue
}
if kc[i] == nil {
if e.colScs[i] != nil {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment)
}

if commitmentsArray[i] == nil {
continue
}

if e.colScs[i] == nil {
return nil, errors.Wrapf(errMissingSidecar, "root=%#x, index=%#x", root, i)
}
if !reflect.DeepEqual(kc[i], e.colScs[i].KzgCommitments) {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.colScs[i].KzgCommitments, kc[i])

if !reflect.DeepEqual(commitmentsArray[i], e.colScs[i].KzgCommitments) {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.colScs[i].KzgCommitments, commitmentsArray[i])
}
scs = append(scs, *e.colScs[i])

sidecars = append(sidecars, *e.colScs[i])
}

return scs, nil
return sidecars, nil
}

// safeCommitmentArray is a fixed size array of commitment byte slices. This is helpful for avoiding
Expand All @@ -176,13 +180,32 @@ func (s safeCommitmentArray) count() int {
return fieldparams.MaxBlobsPerBlock
}

// safeCommitmentsArray is a fixed size array of commitments.
// This is helpful for avoiding gratuitous bounds checks.
type safeCommitmentsArray [fieldparams.NumberOfColumns][][]byte

// count returns the number of commitments in the array.
func (s *safeCommitmentsArray) count() int {
count := 0

for i := range s {
if s[i] == nil {
return i
if s[i] != nil {
count++
}
}
return fieldparams.NumberOfColumns

return count
}

// nonEmptyIndices returns a map of indices that are non-nil in the array.
func (s *safeCommitmentsArray) nonEmptyIndices() map[uint64]bool {
columns := make(map[uint64]bool)

for i := range s {
if s[i] != nil {
columns[uint64(i)] = true
}
}

return columns
}
3 changes: 2 additions & 1 deletion beacon-chain/das/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package das
import (
"context"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
Expand All @@ -14,6 +15,6 @@ import (
// IsDataAvailable guarantees that all blobs committed to in the block have been
// durably persisted before returning a non-error value.
type AvailabilityStore interface {
IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
IsDataAvailable(ctx context.Context, nodeID enode.ID, current primitives.Slot, b blocks.ROBlock) error
Persist(current primitives.Slot, sc ...blocks.ROBlob) error
}
3 changes: 2 additions & 1 deletion beacon-chain/das/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package das
import (
"context"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
Expand All @@ -16,7 +17,7 @@ type MockAvailabilityStore struct {
var _ AvailabilityStore = &MockAvailabilityStore{}

// IsDataAvailable satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests.
func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, _ enode.ID, current primitives.Slot, b blocks.ROBlock) error {
if m.VerifyAvailabilityCallback != nil {
return m.VerifyAvailabilityCallback(ctx, current, b)
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/backfill/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
Expand Down
Loading

0 comments on commit 5038db1

Please sign in to comment.