Skip to content

Commit

Permalink
Fixed cannot alloc memory (#13375)
Browse files Browse the repository at this point in the history
Seems it happens with many allocs in ETL buffers
  • Loading branch information
Giulio2002 authored Jan 11, 2025
1 parent 92fb52c commit bec753d
Showing 1 changed file with 52 additions and 19 deletions.
71 changes: 52 additions & 19 deletions cl/antiquary/beacon_states_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"bytes"
"context"
"io"
"sync"

"github.com/c2h5oh/datasize"
"github.com/klauspost/compress/zstd"

libcommon "github.com/erigontech/erigon-lib/common"
Expand All @@ -36,7 +38,20 @@ import (
"github.com/erigontech/erigon/cl/transition/impl/eth2"
)

var stateAntiquaryBufSz = etl.BufferOptimalSize / 8 // 18 collectors * 256mb / 8 = 512mb in worst case
var stateAntiquaryBufSz = etl.BufferOptimalSize / 16 // 18 collectors * 256mb / 16 = 256mb in worst case

const EnabledPreAllocate = true

var etlBufferPool = &sync.Pool{
New: func() interface{} {
buf := etl.NewSortableBuffer(stateAntiquaryBufSz)
// preallocate 20_000 items with a 2MB overflow buffer
if EnabledPreAllocate {
buf.Prealloc(20_000, int(stateAntiquaryBufSz+2*datasize.MB))
}
return buf
},
}

// RATIONALE: MDBX locks the entire database when writing to it, so we need to minimize the time spent in the write lock.
// so instead of writing the historical states on write transactions, we accumulate them in memory and write them in a single write transaction.
Expand All @@ -63,6 +78,8 @@ type beaconStatesCollector struct {
balancesDumpsCollector *etl.Collector
effectiveBalancesDumpCollector *etl.Collector

buffers []etl.Buffer

buf *bytes.Buffer
compressor *zstd.Encoder

Expand All @@ -76,30 +93,40 @@ func newBeaconStatesCollector(beaconCfg *clparams.BeaconChainConfig, tmpdir stri
if err != nil {
panic(err)
}

var buffers []etl.Buffer
makeETLBuffer := func() etl.Buffer {
b := etlBufferPool.Get().(etl.Buffer)
b.Reset()
buffers = append(buffers, b)
return b
}

return &beaconStatesCollector{
effectiveBalanceCollector: etl.NewCollector(kv.ValidatorEffectiveBalance, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
balancesCollector: etl.NewCollector(kv.ValidatorBalance, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
randaoMixesCollector: etl.NewCollector(kv.RandaoMixes, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
intraRandaoMixesCollector: etl.NewCollector(kv.IntraRandaoMixes, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
proposersCollector: etl.NewCollector(kv.Proposers, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
slashingsCollector: etl.NewCollector(kv.ValidatorSlashings, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
blockRootsCollector: etl.NewCollector(kv.BlockRoot, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
stateRootsCollector: etl.NewCollector(kv.StateRoot, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
slotDataCollector: etl.NewCollector(kv.SlotData, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
epochDataCollector: etl.NewCollector(kv.EpochData, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
inactivityScoresCollector: etl.NewCollector(kv.InactivityScores, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
nextSyncCommitteeCollector: etl.NewCollector(kv.NextSyncCommittee, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
currentSyncCommitteeCollector: etl.NewCollector(kv.CurrentSyncCommittee, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
eth1DataVotesCollector: etl.NewCollector(kv.Eth1DataVotes, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
stateEventsCollector: etl.NewCollector(kv.StateEvents, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
activeValidatorIndiciesCollector: etl.NewCollector(kv.ActiveValidatorIndicies, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
balancesDumpsCollector: etl.NewCollector(kv.BalancesDump, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
effectiveBalancesDumpCollector: etl.NewCollector(kv.EffectiveBalancesDump, tmpdir, etl.NewSortableBuffer(stateAntiquaryBufSz), logger).LogLvl(log.LvlTrace),
effectiveBalanceCollector: etl.NewCollector(kv.ValidatorEffectiveBalance, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
balancesCollector: etl.NewCollector(kv.ValidatorBalance, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
randaoMixesCollector: etl.NewCollector(kv.RandaoMixes, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
intraRandaoMixesCollector: etl.NewCollector(kv.IntraRandaoMixes, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
proposersCollector: etl.NewCollector(kv.Proposers, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
slashingsCollector: etl.NewCollector(kv.ValidatorSlashings, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
blockRootsCollector: etl.NewCollector(kv.BlockRoot, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
stateRootsCollector: etl.NewCollector(kv.StateRoot, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
slotDataCollector: etl.NewCollector(kv.SlotData, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
epochDataCollector: etl.NewCollector(kv.EpochData, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
inactivityScoresCollector: etl.NewCollector(kv.InactivityScores, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
nextSyncCommitteeCollector: etl.NewCollector(kv.NextSyncCommittee, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
currentSyncCommitteeCollector: etl.NewCollector(kv.CurrentSyncCommittee, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
eth1DataVotesCollector: etl.NewCollector(kv.Eth1DataVotes, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
stateEventsCollector: etl.NewCollector(kv.StateEvents, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
activeValidatorIndiciesCollector: etl.NewCollector(kv.ActiveValidatorIndicies, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
balancesDumpsCollector: etl.NewCollector(kv.BalancesDump, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
effectiveBalancesDumpCollector: etl.NewCollector(kv.EffectiveBalancesDump, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
logger: logger,
beaconCfg: beaconCfg,

buf: buf,
compressor: compressor,
buffers: buffers,
}
}

Expand Down Expand Up @@ -361,6 +388,12 @@ func (i *beaconStatesCollector) close() {
i.activeValidatorIndiciesCollector.Close()
i.balancesDumpsCollector.Close()
i.effectiveBalancesDumpCollector.Close()
for _, b := range i.buffers {
b.Reset()
}
for _, b := range i.buffers {
etlBufferPool.Put(b)
}
}

// antiquateFullUint64List goes on mdbx as it is full of common repeated patter always and thus fits with 16KB pages.
Expand Down

0 comments on commit bec753d

Please sign in to comment.