Skip to content

Commit

Permalink
PeerDAS: Fix initial sync with super nodes (#14495)
Browse files Browse the repository at this point in the history
* Improve logging.

* `retrieveMissingDataColumnsFromPeers`: Limit to `512` items per request.

* `retrieveMissingDataColumnsFromPeers`: Allow `nil` peers.

Before this commit:
If, when this funcion is called, we are not yet connected to enough peers, then `peers` will be possibly not be satisfaying,
and, if new peers are connected, we will never see them.

After this commit:
If `peers` is `nil`, then we regularly check for all connected peers.
If `peers` is not `nil`, then we use them.
  • Loading branch information
nalepae committed Nov 22, 2024
1 parent 0cfa081 commit 450af13
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 60 deletions.
117 changes: 63 additions & 54 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
}

if coreTime.PeerDASIsActive(start) {
connectedPeers := f.p2p.Peers().Connected()
response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, connectedPeers)
response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil)
return response
}

Expand Down Expand Up @@ -850,7 +849,23 @@ func (f *blocksFetcher) requestDataColumnsFromPeers(
peers[i], peers[j] = peers[j], peers[i]
})

var columnsLog interface{} = "all"
columnsCount := uint64(len(request.Columns))
numberOfColumns := params.BeaconConfig().NumberOfColumns
if columnsCount < numberOfColumns {
columnsLog = request.Columns
}

log := log.WithFields(logrus.Fields{
"start": request.StartSlot,
"count": request.Count,
"columns": columnsLog,
"items": request.Count * columnsCount,
})

for _, peer := range peers {
log := log.WithField("peer", peer)

if ctx.Err() != nil {
return nil, "", ctx.Err()
}
Expand All @@ -861,12 +876,9 @@ func (f *blocksFetcher) requestDataColumnsFromPeers(
defer l.Unlock()

log.WithFields(logrus.Fields{
"peer": peer,
"start": request.StartSlot,
"count": request.Count,
"capacity": f.rateLimiter.Remaining(peer.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(peer),
}).Debug("Requesting data columns")
}).Debug("Data columns by range - requesting")

// We're intentionally abusing the block rate limit here, treating data column requests as if they were block requests.
// Since column requests take more bandwidth than blocks, we should improve how we account for the different kinds
Expand All @@ -883,32 +895,28 @@ func (f *blocksFetcher) requestDataColumnsFromPeers(
}()

if err != nil {
log.WithError(err).WithField("peer", peer).Warning("Could not wait for bandwidth")
log.WithError(err).Warning("Data columns by range - could not wait for bandwidth")
continue
}

roDataColumns, err := prysmsync.SendDataColumnsByRangeRequest(ctx, f.clock, f.p2p, peer, f.ctxMap, request)
if err != nil {
log.WithField("peer", peer).WithError(err).Warning("Could not send data columns by range request")
log.WithError(err).Warning("Data columns by range - could not send data columns by range request")
continue
}

// If the peer did not return any data columns, go to the next peer.
if len(roDataColumns) == 0 {
log.WithFields(logrus.Fields{
"peer": peer,
"start": request.StartSlot,
"count": request.Count,
}).Debug("Peer did not returned any data columns")
log.Debug("Data columns by range - peer did not returned any data columns")

continue
}

// We have received at least one data columns from the peer.
// We have received at least one data columns from the peer. This is the happy path.
return roDataColumns, peer, nil
}

// No peer returned any data columns.
// No peer returned any data columns. This this the unhappy path.
return nil, "", nil
}

Expand Down Expand Up @@ -1008,15 +1016,13 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
indicesFromRoot map[[fieldparams.RootLength]byte][]int,
peers []peer.ID,
) error {
const delay = 5 * time.Second

columnsCount := 0
for _, columns := range missingColumnsFromRoot {
columnsCount += len(columns)
}
const (
delay = 5 * time.Second
batchSize = 512
)

start := time.Now()
log.WithField("columnsCount", columnsCount).Debug("Retrieving missing data columns from peers - start")
log.Debug("Retrieving missing data columns from peers - start")

for len(missingColumnsFromRoot) > 0 {
if ctx.Err() != nil {
Expand All @@ -1041,18 +1047,41 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
}
}

// Get a sorted slice of missing data columns.
missingDataColumnsSlice := sortedSliceFromMap(missingDataColumns)
missingDataColumnsCount := uint64(len(missingDataColumnsSlice))

numberOfColumns := params.BeaconConfig().NumberOfColumns
var requestedColumnsLog interface{} = "all"

if missingDataColumnsCount < numberOfColumns {
requestedColumnsLog = missingDataColumnsSlice
}

// Reduce blocks count until the total number of elements is less than the batch size.
for missingDataColumnsCount*blocksCount > batchSize {
blocksCount /= 2
}

// If no peer is specified, get all connected peers.
peersToFilter := peers
if peersToFilter == nil {
peersToFilter = f.p2p.Peers().Connected()
}

// Filter peers.
filteredPeers, err := f.peersWithSlotAndDataColumns(peers, lastSlot, missingDataColumns)
filteredPeers, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns)
if err != nil {
return errors.Wrap(err, "peers with slot and data columns")
}

if len(filteredPeers) == 0 {
log.
WithFields(logrus.Fields{
"peers": filteredPeers,
"delay": delay,
"targetSlot": lastSlot,
"peers": peersToFilter,
"filteredPeers": filteredPeers,
"delay": delay,
"targetSlot": lastSlot,
}).
Warning("No peers available to retrieve missing data columns, retrying later")

Expand All @@ -1067,26 +1096,27 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
request := &p2ppb.DataColumnSidecarsByRangeRequest{
StartSlot: startSlot,
Count: blocksCount,
Columns: sortedSliceFromMap(missingDataColumns),
Columns: missingDataColumnsSlice,
}

// Get all the blocks and data columns we should retrieve.
blockFromRoot := blockFromRoot(bwb[firstIndex : lastIndex+1])

// Iterate requests over all peers, and exits as soon as at least one data column is retrieved.
roDataColumns, peer, err := f.requestDataColumnsFromPeers(ctx, request, filteredPeers)
roDataColumns, _, err := f.requestDataColumnsFromPeers(ctx, request, filteredPeers)
if err != nil {
return errors.Wrap(err, "request data columns from peers")
}

if len(roDataColumns) == 0 {
log.
WithFields(logrus.Fields{
"peers": filteredPeers,
"delay": delay,
"startSlot": startSlot,
"count": blocksCount,
"columns": sortedSliceFromMap(missingDataColumns),
"peers": peers,
"filteredPeers": filteredPeers,
"delay": delay,
"start": startSlot,
"count": blocksCount,
"columns": requestedColumnsLog,
}).
Warning("No data columns returned from any peer, retrying later")

Expand All @@ -1096,27 +1126,6 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(

// Process the retrieved data columns.
processRetrievedDataColumns(roDataColumns, blockFromRoot, indicesFromRoot, missingColumnsFromRoot, bwb, f.cv)

if len(missingColumnsFromRoot) > 0 {
numberOfColumns := params.BeaconConfig().NumberOfColumns

for root, missingColumns := range missingColumnsFromRoot {
missingColumnsCount := uint64(len(missingColumns))
var missingColumnsLog interface{} = "all"

if missingColumnsCount < numberOfColumns {
missingColumnsLog = sortedSliceFromMap(missingColumns)
}

slot := blockFromRoot[root].Block().Slot()
log.WithFields(logrus.Fields{
"peer": peer,
"root": fmt.Sprintf("%#x", root),
"slot": slot,
"missingColumns": missingColumnsLog,
}).Debug("Peer did not returned all requested data columns")
}
}
}

log.WithField("duration", time.Since(start)).Debug("Retrieving missing data columns from peers - success")
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/sync/rpc_send_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,14 @@ func SendDataColumnsByRangeRequest(
columnsLog = columns
}

log.WithFields(logrus.Fields{
log := log.WithFields(logrus.Fields{
"peer": pid,
"topic": topic,
"startSlot": req.StartSlot,
"count": req.Count,
"columns": columnsLog,
"totalCount": req.Count * uint64(len(req.Columns)),
}).Debug("Sending data column by range request")
})

stream, err := p2pApi.Send(ctx, req, topic, pid)
if err != nil {
Expand Down Expand Up @@ -391,19 +391,19 @@ func SendDataColumnsByRangeRequest(
}

if err != nil {
log.WithError(err).WithField("peer", pid).Debug("Error reading chunked data column sidecar")
log.WithError(err).Debug("Error reading chunked data column sidecar")
break
}

if roDataColumn == nil {
log.WithError(err).WithField("peer", pid).Debug("Validation error")
log.WithError(err).Debug("Validation error")
continue
}

if i >= max {
// The response MUST contain no more than `reqCount` blocks.
// (`reqCount` is already capped by `maxRequestDataColumnSideCar`.)
log.WithError(err).WithField("peer", pid).Debug("Response contains more data column sidecars than maximum")
log.WithError(err).Debug("Response contains more data column sidecars than maximum")
break
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *Service) reValidatePeer(ctx context.Context, id peer.ID) error {
}
// Do not return an error for ping requests.
if err := s.sendPingRequest(ctx, id); err != nil && !isUnwantedError(err) {
log.WithError(err).Debug("Could not ping peer")
log.WithError(err).WithField("pid", id).Debug("Could not ping peer")
}
return nil
}
Expand Down

0 comments on commit 450af13

Please sign in to comment.