[Merged by Bors] - Fix activeset weight calc performance in the proposal handler #5923
This fixes multiple concurrent retrievals of the same activeset from the database that were causing high CPU and memory usage. Fixes #5765
Force-pushed from ec063da to 4cc8520.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@           Coverage Diff           @@
##           develop   #5923   +/-   ##
=======================================
  Coverage     80.7%   80.7%
=======================================
  Files          289     288     -1
  Lines        29868   29857    -11
=======================================
+ Hits         24107   24117    +10
+ Misses        4164    4152    -12
+ Partials      1597    1588     -9
proposals/handler_test.go
Outdated
	asCh <- asReq{id: sets[0].Hash()}
	th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
	err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
	require.NoError(t, err)

	th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
	err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
	require.NoError(t, err)
Instead of pre-generating lists of sets and proposals and indexing into them with p[0], p[1], and so on in the respective tests, how about creating only the required set and proposals in the test body?
Also, the EXPECT().GetActiveSet setup could be much simpler and more explicit for this test (expect it to be called an exact number of times). Currently it's convoluted, and it's hard to tell whether fetch.GetActiveSet() is expected to be called once or multiple times (the test doesn't check this either).
Consider something like:
-	asCh <- asReq{id: sets[0].Hash()}
-	th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
-	err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
-	require.NoError(t, err)
-	th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
-	err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
-	require.NoError(t, err)
+	set := types.ATXIDList{{1}, {2}}
+	var p []types.Proposal
+	for _, atx := range set {
+		p = append(p, gproposal(t, signer, atx, lid, &types.EpochData{
+			ActiveSetHash: set.Hash(),
+			Beacon:        types.Beacon{1},
+		}))
+	}
+	th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
+	// NOTE: the test expects it to be called only once (no `AnyTimes()` anymore)
+	th.mf.EXPECT().GetActiveSet(gomock.Any(), set.Hash()).DoAndReturn(
+		func(ctx context.Context, got types.Hash32) error {
+			require.NoError(t, activesets.Add(th.db, got, &types.EpochActiveSet{
+				Epoch: lid.GetEpoch(),
+				Set:   set,
+			}))
+			for _, id := range set {
+				th.atxsdata.AddAtx(lid.GetEpoch(), id, &atxsdata.ATX{Node: types.NodeID{1}})
+			}
+			return nil
+		})
+	err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
+	require.NoError(t, err)
+	th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
+	err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
+	require.NoError(t, err)
+	// maybe also something like:
+	require.True(t, th.mockController.Satisfied())
The point of the GetActiveSet mock handler is to be able to pause fetching, so that the processing of multiple requests for the same activeset can be checked. For the simpler one-request-after-another case a simpler implementation can be used, but the more complicated one still has to be kept so that it can be used for the concurrent cases. And the proposals are already pre-generated, just earlier, in the outer test.
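To illustrate the "pause fetching" idea outside of gomock, here is a minimal sketch of a hand-written fake that blocks every call until the test releases it. The names gatedFetcher, entered, release and countCalls are hypothetical and are not taken from the PR; the real test drives a gomock expectation through the asCh channel instead.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// gatedFetcher is a hypothetical fake, not the mock used in the PR.
// Every GetActiveSet call announces itself on entered and then blocks
// until release is closed or the context is canceled.
type gatedFetcher struct {
	entered chan struct{}
	release chan struct{}
	calls   atomic.Int32
}

func newGatedFetcher() *gatedFetcher {
	return &gatedFetcher{
		entered: make(chan struct{}),
		release: make(chan struct{}),
	}
}

func (f *gatedFetcher) GetActiveSet(ctx context.Context, id string) error {
	f.calls.Add(1)
	f.entered <- struct{}{} // tell the test that a call is in flight
	select {
	case <-f.release:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// countCalls starts n concurrent callers, waits until all of them are
// paused inside the fetcher, releases them, and reports how many calls
// reached the fetcher.
func countCalls(n int) int32 {
	f := newGatedFetcher()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = f.GetActiveSet(context.Background(), "set-1")
		}()
	}
	for i := 0; i < n; i++ {
		<-f.entered
	}
	close(f.release)
	wg.Wait()
	return f.calls.Load()
}

func main() {
	// Callers here hit the fetcher directly, so every call is counted.
	fmt.Println(countCalls(3)) // prints 3
}
```

With callers going straight to the fake, the count equals the number of callers. A test of a deduplicating handler would put the handler between the callers and the fake and assert that the count stays at 1 while the gate is closed.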
Without asCh <- asReq{id: sets[0].Hash()}, GetActiveSet doesn't go ahead, but I'll try to ensure it's called only once (although th.HandleSyncedProposal would block if it was called a 2nd time).
There are no concurrent calls to HandleSyncedProposal in this test case. I'm proposing not to try a generic approach that would fit all test cases, but to create simple and explicit expectations per test case. There is more code, but the tests are more readable and assert more, i.e. how many times an expectation is called. IMHO, .AnyTimes() should be avoided when possible.
I refactored the test by adding a fixture to make it more readable, hopefully, so that the intent of each test case is not lost among EXPECT clutter.
Also, the tests check that GetActiveSet is called just once in each case except refetch (failed fetch / cancelation and then another handler call).
> the intent of each test case is not lost among EXPECT clutter
The EXPECTs show the intent of the tests (what is expected to happen) ;)
Yes, I mean that repeated large EXPECT constructs sometimes obscure the intent b/c of "just too much text". That needs some balance :)
proposals/handler.go
Outdated
	} else {
		// mark calculation as running
		h.pendingWeightCalc[id] = nil
		h.weightCalcLock.Unlock()
	}
I think the else is unnecessary because the code under if always returns.
-	} else {
-		// mark calculation as running
-		h.pendingWeightCalc[id] = nil
-		h.weightCalcLock.Unlock()
-	}
+	}
+	// mark calculation as running
+	h.pendingWeightCalc[id] = nil
+	h.weightCalcLock.Unlock()
Yes, you're correct, sorry
Fixed
bors merge
## Motivation
Proposal handler may cause high CPU and memory usage.
Pull request successfully merged into develop. Build succeeded:
func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
	h.weightCalcLock.Lock()
	totalWeight, exists := h.activeSets.Get(id)
	if exists {
		h.weightCalcLock.Unlock()
		return totalWeight, nil
	}

	var ch chan uint64
	chs, exists := h.pendingWeightCalc[id]
	if exists {
		// The calculation is running or the activeset is being fetched,
		// subscribe.
		// Avoid any blocking on the channel by making it buffered, also so that
		// we don't have to wait on it in case the context is canceled
		ch = make(chan uint64, 1)
		h.pendingWeightCalc[id] = append(chs, ch)
		h.weightCalcLock.Unlock()

		// need to wait for the calculation which is already running to finish
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case totalWeight, ok := <-ch:
			if !ok {
				// Channel closed, fetch / calculation failed.
				// The actual error will be logged by the initiator of the
				// initial fetch / calculation, let's not make an
				// impression it happened multiple times and use a simpler
				// message
				return totalWeight, errors.New("error getting activeset weight")
			}
			return totalWeight, nil
		}
	}

	// mark calculation as running
	h.pendingWeightCalc[id] = nil
	h.weightCalcLock.Unlock()

	success := false
	defer func() {
		h.weightCalcLock.Lock()
		// this is guaranteed not to block b/c each channel is buffered
		for _, ch := range h.pendingWeightCalc[id] {
			if success {
				ch <- totalWeight
			}
			close(ch)
		}
		delete(h.pendingWeightCalc, id)
		h.weightCalcLock.Unlock()
	}()

	if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
		return 0, err
	}
	set, err := activesets.Get(h.db, id)
	if err != nil {
		return 0, err
	}
	if len(set.Set) == 0 {
		return 0, fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)
	}

	computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
	for i := range used {
		if !used[i] {
			return 0, fmt.Errorf(
				"missing atx %s in active set",
				set.Set[i].ShortString(),
			)
		}
	}
	totalWeight = computed
	h.activeSets.Add(id, totalWeight)
	success = true // totalWeight will be sent to the subscribers

	return totalWeight, nil
}
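The subscribe-to-a-running-calculation pattern used above can be tried in isolation. The following is a stripped-down sketch under stated assumptions: weightCache, its compute field and sharedWeights are hypothetical names, the keys are plain strings, and context cancellation is left out for brevity, so it is not a drop-in replacement for the handler code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// weightCache is a hypothetical, simplified stand-in for the handler state.
type weightCache struct {
	mu      sync.Mutex
	done    map[string]uint64        // finished results
	pending map[string][]chan uint64 // subscribers per in-flight key
	compute func(id string) (uint64, error)
}

func (c *weightCache) get(id string) (uint64, error) {
	c.mu.Lock()
	if w, ok := c.done[id]; ok {
		c.mu.Unlock()
		return w, nil
	}
	if subs, running := c.pending[id]; running {
		// Someone else is computing: subscribe with a buffered channel
		// so that the initiator never blocks when delivering the result.
		ch := make(chan uint64, 1)
		c.pending[id] = append(subs, ch)
		c.mu.Unlock()
		w, ok := <-ch
		if !ok {
			return 0, errors.New("error getting weight")
		}
		return w, nil
	}
	c.pending[id] = nil // the key exists now, which marks the work as running
	c.mu.Unlock()

	w, err := c.compute(id)

	c.mu.Lock()
	defer c.mu.Unlock()
	if err == nil {
		c.done[id] = w
	}
	for _, ch := range c.pending[id] {
		if err == nil {
			ch <- w
		}
		close(ch) // closing without a value signals failure
	}
	delete(c.pending, id)
	return w, err
}

// sharedWeights runs n concurrent lookups of the same key and returns the
// number of compute calls together with the values each caller received.
func sharedWeights(n int) (int32, []uint64) {
	var calls atomic.Int32
	c := &weightCache{
		done:    map[string]uint64{},
		pending: map[string][]chan uint64{},
		compute: func(string) (uint64, error) {
			calls.Add(1)
			return 42, nil
		},
	}
	results := make([]uint64, n)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i], _ = c.get("set-1")
		}(i)
	}
	wg.Wait()
	return calls.Load(), results
}

func main() {
	calls, results := sharedWeights(4)
	fmt.Println(calls, results) // prints: 1 [42 42 42 42]
}
```

Whatever the interleaving, a caller either finds the finished result, subscribes to the running calculation, or becomes the initiator, so compute runs once per key as long as it succeeds.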
I'm a bit late to the discussion but I feel like this could have been done simpler using the golang.org/x/sync/singleflight package:
func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
	result, err, _ := h.activeSetGroup.Do(id.String(), func() (any, error) {
		totalWeight, exists := h.activeSets.Get(id)
		if exists {
			return totalWeight, nil
		}
		if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
			return uint64(0), err
		}
		set, err := activesets.Get(h.db, id)
		if err != nil {
			return uint64(0), err
		}
		if len(set.Set) == 0 {
			return uint64(0), fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)
		}
		computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
		for i := range used {
			if !used[i] {
				return uint64(0), fmt.Errorf("missing atx %s in active set", set.Set[i])
			}
		}
		h.activeSets.Add(id, computed)
		return computed, nil
	})
	if err != nil {
		h.activeSetGroup.Forget(id.String())
	}
	return result.(uint64), err
}
No mutex or channels needed 🙂
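For readers who have not used singleflight, the idea behind it can be sketched with the standard library alone. This is a rough illustration, not the package's actual implementation: group, call, do and dedupDemo are hypothetical names, and the result type is fixed to uint64 to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// call tracks one in-flight invocation and its result.
type call struct {
	wg  sync.WaitGroup
	val uint64
	err error
}

// group deduplicates concurrent invocations that share a key.
type group struct {
	mu sync.Mutex
	m  map[string]*call
}

func (g *group) do(key string, fn func() (uint64, error)) (uint64, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		// Another caller is already running fn: wait and share its result.
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	// Nothing is cached here: the key is forgotten once fn returns.
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
	return c.val, c.err
}

// dedupDemo runs n concurrent lookups. As in the snippet above, the cache
// check lives inside the shared function, so late callers hit the cache.
func dedupDemo(n int) (int32, []uint64) {
	var (
		g        group
		cache    sync.Map
		computed atomic.Int32
	)
	work := func() (uint64, error) {
		if v, ok := cache.Load("set-1"); ok {
			return v.(uint64), nil
		}
		computed.Add(1) // stands in for fetch + weight calculation
		cache.Store("set-1", uint64(7))
		return 7, nil
	}
	results := make([]uint64, n)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i], _ = g.do("set-1", work)
		}(i)
	}
	wg.Wait()
	return computed.Load(), results
}

func main() {
	calls, results := dedupDemo(4)
	fmt.Println(calls, results) // prints: 1 [7 7 7 7]
}
```

One design point to keep in mind with this shape: the shared function runs with whatever the first caller captured, including its context, so a cancellation on the first caller's side is seen by everyone waiting on the same key.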
Proposal handler may cause high CPU and memory usage. Backport of #5923.
…ler (#5923) (#5931)
* Fix activeset weight calc performance in the proposal handler
  Proposal handler may cause high CPU and memory usage. Backport of #5923.
* Fix typo
---------
Co-authored-by: Matthias <[email protected]>
Motivation
Proposal handler may cause high CPU and memory usage.
Description
As per the suggestion in #5765, the first time an active set is encountered when processing a ballot, it starts to be processed (fetch / store / calculate weight), and the concurrent attempts to process the same active set subscribe to the results.
Fixes #5765
Test Plan
Verified on a mainnet node.
TODO