Skip to content

Commit

Permalink
move domains/ii registration out of aggregator constructor (#13327)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudeepdino008 authored Jan 8, 2025
1 parent cc41076 commit ecbe768
Show file tree
Hide file tree
Showing 22 changed files with 206 additions and 188 deletions.
2 changes: 1 addition & 1 deletion cmd/evm/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func runCmd(ctx *cli.Context) error {
} else {
genesisConfig = new(types.Genesis)
}
agg, err := state2.NewAggregator(context.Background(), datadir.New(os.TempDir()), config3.DefaultStepSize, db, log.New())
agg, err := state2.NewAggregator2(context.Background(), datadir.New(os.TempDir()), config3.DefaultStepSize, db, log.New())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/evm/staterunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func aggregateResultsFromStateTests(
MustOpen()
defer _db.Close()

agg, err := libstate.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, _db, log.New())
agg, err := libstate.NewAggregator2(context.Background(), dirs, config3.DefaultStepSize, _db, log.New())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
blockReader := freezeblocks.NewBlockReader(_allSnapshotsSingleton, _allBorSnapshotsSingleton, _heimdallStoreSingleton, _bridgeStoreSingleton)

txNums := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader))
_aggSingleton, err = libstate.NewAggregator(ctx, dirs, config3.DefaultStepSize, db, logger)
_aggSingleton, err = libstate.NewAggregator2(ctx, dirs, config3.DefaultStepSize, db, logger)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, heimdallStore, bridgeStore)
txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader))

agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.DefaultStepSize, rawDB, logger)
agg, err := libstate.NewAggregator2(ctx, cfg.Dirs, config3.DefaultStepSize, rawDB, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/state/commands/opcode_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func OpcodeTracer(genesis *types.Genesis, blockNum uint64, chaindata string, num
rawChainDb := mdbx.MustOpen(dirs.Chaindata)
defer rawChainDb.Close()

agg, err := state2.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, rawChainDb, log.New())
agg, err := state2.NewAggregator2(context.Background(), dirs, config3.DefaultStepSize, rawChainDb, log.New())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/genesis_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func GenesisToBlock(g *types.Genesis, dirs datadir.Dirs, logger log.Logger) (*ty
genesisTmpDB := mdbx.New(kv.TemporaryDB, logger).InMem(dirs.DataDir).MapSize(2 * datasize.GB).GrowthStep(1 * datasize.MB).MustOpen()
defer genesisTmpDB.Close()

agg, err := state2.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, genesisTmpDB, logger)
agg, err := state2.NewAggregator2(context.Background(), dirs, config3.DefaultStepSize, genesisTmpDB, logger)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/state/intra_block_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (test *snapshotTest) run() bool {
db := memdb.NewStateDB("")
defer db.Close()

agg, err := stateLib.NewAggregator(context.Background(), datadir.New(""), 16, db, log.New())
agg, err := stateLib.NewAggregator2(context.Background(), datadir.New(""), 16, db, log.New())
if err != nil {
test.err = err
return false
Expand Down
4 changes: 2 additions & 2 deletions core/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *StateSuite) SetUpTest(c *checker.C) {
db := memdb.NewStateDB("")
defer db.Close()

agg, err := stateLib.NewAggregator(context.Background(), datadir.New(""), 16, db, log.New())
agg, err := stateLib.NewAggregator2(context.Background(), datadir.New(""), 16, db, log.New())
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func NewTestTemporalDb(tb testing.TB) (kv.TemporalRwDB, kv.TemporalRwTx, *state.
db := memdb.NewStateDB(tb.TempDir())
tb.Cleanup(db.Close)

agg, err := state.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New())
agg, err := state.NewAggregator2(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New())
if err != nil {
tb.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/test/domains_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func testDbAndAggregatorv3(t *testing.T, fpath string, aggStep uint64) (kv.RwDB,
db := mdbx.New(kv.ChainDB, logger).Path(dirs.Chaindata).MustOpen()
t.Cleanup(db.Close)

agg, err := state.NewAggregator(context.Background(), dirs, aggStep, db, logger)
agg, err := state.NewAggregator2(context.Background(), dirs, aggStep, db, logger)
require.NoError(t, err)
t.Cleanup(agg.Close)
err = agg.OpenFolder()
Expand Down
2 changes: 1 addition & 1 deletion core/vm/gas_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func testTemporalDB(t *testing.T) *temporal.DB {

t.Cleanup(db.Close)

agg, err := state3.NewAggregator(context.Background(), datadir.New(t.TempDir()), 16, db, log.New())
agg, err := state3.NewAggregator2(context.Background(), datadir.New(t.TempDir()), 16, db, log.New())
require.NoError(t, err)
t.Cleanup(agg.Close)

Expand Down
4 changes: 2 additions & 2 deletions core/vm/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func Execute(code, input []byte, cfg *Config, tempdir string) ([]byte, *state.In
if !externalState {
db := memdb.NewStateDB(tempdir)
defer db.Close()
agg, err := state3.NewAggregator(context.Background(), datadir.New(tempdir), config3.DefaultStepSize, db, log.New())
agg, err := state3.NewAggregator2(context.Background(), datadir.New(tempdir), config3.DefaultStepSize, db, log.New())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func Create(input []byte, cfg *Config, blockNr uint64) ([]byte, libcommon.Addres

db := memdb.NewStateDB(tmp)
defer db.Close()
agg, err := state3.NewAggregator(context.Background(), datadir.New(tmp), config3.DefaultStepSize, db, log.New())
agg, err := state3.NewAggregator2(context.Background(), datadir.New(tmp), config3.DefaultStepSize, db, log.New())
if err != nil {
return nil, [20]byte{}, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/vm/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewTestTemporalDb(tb testing.TB) (kv.RwDB, kv.RwTx, *stateLib.Aggregator) {
db := memdb.NewStateDB(tb.TempDir())
tb.Cleanup(db.Close)

agg, err := stateLib.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New())
agg, err := stateLib.NewAggregator2(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New())
if err != nil {
tb.Fatal(err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func testTemporalDB(t testing.TB) *temporal.DB {

t.Cleanup(db.Close)

agg, err := stateLib.NewAggregator(context.Background(), datadir.New(t.TempDir()), 16, db, log.New())
agg, err := stateLib.NewAggregator2(context.Background(), datadir.New(t.TempDir()), 16, db, log.New())
require.NoError(t, err)
t.Cleanup(agg.Close)

Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/kv/membatchwithdb/memory_mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func NewTestTemporalDb(tb testing.TB) (kv.RwDB, kv.RwTx, *stateLib.Aggregator) {
db := memdb.NewStateDB(tb.TempDir())
tb.Cleanup(db.Close)

agg, err := stateLib.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New())
agg, err := stateLib.NewAggregator2(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New())
if err != nil {
tb.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/kv/temporal/temporaltest/kv_temporal_testdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewTestDB(tb testing.TB, dirs datadir.Dirs) (kv.TemporalRwDB, *state.Aggreg
rawDB = memdb.New(dirs.DataDir, kv.ChainDB)
}

agg, err := state.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, rawDB, log.New())
agg, err := state.NewAggregator2(context.Background(), dirs, config3.DefaultStepSize, rawDB, log.New())
if err != nil {
panic(err)
}
Expand Down
165 changes: 3 additions & 162 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,141 +126,14 @@ func domainIntegrityCheck(name kv.Domain, dirs datadir.Dirs, fromStep, toStep ui
}
}

var dbgCommBtIndex = dbg.EnvBool("AGG_COMMITMENT_BT", false)

func init() {
if dbgCommBtIndex {
cfg := Schema[kv.CommitmentDomain]
cfg.IndexList = AccessorBTree | AccessorExistence
Schema[kv.CommitmentDomain] = cfg
}
}

var Schema = map[kv.Domain]domainCfg{
kv.AccountsDomain: {
name: kv.AccountsDomain, valuesTable: kv.TblAccountVals,

IndexList: AccessorBTree | AccessorExistence,
crossDomainIntegrity: domainIntegrityCheck,
compression: seg.CompressNone,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblAccountHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.AccountsDomain.String(), //TODO: looks redundant

iiCfg: iiCfg{
keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.AccountsDomain.String(), //TODO: looks redundant
},
},
},
kv.StorageDomain: {
name: kv.StorageDomain, valuesTable: kv.TblStorageVals,

IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblStorageHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.StorageDomain.String(),

iiCfg: iiCfg{
keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.StorageDomain.String(),
},
},
},
kv.CodeDomain: {
name: kv.CodeDomain, valuesTable: kv.TblCodeVals,

IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet
compressCfg: DomainCompressCfg,
largeValues: true,

hist: histCfg{
valuesTable: kv.TblCodeHistoryVals,
compression: seg.CompressKeys | seg.CompressVals,

historyLargeValues: true,
filenameBase: kv.CodeDomain.String(),

iiCfg: iiCfg{
withExistence: false, compressorCfg: seg.DefaultCfg,
keysTable: kv.TblCodeHistoryKeys, valuesTable: kv.TblCodeIdx,
filenameBase: kv.CodeDomain.String(),
},
},
},
kv.CommitmentDomain: {
name: kv.CommitmentDomain, valuesTable: kv.TblCommitmentVals,

IndexList: AccessorHashMap,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblCommitmentHistoryVals,
compression: seg.CompressNone,

snapshotsDisabled: true,
historyLargeValues: false,
filenameBase: kv.CommitmentDomain.String(),

iiCfg: iiCfg{
keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.CommitmentDomain.String(),
},
},
},
kv.ReceiptDomain: {
name: kv.ReceiptDomain, valuesTable: kv.TblReceiptVals,

IndexList: AccessorBTree | AccessorExistence,
compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblReceiptHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.ReceiptDomain.String(),

iiCfg: iiCfg{
keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.ReceiptDomain.String(),
},
},
},
}

func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
tmpdir := dirs.Tmp
salt, err := getStateIndicesSalt(dirs.Snap)
if err != nil {
return nil, err
}

ctx, ctxCancel := context.WithCancel(ctx)
a := &Aggregator{
return &Aggregator{
ctx: ctx,
ctxCancel: ctxCancel,
onFreeze: func(frozenFileNames []string) {},
dirs: dirs,
tmpdir: tmpdir,
tmpdir: dirs.Tmp,
aggregationStep: aggregationStep,
db: db,
leakDetector: dbg.NewLeakDetector("agg", dbg.SlowTx()),
Expand All @@ -272,39 +145,7 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

produce: true,
}

if err := a.registerDomain(kv.AccountsDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.StorageDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.CodeDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.CommitmentDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.ReceiptDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesToIdxPos, salt, dirs, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil {
return nil, err
}
a.KeepRecentTxnsOfHistoriesWithDisabledSnapshots(100_000) // ~1k blocks of history
a.recalcVisibleFiles(a.DirtyFilesEndTxNumMinimax())

return a, nil
}, nil
}

// getStateIndicesSalt - try read salt for all indices from DB. Or fall-back to new salt creation.
Expand Down
Loading

0 comments on commit ecbe768

Please sign in to comment.