Skip to content

Commit

Permalink
Caplin: Improved synced data to be thread-safe and use only one beaco…
Browse files Browse the repository at this point in the history
…n state as buffer (#12519)

Reduces memory usage by 600MB too...

Reduces races in services and fixes a weird bug in bls execution service
  • Loading branch information
Giulio2002 authored Oct 31, 2024
1 parent b8b040d commit d4af6de
Show file tree
Hide file tree
Showing 34 changed files with 315 additions and 317 deletions.
23 changes: 12 additions & 11 deletions cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
return nil, err
}
defer tx.Rollback()
headState := a.syncedData.HeadState()
if headState == nil {
return nil, beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
errors.New("beacon node is still syncing"),
)
}

committeeIndex, err := beaconhttp.Uint64FromQueryParams(r, "committee_index")
if err != nil {
Expand All @@ -160,6 +153,16 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
committeeIndex = &zero
}

headState, cn := a.syncedData.HeadState()
defer cn()

if headState == nil {
return nil, beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
errors.New("beacon node is still syncing"),
)
}

attestationData, err := a.attestationProducer.ProduceAndCacheAttestationData(
tx,
headState,
Expand Down Expand Up @@ -229,15 +232,13 @@ func (a *ApiHandler) GetEthV3ValidatorBlock(
}
}

s := a.syncedData.HeadState()
if s == nil {
baseBlockRoot := a.syncedData.HeadRoot()
if baseBlockRoot == (libcommon.Hash{}) {
return nil, beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
errors.New("node is syncing"),
)
}

baseBlockRoot := a.syncedData.HeadRoot()
sourceBlock, err := a.blockReader.ReadBlockByRoot(ctx, tx, baseBlockRoot)
if err != nil {
log.Warn("Failed to get source block", "err", err, "root", baseBlockRoot)
Expand Down
10 changes: 9 additions & 1 deletion cl/beacon/handler/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,15 @@ func (a *ApiHandler) GetEth1V1BuilderStatesExpectedWithdrawals(w http.ResponseWr
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("beacon node is syncing"))
}
if root == headRoot {
return newBeaconResponse(state.ExpectedWithdrawals(a.syncedData.HeadState(), state.Epoch(a.syncedData.HeadState()))).WithFinalized(false), nil
headState, cn := a.syncedData.HeadState()
defer cn()
if headState == nil {
return nil, beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
errors.New("node is syncing"),
)
}
return newBeaconResponse(state.ExpectedWithdrawals(headState, state.Epoch(headState))).WithFinalized(false), nil
}
lookAhead := 1024
for currSlot := *slot + 1; currSlot < *slot+uint64(lookAhead); currSlot++ {
Expand Down
4 changes: 3 additions & 1 deletion cl/beacon/handler/committees.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea
}
resp := make([]*committeeResponse, 0, a.beaconChainCfg.SlotsPerEpoch*a.beaconChainCfg.MaxCommitteesPerSlot)
isFinalized := slot <= a.forkchoiceStore.FinalizedSlot()
s, cn := a.syncedData.HeadState()
defer cn()

if a.forkchoiceStore.LowestAvailableSlot() <= slot {
// non-finality case
s := a.syncedData.HeadState()
if s == nil {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("node is syncing"))
}
Expand Down
4 changes: 3 additions & 1 deletion cl/beacon/handler/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func defaultHarnessOpts(c harnessConfig) []beacontest.HarnessOption {
sm.OnHeadState(postState)
var s *state.CachingBeaconState
for s == nil {
s = sm.HeadState()
var cn func()
s, cn = sm.HeadState()
cn()
}
s.SetSlot(789274827847783)

Expand Down
3 changes: 2 additions & 1 deletion cl/beacon/handler/duties_attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
if err != nil {
return nil, err
}
s := a.syncedData.HeadState()
s, cn := a.syncedData.HeadState()
defer cn()
if s == nil {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("node is syncing"))
}
Expand Down
18 changes: 6 additions & 12 deletions cl/beacon/handler/duties_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) (
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err)
}
s := a.syncedData.HeadState()
s, cn := a.syncedData.HeadState()
defer cn()
if s == nil {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("node is syncing"))
}
Expand Down Expand Up @@ -83,13 +84,6 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) (
With("dependent_root", dependentRoot), nil
}

// We need to compute our duties
state := a.syncedData.HeadState()
if state == nil {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("beacon node is syncing"))

}

expectedSlot := epoch * a.beaconChainCfg.SlotsPerEpoch

duties := make([]proposerDuties, a.beaconChainCfg.SlotsPerEpoch)
Expand All @@ -100,7 +94,7 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) (
mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) %
a.beaconChainCfg.EpochsPerHistoricalVector
// Input for the seed hash.
mix := state.GetRandaoMix(int(mixPosition))
mix := s.GetRandaoMix(int(mixPosition))
input := shuffling2.GetSeed(a.beaconChainCfg, mix, epoch, a.beaconChainCfg.DomainBeaconProposer)
slotByteArray := make([]byte, 8)
binary.LittleEndian.PutUint64(slotByteArray, slot)
Expand All @@ -113,7 +107,7 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) (
hash.Write(inputWithSlot)
seed := hash.Sum(nil)

indices := state.GetActiveValidatorsIndices(epoch)
indices := s.GetActiveValidatorsIndices(epoch)

// Write the seed to an array.
seedArray := [32]byte{}
Expand All @@ -123,12 +117,12 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) (
// Do it in parallel
go func(i, slot uint64, indicies []uint64, seedArray [32]byte) {
defer wg.Done()
proposerIndex, err := shuffling2.ComputeProposerIndex(state.BeaconState, indices, seedArray)
proposerIndex, err := shuffling2.ComputeProposerIndex(s.BeaconState, indices, seedArray)
if err != nil {
panic(err)
}
var pk libcommon.Bytes48
pk, err = state.ValidatorPublicKey(int(proposerIndex))
pk, err = s.ValidatorPublicKey(int(proposerIndex))
if err != nil {
panic(err)
}
Expand Down
35 changes: 18 additions & 17 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,23 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
return
}

headState := a.syncedData.HeadState()
if headState == nil {
beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("head state not available")).WriteTo(w)
return
}
failures := []poolingFailure{}
for i, attestation := range req {
headState, cn := a.syncedData.HeadState()
defer cn()
if headState == nil {
beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("head state not available")).WriteTo(w)
return
}
var (
slot = attestation.Data.Slot
epoch = a.ethClock.GetEpochAtSlot(slot)
attClVersion = a.beaconChainCfg.GetCurrentStateVersion(epoch)
cIndex = attestation.Data.CommitteeIndex
committeeCountPerSlot = headState.CommitteeCount(slot / a.beaconChainCfg.SlotsPerEpoch)
)

cn()
if attClVersion.AfterOrEqual(clparams.ElectraVersion) {
index, err := attestation.ElectraSingleCommitteeIndex()
if err != nil {
Expand Down Expand Up @@ -191,9 +194,9 @@ func (a *ApiHandler) PostEthV1BeaconPoolVoluntaryExits(w http.ResponseWriter, r
}

func (a *ApiHandler) PostEthV1BeaconPoolAttesterSlashings(w http.ResponseWriter, r *http.Request) {
clversion := a.syncedData.HeadState().Version()
clVersion := a.beaconChainCfg.GetCurrentStateVersion(a.ethClock.GetCurrentEpoch())

req := cltypes.NewAttesterSlashing(clversion)
req := cltypes.NewAttesterSlashing(clVersion)
if err := json.NewDecoder(r.Body).Decode(req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -338,18 +341,21 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s := a.syncedData.HeadState()
if s == nil {
http.Error(w, "node is not synced", http.StatusServiceUnavailable)
return
}

failures := []poolingFailure{}
for idx, v := range msgs {
s, cn := a.syncedData.HeadState()
defer cn()
if s == nil {
http.Error(w, "node is not synced", http.StatusServiceUnavailable)
return
}
publishingSubnets, err := subnets.ComputeSubnetsForSyncCommittee(s, v.ValidatorIndex)
if err != nil {
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
continue
}
cn()
for _, subnet := range publishingSubnets {
if err = a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, v); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process attestation in syncCommittee service", "err", err)
Expand Down Expand Up @@ -392,11 +398,6 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s := a.syncedData.HeadState()
if s == nil {
http.Error(w, "node is not synced", http.StatusServiceUnavailable)
return
}
failures := []poolingFailure{}
var err error
for idx, v := range msgs {
Expand Down
11 changes: 6 additions & 5 deletions cl/beacon/handler/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cl/beacon/synced_data"
sync_mock_services "github.com/erigontech/erigon/cl/beacon/synced_data/mock_services"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
Expand All @@ -49,7 +50,7 @@ func TestPoolAttesterSlashings(t *testing.T) {
_, _, _, _, _, handler, _, syncedDataMgr, _, _ := setupTestingHandler(t, clparams.Phase0Version, log.Root(), false)
mockBeaconState := &state.CachingBeaconState{BeaconState: raw.New(&clparams.BeaconChainConfig{})}
mockBeaconState.SetVersion(clparams.DenebVersion)
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState).AnyTimes()
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState, synced_data.EmptyCancel).AnyTimes()

server := httptest.NewServer(handler.mux)
defer server.Close()
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestPoolProposerSlashings(t *testing.T) {
_, _, _, _, _, handler, _, syncedDataMgr, _, _ := setupTestingHandler(t, clparams.Phase0Version, log.Root(), false)
mockBeaconState := &state.CachingBeaconState{BeaconState: raw.New(&clparams.BeaconChainConfig{})}
mockBeaconState.SetVersion(clparams.DenebVersion)
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState).AnyTimes()
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState, synced_data.EmptyCancel).AnyTimes()

server := httptest.NewServer(handler.mux)
defer server.Close()
Expand Down Expand Up @@ -146,7 +147,7 @@ func TestPoolVoluntaryExits(t *testing.T) {
_, _, _, _, _, handler, _, syncedDataMgr, _, _ := setupTestingHandler(t, clparams.Phase0Version, log.Root(), false)
mockBeaconState := &state.CachingBeaconState{BeaconState: raw.New(&clparams.BeaconChainConfig{})}
mockBeaconState.SetVersion(clparams.DenebVersion)
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState).AnyTimes()
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState, synced_data.EmptyCancel).AnyTimes()

server := httptest.NewServer(handler.mux)
defer server.Close()
Expand Down Expand Up @@ -196,7 +197,7 @@ func TestPoolBlsToExecutionChainges(t *testing.T) {
_, _, _, _, _, handler, _, syncedDataMgr, _, _ := setupTestingHandler(t, clparams.Phase0Version, log.Root(), false)
mockBeaconState := &state.CachingBeaconState{BeaconState: raw.New(&clparams.BeaconChainConfig{})}
mockBeaconState.SetVersion(clparams.DenebVersion)
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState).AnyTimes()
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState, synced_data.EmptyCancel).AnyTimes()

server := httptest.NewServer(handler.mux)
defer server.Close()
Expand Down Expand Up @@ -257,7 +258,7 @@ func TestPoolAggregatesAndProofs(t *testing.T) {
_, _, _, _, _, handler, _, syncedDataMgr, _, _ := setupTestingHandler(t, clparams.Phase0Version, log.Root(), false)
mockBeaconState := &state.CachingBeaconState{BeaconState: raw.New(&clparams.BeaconChainConfig{})}
mockBeaconState.SetVersion(clparams.DenebVersion)
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState).AnyTimes()
syncedDataMgr.(*sync_mock_services.MockSyncedData).EXPECT().HeadState().Return(mockBeaconState, synced_data.EmptyCancel).AnyTimes()

server := httptest.NewServer(handler.mux)
defer server.Close()
Expand Down
13 changes: 8 additions & 5 deletions cl/beacon/handler/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ func (a *ApiHandler) PostEthV1ValidatorSyncCommitteeSubscriptions(w http.Respons
w.WriteHeader(http.StatusOK)
return
}
headState := a.syncedData.HeadState()
if headState == nil {
http.Error(w, "head state not available", http.StatusServiceUnavailable)
return
}

var err error
// process each sub request
for _, subRequest := range req {
Expand All @@ -71,11 +67,18 @@ func (a *ApiHandler) PostEthV1ValidatorSyncCommitteeSubscriptions(w http.Respons
syncnets = append(syncnets, uint64(i))
}
} else {
headState, cn := a.syncedData.HeadState()
defer cn()
if headState == nil {
http.Error(w, "head state not available", http.StatusServiceUnavailable)
return
}
syncnets, err = subnets.ComputeSubnetsForSyncCommittee(headState, subRequest.ValidatorIndex)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
cn()
}

// subscribe to subnets
Expand Down
9 changes: 6 additions & 3 deletions cl/beacon/handler/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ func (a *ApiHandler) writeValidatorsResponse(
}

if blockId.Head() { // Lets see if we point to head, if yes then we need to look at the head state we always keep.
s := a.syncedData.HeadState()
s, cn := a.syncedData.HeadState()
defer cn()
if s == nil {
http.Error(w, errors.New("node is not synced").Error(), http.StatusServiceUnavailable)
return
Expand Down Expand Up @@ -455,7 +456,8 @@ func (a *ApiHandler) GetEthV1BeaconStatesValidator(w http.ResponseWriter, r *htt
}

if blockId.Head() { // Lets see if we point to head, if yes then we need to look at the head state we always keep.
s := a.syncedData.HeadState()
s, cn := a.syncedData.HeadState()
defer cn()
if s == nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("node is not synced"))
}
Expand Down Expand Up @@ -576,7 +578,8 @@ func (a *ApiHandler) getValidatorBalances(ctx context.Context, w http.ResponseWr
isOptimistic := a.forkchoiceStore.IsRootOptimistic(blockRoot)

if blockId.Head() { // Lets see if we point to head, if yes then we need to look at the head state we always keep.
s := a.syncedData.HeadState()
s, cn := a.syncedData.HeadState()
defer cn()
if s == nil {
http.Error(w, errors.New("node is not synced").Error(), http.StatusServiceUnavailable)
return
Expand Down
7 changes: 4 additions & 3 deletions cl/beacon/synced_data/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"github.com/erigontech/erigon/cl/phase1/core/state"
)

type CancelFn func()

//go:generate mockgen -typed=true -destination=./mock_services/synced_data_mock.go -package=mock_services . SyncedData
type SyncedData interface {
OnHeadState(newState *state.CachingBeaconState) error
HeadState() *state.CachingBeaconState
HeadStateReader() abstract.BeaconStateReader
HeadStateMutator() abstract.BeaconStateMutator
HeadState() (*state.CachingBeaconState, CancelFn)
HeadStateReader() (abstract.BeaconStateReader, CancelFn)
Syncing() bool
HeadSlot() uint64
HeadRoot() common.Hash
Expand Down
Loading

0 comments on commit d4af6de

Please sign in to comment.