Commit 6560787

Fix Nishant's comment.

nalepae committed Oct 4, 2024
1 parent f42fb87 commit 6560787
Showing 3 changed files with 94 additions and 24 deletions.
28 changes: 19 additions & 9 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -705,11 +705,11 @@ func (f *blocksFetcher) blocksWithMissingDataColumnsBoundaries(
 }
 
 // custodyAllNeededColumns filter `inputPeers` that custody all columns in `columns`.
-func (f *blocksFetcher) custodyAllNeededColumns(inputPeers []peer.ID, columns map[uint64]bool) ([]peer.ID, error) {
-    outputPeers := make([]peer.ID, 0, len(inputPeers))
+func (f *blocksFetcher) custodyAllNeededColumns(inputPeers map[peer.ID]bool, columns map[uint64]bool) (map[peer.ID]bool, error) {
+    outputPeers := make(map[peer.ID]bool, len(inputPeers))
 
 loop:
-    for _, peer := range inputPeers {
+    for peer := range inputPeers {
         // Get the node ID from the peer ID.
         nodeID, err := p2p.ConvertPeerIDToNodeID(peer)
         if err != nil {
@@ -731,7 +731,7 @@ loop:
             }
         }
 
-        outputPeers = append(outputPeers, peer)
+        outputPeers[peer] = true
     }
 
     return outputPeers, nil
@@ -842,11 +842,16 @@ func maxInt(slice []int) int {
 func (f *blocksFetcher) requestDataColumnsFromPeers(
     ctx context.Context,
     request *p2ppb.DataColumnSidecarsByRangeRequest,
-    peers []peer.ID,
+    peers map[peer.ID]bool,
 ) ([]blocks.RODataColumn, peer.ID, error) {
+    peersSlice := make([]peer.ID, 0, len(peers))
+    for peer := range peers {
+        peersSlice = append(peersSlice, peer)
+    }
+
     // Shuffle peers to avoid always querying the same peers
-    f.rand.Shuffle(len(peers), func(i, j int) {
-        peers[i], peers[j] = peers[j], peers[i]
+    f.rand.Shuffle(len(peersSlice), func(i, j int) {
+        peersSlice[i], peersSlice[j] = peersSlice[j], peersSlice[i]
     })
 
     var columnsLog interface{} = "all"
@@ -863,7 +868,7 @@ func (f *blocksFetcher) requestDataColumnsFromPeers(
         "items": request.Count * columnsCount,
     })
 
-    for _, peer := range peers {
+    for _, peer := range peersSlice {
         log := log.WithField("peer", peer)
 
         if ctx.Err() != nil {
@@ -1071,7 +1076,7 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
        }
 
        // Filter peers.
-       filteredPeers, err := f.peersWithDataColumns(peersToFilter, lastSlot, missingDataColumns)
+       filteredPeers, descriptions, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns)
        if err != nil {
            return errors.Wrap(err, "peers with slot and data columns")
        }
@@ -1086,6 +1091,11 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
            }).
                Warning("No peers available to retrieve missing data columns, retrying later")
 
+           // If no peers are available, log the descriptions to help debugging.
+           for _, description := range descriptions {
+               log.Debug(description)
+           }
+
            time.Sleep(delay)
            continue
        }
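A note on the pattern above: a Go map cannot be shuffled in place, so requestDataColumnsFromPeers now copies the peer set into a slice and shuffles the copy. Below is a minimal, self-contained sketch of that map-to-slice-then-shuffle pattern; the names and the string keys standing in for peer.ID are illustrative only, not code from this repository.

package main

import (
    "fmt"
    "math/rand"
)

// shuffledKeys copies the keys of a set-like map into a slice and shuffles
// the copy, mirroring the peersSlice construction in the diff above.
func shuffledKeys(peers map[string]bool) []string {
    keys := make([]string, 0, len(peers))
    for p := range peers {
        keys = append(keys, p)
    }
    rand.Shuffle(len(keys), func(i, j int) {
        keys[i], keys[j] = keys[j], keys[i]
    })
    return keys
}

func main() {
    peers := map[string]bool{"peerA": true, "peerB": true, "peerC": true}
    // Query order now varies between runs instead of always hitting the same peer first.
    fmt.Println(shuffledKeys(peers))
}

Keeping the peer set as a map makes membership checks and deduplication cheap, while the temporary slice restores the ability to randomize the query order.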
29 changes: 20 additions & 9 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -1370,25 +1370,36 @@ func TestCustodyAllNeededColumns(t *testing.T) {
        4 * params.BeaconConfig().CustodyRequirement,
        32 * params.BeaconConfig().CustodyRequirement,
        4 * params.BeaconConfig().CustodyRequirement,
-       32 * params.BeaconConfig().CustodyRequirement}
+       32 * params.BeaconConfig().CustodyRequirement,
+   }
 
-   peersID := make([]peer.ID, 0, len(custodyCounts))
+   expected := make(map[peer.ID]bool)
+
+   peersID := make(map[peer.ID]bool, len(custodyCounts))
    for _, custodyCount := range custodyCounts {
        peerRecord, peerID := createPeer(t, len(peersID), custodyCount)
-       peersID = append(peersID, peerID)
+       peersID[peerID] = true
        p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound)
+       if custodyCount == 32*params.BeaconConfig().CustodyRequirement {
+           expected[peerID] = true
+       }
    }
 
-   expected := []peer.ID{peersID[1], peersID[3]}
-
-   blocksFetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{
-       p2p: p2p,
-   })
+   blocksFetcher := newBlocksFetcher(
+       context.Background(),
+       &blocksFetcherConfig{
+           p2p: p2p,
+       },
+   )
 
    actual, err := blocksFetcher.custodyAllNeededColumns(peersID, dataColumns)
    require.NoError(t, err)
 
-   require.DeepSSZEqual(t, expected, actual)
+   require.Equal(t, len(expected), len(actual))
+   for peerID := range expected {
+       _, ok := actual[peerID]
+       require.Equal(t, true, ok)
+   }
 }
 
 func TestCustodyColumns(t *testing.T) {
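The updated assertions compare the expected and actual peer sets by size and membership instead of require.DeepSSZEqual on slices, since map iteration order is not deterministic. A minimal sketch of that set-equality check, with a hypothetical helper and plain string keys rather than the repository's test utilities:

package main

import "fmt"

// sameSet reports whether two set-like maps contain exactly the same keys:
// equal size plus every expected key present in actual.
func sameSet(expected, actual map[string]bool) bool {
    if len(expected) != len(actual) {
        return false
    }
    for k := range expected {
        if !actual[k] {
            return false
        }
    }
    return true
}

func main() {
    expected := map[string]bool{"peer1": true, "peer3": true}
    actual := map[string]bool{"peer3": true, "peer1": true}
    fmt.Println(sameSet(expected, actual)) // true, regardless of insertion order
}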
61 changes: 55 additions & 6 deletions beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
@@ -369,12 +369,14 @@ func (f *blocksFetcher) calculateHeadAndTargetEpochs() (headEpoch, targetEpoch p
    return headEpoch, targetEpoch, peers
 }
 
-// peersWithDataColumns returns a list of peers that should custody all needed data columns.
-func (f *blocksFetcher) peersWithDataColumns(
+// peersWithSlotAndDataColumns returns a list of peers that should custody all needed data columns for the given slot.
+func (f *blocksFetcher) peersWithSlotAndDataColumns(
    peers []peer.ID,
    targetSlot primitives.Slot,
    dataColumns map[uint64]bool,
-) ([]peer.ID, error) {
+) (map[peer.ID]bool, []string, error) {
+   peersCount := len(peers)
+
    // TODO: Uncomment when we are not in devnet any more.
    // TODO: Find a way to have this uncommented without being in devnet.
    // // Filter peers based on the percentage of peers to be used in a request.
@@ -388,11 +390,58 @@ func (f *blocksFetcher) peersWithDataColumns(
    // TODO: Modify to retrieve data columns from all possible peers.
    // TODO: If a peer does respond some of the request columns, do not re-request responded columns.
 
+   // Compute the target epoch from the target slot.
+   targetEpoch := slots.ToEpoch(targetSlot)
+
+   peersWithAdmissibleHeadEpoch := make(map[peer.ID]bool, peersCount)
+   descriptions := make([]string, 0, peersCount)
+
+   // Filter out peers with head epoch lower than our target epoch.
+   // Technically, we should be able to use the head slot from the peer.
+   // However, our vision of the head slot of the peer is updated twice per epoch
+   // via P2P messages. So it is likely that we think the peer is lagging behind
+   // while it is actually not.
+   // ==> We use the head epoch as a proxy instead.
+   // However, if the peer is actually lagging for a few slots,
+   // we may requests some data columns it doesn't have yet.
+   for _, peer := range peers {
+       peerChainState, err := f.p2p.Peers().ChainState(peer)
+
+       if err != nil {
+           description := fmt.Sprintf("peer %s: error: %s", peer, err)
+           descriptions = append(descriptions, description)
+           continue
+       }
+
+       if peerChainState == nil {
+           description := fmt.Sprintf("peer %s: chain state is nil", peer)
+           descriptions = append(descriptions, description)
+           continue
+       }
+
+       peerHeadEpoch := slots.ToEpoch(peerChainState.HeadSlot)
+
+       if peerHeadEpoch < targetEpoch {
+           description := fmt.Sprintf("peer %s: head epoch %d < target epoch %d", peer, peerHeadEpoch, targetEpoch)
+           descriptions = append(descriptions, description)
+           continue
+       }
+
+       peersWithAdmissibleHeadEpoch[peer] = true
+   }
+
    // Filter out peers that do not have all the data columns needed.
-   finalPeers, err := f.custodyAllNeededColumns(peers, dataColumns)
+   finalPeers, err := f.custodyAllNeededColumns(peersWithAdmissibleHeadEpoch, dataColumns)
    if err != nil {
-       return nil, errors.Wrap(err, "custody all needed columns")
+       return nil, nil, errors.Wrap(err, "custody all needed columns")
    }
 
+   for peer := range peersWithAdmissibleHeadEpoch {
+       if _, ok := finalPeers[peer]; !ok {
+           description := fmt.Sprintf("peer %s: does not custody all needed columns", peer)
+           descriptions = append(descriptions, description)
+       }
+   }
+
-   return finalPeers, nil
+   return finalPeers, descriptions, nil
 }
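The new filter keeps a peer only when the epoch of its advertised head slot is at least the target epoch, and records a human-readable reason for every rejected peer. A minimal sketch of that decision follows; the helper names are hypothetical and 32 slots per epoch is assumed (the mainnet value), whereas the real code uses slots.ToEpoch and the chain state returned by f.p2p.Peers().ChainState.

package main

import "fmt"

// slotsPerEpoch is assumed to be the mainnet value; the real code derives it from the beacon config.
const slotsPerEpoch = 32

func epochOf(slot uint64) uint64 { return slot / slotsPerEpoch }

// admissible mirrors the head-epoch check above: reject a peer whose head epoch is
// behind the target epoch, returning a description usable for debug logging.
func admissible(peerHeadSlot, targetSlot uint64) (bool, string) {
    peerHeadEpoch, targetEpoch := epochOf(peerHeadSlot), epochOf(targetSlot)
    if peerHeadEpoch < targetEpoch {
        return false, fmt.Sprintf("head epoch %d < target epoch %d", peerHeadEpoch, targetEpoch)
    }
    return true, ""
}

func main() {
    ok, why := admissible(95, 100) // peer head in epoch 2, target slot in epoch 3
    fmt.Println(ok, why)
}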
