Skip to content

Commit

Permalink
PeerDAS: Misc improvements (#14482)
Browse files Browse the repository at this point in the history
* `retrieveMissingDataColumnsFromPeers`: Improve logging.

* `dataColumnSidecarByRootRPCHandler`: Stop decreasing peer's score if asking for a column we do not custody.

* `dataColumnSidecarByRootRPCHandler`: If a data column is unavailable, stop waiting for it.

This behaviour was useful for peer sampling.
Now, just return the data column if we store it.
If we don't, skip.

* Dirty code comment.

* `retrieveMissingDataColumnsFromPeers`: Improve logs.

* `SendDataColumnsByRangeRequest`: Improve logs.

* `dataColumnSidecarsByRangeRPCHandler`: Improve logs.
  • Loading branch information
nalepae committed Oct 7, 2024
1 parent 983323e commit d41edbb
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 62 deletions.
60 changes: 43 additions & 17 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,11 +744,13 @@ func (f *blocksFetcher) filterPeersForDataColumns(
dataColumns map[uint64]bool,
peers []peer.ID,
) ([]peer.ID, error) {
// Filter peers based on the percentage of peers to be used in a request.
peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
// TODO: Uncomment when we are not in devnet any more.
// TODO: Find a way to have this uncommented without being in devnet.
// // Filter peers based on the percentage of peers to be used in a request.
// peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)

// Filter peers on bandwidth.
peers = f.hasSufficientBandwidth(peers, blocksCount)
// // Filter peers on bandwidth.
// peers = f.hasSufficientBandwidth(peers, blocksCount)

// Select peers which custody ALL wanted columns.
// Basically, it is very unlikely that a non-supernode peer will have custody of all columns.
Expand Down Expand Up @@ -892,6 +894,7 @@ func (f *blocksFetcher) requestDataColumnsFromPeers(
"capacity": f.rateLimiter.Remaining(peer.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(peer),
}).Debug("Requesting data columns")

// We're intentionally abusing the block rate limit here, treating data column requests as if they were block requests.
// Since column requests take more bandwidth than blocks, we should improve how we account for the different kinds
// of requests, more in proportion to the cost of serving them.
Expand All @@ -918,7 +921,6 @@ func (f *blocksFetcher) requestDataColumnsFromPeers(

// If the peer did not return any data columns, go to the next peer.
if len(roDataColumns) == 0 {
log.WithField("peer", peer).Warning("Peer did not return any data columns")
continue
}

Expand Down Expand Up @@ -1017,8 +1019,10 @@ func processRetrievedDataColumns(
// This function:
// - Mutates `bwb` by adding the retrieved data columns.
// - Mutates `missingColumnsFromRoot` by removing the columns that have been retrieved.
// This function returns when all the missing data columns have been retrieved.
func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(ctx context.Context,
// This function returns when all the missing data columns have been retrieved,
// or when the context is canceled.
func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(
ctx context.Context,
bwb []blocks.BlockWithROBlobs,
missingColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
indicesFromRoot map[[fieldparams.RootLength]byte][]int,
Expand Down Expand Up @@ -1048,13 +1052,19 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(ctx context.Context,
}

// Filter peers.
peers, err := f.filterPeersForDataColumns(ctx, blocksCount, missingDataColumns, peers)
filteredPeers, err := f.filterPeersForDataColumns(ctx, blocksCount, missingDataColumns, peers)
if err != nil {
return errors.Wrap(err, "filter peers for data columns")
}

if len(peers) == 0 {
log.Warning("No peers available to retrieve missing data columns, retrying in 5 seconds")
if len(filteredPeers) == 0 {
log.
WithFields(logrus.Fields{
"nonFilteredPeersCount": len(peers),
"filteredPeersCount": len(filteredPeers),
}).
Debug("No peers available to retrieve missing data columns, retrying in 5 seconds")

time.Sleep(5 * time.Second)
continue
}
Expand All @@ -1072,22 +1082,38 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(ctx context.Context,
// Get all the blocks and data columns we should retrieve.
blockFromRoot := blockFromRoot(bwb[firstIndex : lastIndex+1])

// Iterate request over all peers, and exit as soon as at least one data column is retrieved.
roDataColumns, peer, err := f.requestDataColumnsFromPeers(ctx, request, peers)
// Iterate requests over all peers, and exit as soon as at least one data column is retrieved.
roDataColumns, peer, err := f.requestDataColumnsFromPeers(ctx, request, filteredPeers)
if err != nil {
return errors.Wrap(err, "request data columns from peers")
}

if len(roDataColumns) == 0 {
log.Debug("No data columns returned from any peer, retrying in 5 seconds")
time.Sleep(5 * time.Second)
continue
}

// Process the retrieved data columns.
processRetrievedDataColumns(roDataColumns, blockFromRoot, indicesFromRoot, missingColumnsFromRoot, bwb, f.cv)

if len(missingColumnsFromRoot) > 0 {
for root, columns := range missingColumnsFromRoot {
numberOfColumns := params.BeaconConfig().NumberOfColumns

for root, missingColumns := range missingColumnsFromRoot {
missingColumnsCount := uint64(len(missingColumns))
var missingColumnsLog interface{} = "all"

if missingColumnsCount < numberOfColumns {
missingColumnsLog = sortedSliceFromMap(missingColumns)
}

slot := blockFromRoot[root].Block().Slot()
log.WithFields(logrus.Fields{
"peer": peer,
"root": fmt.Sprintf("%#x", root),
"slot": blockFromRoot[root].Block().Slot(),
"columns": columns,
"peer": peer,
"root": fmt.Sprintf("%#x", root),
"slot": slot,
"missingColumns": missingColumnsLog,
}).Debug("Peer did not correctly return data columns")
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_data_column_sidecars_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
defer ticker.Stop()
batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)
if err != nil {
log.WithError(err).Info("error in DataColumnSidecarsByRange batch")
log.WithError(err).Info("Error in DataColumnSidecarsByRange batch")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
return err
Expand Down
44 changes: 2 additions & 42 deletions beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
s.rateLimiter.add(stream, 1)
requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].ColumnIndex

// Decrease the peer's score if it requests a column that is not custodied.
isCustodied := custodyColumns[requestedIndex]
if !isCustodied {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream)
return types.ErrInvalidColumnIndex
}

// TODO: Differentiate between blobs and columns for our storage engine
// If the data column is nil, it means it is not yet available in the db.
// We wait for it to be available.

// Retrieve the data column from the database.
dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)

Expand All @@ -185,38 +174,9 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return errors.Wrap(err, "get column")
}

// If the data column is not found in the db, just skip it.
if err != nil && db.IsNotFound(err) {
fields := logrus.Fields{
"root": fmt.Sprintf("%#x", requestedRoot),
"index": requestedIndex,
}

log.WithFields(fields).Debug("Peer requested data column sidecar by root not found in db, waiting for it to be available")

loop:
for {
select {
case receivedRootIndex := <-rootIndexChan:
if receivedRootIndex.Root == requestedRoot && receivedRootIndex.Index == requestedIndex {
// This is the data column we are looking for.
log.WithFields(fields).Debug("Data column sidecar by root is now available in the db")

break loop
}

case <-ctx.Done():
closeStream(stream, log)
return errors.Errorf("context closed while waiting for data column with root %#x and index %d", requestedRoot, requestedIndex)
}
}

// Retrieve the data column from the db.
dataColumnSidecar, err = s.cfg.blobStorage.GetColumn(requestedRoot, requestedIndex)
if err != nil {
// This time, no error (even not found error) should be returned.
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.Wrap(err, "get column")
}
continue
}

// If any root in the request content references a block earlier than minimum_request_epoch,
Expand Down
18 changes: 16 additions & 2 deletions beacon-chain/sync/rpc_send_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"slices"
"sort"

"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -340,15 +341,28 @@ func SendDataColumnsByRangeRequest(
) ([]blocks.RODataColumn, error) {
topic, err := p2p.TopicFromMessage(p2p.DataColumnSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot()))
if err != nil {
return nil, err
return nil, errors.Wrap(err, "topic from message")
}

var columnsLog interface{} = "all"
numberOfColumns := params.BeaconConfig().NumberOfColumns
columnsCount := uint64(len(req.Columns))

if columnsCount < numberOfColumns {
columns := req.Columns
slices.Sort[[]uint64](columns)
columnsLog = columns
}

log.WithFields(logrus.Fields{
"peer": pid,
"topic": topic,
"startSlot": req.StartSlot,
"count": req.Count,
"columns": req.Columns,
"columns": columnsLog,
"totalCount": req.Count * uint64(len(req.Columns)),
}).Debug("Sending data column by range request")

stream, err := p2pApi.Send(ctx, req, topic, pid)
if err != nil {
return nil, err
Expand Down

0 comments on commit d41edbb

Please sign in to comment.