-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Merged by Bors] - Simplify code in fetch
and syncer
pkgs
#5453
Conversation
59dfb09
to
1eb03ff
Compare
1eb03ff
to
6ab4d1a
Compare
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## develop #5453 +/- ##
========================================
Coverage 79.5% 79.6%
========================================
Files 268 268
Lines 26591 26445 -146
========================================
- Hits 21145 21054 -91
+ Misses 3917 3875 -42
+ Partials 1529 1516 -13 ☔ View full report in Codecov by Sentry. |
syncer/interface.go
Outdated
GetMaliciousIDs(context.Context, []p2p.Peer) <-chan fetch.Result | ||
GetLayerData(context.Context, []p2p.Peer, types.LayerID) (<-chan fetch.Result, error) | ||
GetLayerOpinions(context.Context, []p2p.Peer, types.LayerID) (<-chan fetch.Result, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be easier to have these methods only take 1 peer as parameter and return the data
of the response directly instead of a fetch.Result
. This completely gets rid of poll
and makes the caller side also simpler. As an example PollLayerOpinions
becomes:
func (d *DataFetch) PollLayerOpinions(
ctx context.Context,
lid types.LayerID,
needCert bool,
peers []p2p.Peer,
) ([]*fetch.LayerOpinion, []*types.Certificate, error) {
logger := d.logger.WithContext(ctx).WithFields(lid)
opinions := make([]*fetch.LayerOpinion, 0, len(peers))
success := false
var combinedErr error
for _, peer := range peers {
resp, err := d.fetcher.GetLayerOpinion(ctx, peer, lid)
if err != nil {
opnsPeerError.Inc()
logger.With().Debug("received peer error for layer opinions", log.Err(err))
if !success {
combinedErr = errors.Join(combinedErr, err)
}
continue
}
logger.Debug("received layer opinions from peer")
var lo fetch.LayerOpinion
if err := codec.Decode(resp, &lo); err != nil {
logger.With().Debug("error decoding LayerOpinion", log.Err(err))
if !success {
combinedErr = errors.Join(combinedErr, err)
}
continue
}
lo.SetPeer(peer)
opinions = append(opinions, &lo)
success = true
combinedErr = nil
}
if !needCert {
return opinions, nil, combinedErr
}
certs := make([]*types.Certificate, 0, len(opinions))
peerCerts := map[types.BlockID][]p2p.Peer{}
for _, opinion := range opinions {
if opinion.Certified == nil {
continue
}
if _, ok := peerCerts[*opinion.Certified]; !ok {
peerCerts[*opinion.Certified] = []p2p.Peer{}
}
peerCerts[*opinion.Certified] = append(peerCerts[*opinion.Certified], opinion.Peer())
// note that we want to fetch block certificate for types.EmptyBlockID as well,
// but we don't need to register hash for the actual block fetching
if *opinion.Certified != types.EmptyBlockID {
d.fetcher.RegisterPeerHashes(
opinion.Peer(),
[]types.Hash32{opinion.Certified.AsHash32()},
)
}
}
for bid, bidPeers := range peerCerts {
cert, err := d.fetcher.GetCert(ctx, lid, bid, bidPeers)
if err != nil {
certPeerError.Inc()
continue
}
certs = append(certs, cert)
}
return opinions, certs, combinedErr
}
It removes the need to process data from the channel. And now since all the control flow is in PollLayerOpinions
we could re-add an errgroup
to process all concurrently that we want to process concurrently. opinions
and peerCerts
could for instance become channels that are being read from while layer opinions are still being fetched from peers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code from your comment is oversimplified as it makes the queries serially. In reality, you would need to add an errgroup and collect results on a channel. But it still could be simpler then the current approach, let me test it out 👍 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what I meant by
we could re-add an
errgroup
to process all concurrently that we want to process concurrently.opinions
andpeerCerts
could for instance become channels that are being read from while layer opinions are still being fetched from peers.
🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fasmat Updated as suggested.
my hope was that someone would rewrite it to use streaming both for reads and writes, rather then doing style improvements. it can be done even in backward compatible manner. for some reason it is getting avoided though. |
@dshulyak I plan to switch to full streaming with no intermediate buffers as a part of syncv2 work (starting with chunked streaming, but I intend to skip intermediate buffering altogether in the final version). But I'm not sure it's worth trying to switch current fetcher to "full streaming" before that's done, as many parts of the existing fetcher will no longer be necessary, and it's possible to do a simple interim solution to avoid deadline errors. Also, the |
@dshulyak I spotted a few places where code was unnecessarily convoluted or inefficient in the process of trying to understand how it works and decided to make it simpler. These are not merely "style improvements" - the code should be more efficient now. |
fetch::requester
interfacefetch
and syncer
pkgs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
bors merge |
## Motivation The `requester::Request(...)` interface from the `fetch` package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface. ## Changes - make `fetch::requester::Request(...)` blocking and remove the callbacks. It returns the result directly now, - make `GetMaliciousIDs`, `GetLayerData` and `GetLayerOpinions` from `syncer::fetcher` interface blocking, - remove `fetch::poll(...)` - refactor the code and tests to use the new approach ## Test Plan existing tests pass
This PR was included in a batch that was canceled, it will be automatically retried |
## Motivation The `requester::Request(...)` interface from the `fetch` package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface. ## Changes - make `fetch::requester::Request(...)` blocking and remove the callbacks. It returns the result directly now, - make `GetMaliciousIDs`, `GetLayerData` and `GetLayerOpinions` from `syncer::fetcher` interface blocking, - remove `fetch::poll(...)` - refactor the code and tests to use the new approach ## Test Plan existing tests pass
Build failed: |
bors merge |
## Motivation The `requester::Request(...)` interface from the `fetch` package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface. ## Changes - make `fetch::requester::Request(...)` blocking and remove the callbacks. It returns the result directly now, - make `GetMaliciousIDs`, `GetLayerData` and `GetLayerOpinions` from `syncer::fetcher` interface blocking, - remove `fetch::poll(...)` - refactor the code and tests to use the new approach ## Test Plan existing tests pass
Build failed (retrying...):
|
## Motivation The `requester::Request(...)` interface from the `fetch` package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface. ## Changes - make `fetch::requester::Request(...)` blocking and remove the callbacks. It returns the result directly now, - make `GetMaliciousIDs`, `GetLayerData` and `GetLayerOpinions` from `syncer::fetcher` interface blocking, - remove `fetch::poll(...)` - refactor the code and tests to use the new approach ## Test Plan existing tests pass
Pull request successfully merged into develop. Build succeeded: |
fetch
and syncer
pkgsfetch
and syncer
pkgs
## Motivation The `requester::Request(...)` interface from the `fetch` package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface. ## Changes - make `fetch::requester::Request(...)` blocking and remove the callbacks. It returns the result directly now, - make `GetMaliciousIDs`, `GetLayerData` and `GetLayerOpinions` from `syncer::fetcher` interface blocking, - remove `fetch::poll(...)` - refactor the code and tests to use the new approach ## Test Plan existing tests pass
## Motivation The `requester::Request(...)` interface from the `fetch` package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface. ## Changes - make `fetch::requester::Request(...)` blocking and remove the callbacks. It returns the result directly now, - make `GetMaliciousIDs`, `GetLayerData` and `GetLayerOpinions` from `syncer::fetcher` interface blocking, - remove `fetch::poll(...)` - refactor the code and tests to use the new approach ## Test Plan existing tests pass
## Motivation The `requester::Request(...)` interface from the `fetch` package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface. ## Changes - make `fetch::requester::Request(...)` blocking and remove the callbacks. It returns the result directly now, - make `GetMaliciousIDs`, `GetLayerData` and `GetLayerOpinions` from `syncer::fetcher` interface blocking, - remove `fetch::poll(...)` - refactor the code and tests to use the new approach ## Test Plan existing tests pass
Motivation
The
requester::Request(...)
interface from thefetch
package seems unnecessarily complicated with its callback methods. It can be made blocking instead, simplifying the code using this interface.Changes
fetch::requester::Request(...)
blocking and remove the callbacks. It returns the result directly now,GetMaliciousIDs
,GetLayerData
andGetLayerOpinions
fromsyncer::fetcher
interface blocking,fetch::poll(...)
Test Plan
existing tests pass