Skip to content

Commit

Permalink
run E4 via integration binary (#7063)
Browse files Browse the repository at this point in the history
added subcommand `state_domains` of `bin/integration` to start
processing of existing chaindata with use of state domains and
commitment.
It creates two directory in `datadir`: `state` and `statedb` for files
and mdbx respectively.

This version runs blocks one after another and produces merged files. 
Want to add some metrics to export later.
  • Loading branch information
awskii authored Mar 10, 2023
1 parent 7f6d1c9 commit 36e3c94
Show file tree
Hide file tree
Showing 5 changed files with 818 additions and 5 deletions.
23 changes: 22 additions & 1 deletion cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package commands

import (
"github.com/ledgerwatch/erigon/turbo/cli"
"github.com/spf13/cobra"

"github.com/ledgerwatch/erigon/turbo/cli"

"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/eth/ethconfig"
)
Expand Down Expand Up @@ -31,6 +32,12 @@ var (
experiments []string
chain string // Which chain to use (mainnet, rinkeby, goerli, etc.)

commitmentMode string
commitmentTrie string
commitmentFreq int
startTxNum uint64
traceFromTx uint64

_forceSetHistoryV3 bool
workers, reconWorkers uint64
)
Expand Down Expand Up @@ -142,3 +149,17 @@ func withWorkers(cmd *cobra.Command) {
cmd.Flags().Uint64Var(&workers, "exec.workers", uint64(ethconfig.Defaults.Sync.ExecWorkerCount), "")
cmd.Flags().Uint64Var(&reconWorkers, "recon.workers", uint64(ethconfig.Defaults.Sync.ReconWorkerCount), "")
}

func withStartTx(cmd *cobra.Command) {
cmd.Flags().Uint64Var(&startTxNum, "startTx", 0, "start processing from tx")
}

func withTraceFromTx(cmd *cobra.Command) {
cmd.Flags().Uint64Var(&traceFromTx, "txtrace.from", 0, "start tracing from tx number")
}

func withCommitment(cmd *cobra.Command) {
cmd.Flags().StringVar(&commitmentMode, "commitment.mode", "direct", "defines the way to calculate commitments: 'direct' mode reads from state directly, 'update' accumulate updates before commitment, 'off' actually disables commitment calculation")
cmd.Flags().StringVar(&commitmentTrie, "commitment.trie", "hex", "hex - use Hex Patricia Hashed Trie for commitments, bin - use of binary patricia trie")
cmd.Flags().IntVar(&commitmentFreq, "commitment.freq", 25000, "how many blocks to skip between calculating commitment")
}
9 changes: 5 additions & 4 deletions cmd/integration/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/kv"
kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/migrations"
"github.com/ledgerwatch/erigon/turbo/debug"
"github.com/ledgerwatch/erigon/turbo/logging"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"github.com/torquem-ch/mdbx-go/mdbx"
"golang.org/x/sync/semaphore"

"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/migrations"
"github.com/ledgerwatch/erigon/turbo/debug"
"github.com/ledgerwatch/erigon/turbo/logging"
)

var rootCmd = &cobra.Command{
Expand Down
85 changes: 85 additions & 0 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"
"time"

"github.com/c2h5oh/datasize"
chain2 "github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/commitment"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/datadir"
Expand Down Expand Up @@ -481,6 +483,8 @@ func init() {
cmdForceSetHistoryV3.Flags().BoolVar(&_forceSetHistoryV3, "history.v3", false, "")
rootCmd.AddCommand(cmdForceSetHistoryV3)

rootCmd.AddCommand(cmdForceSetHistoryV3)

withDataDir(cmdSetPrune)
withChain(cmdSetPrune)
cmdSetPrune.Flags().StringVar(&pruneFlag, "prune", "hrtc", "")
Expand Down Expand Up @@ -1227,6 +1231,87 @@ func getBlockReader(db kv.RoDB) (blockReader services.FullBlockReader) {
return _blockReaderSingleton
}

var openDomainsOnce sync.Once
var _aggDomainSingleton *libstate.Aggregator

func allDomains(ctx context.Context, db kv.RoDB, mode libstate.CommitmentMode, trie commitment.TrieVariant) (*snapshotsync.RoSnapshots, *libstate.Aggregator) {
openDomainsOnce.Do(func() {
var useSnapshots bool
_ = db.View(context.Background(), func(tx kv.Tx) error {
useSnapshots, _ = snap.Enabled(tx)
return nil
})
dirs := datadir.New(datadirCli)
dir.MustExist(dirs.SnapHistory)

snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true)
_allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapCfg, dirs.Snap)

var err error
_aggDomainSingleton, err = libstate.NewAggregator(filepath.Join(dirs.DataDir, "state"), dirs.Tmp, ethconfig.HistoryV3AggregationStep, mode, trie)
if err != nil {
panic(err)
}
if err = _aggDomainSingleton.ReopenFolder(); err != nil {
panic(err)
}

if useSnapshots {
if err := _allSnapshotsSingleton.ReopenFolder(); err != nil {
panic(err)
}
_allSnapshotsSingleton.LogStat()
//db.View(context.Background(), func(tx kv.Tx) error {
// _aggSingleton.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
// _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
// return histBlockNumProgress
// })
// return nil
//})
}
})
return _allSnapshotsSingleton, _aggDomainSingleton
}

func newDomains(ctx context.Context, db kv.RwDB, mode libstate.CommitmentMode, trie commitment.TrieVariant) (consensus.Engine, ethconfig.Config, *snapshotsync.RoSnapshots, *libstate.Aggregator) {
historyV3, pm := kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)
//events := shards.NewEvents()
genesis := core.DefaultGenesisBlockByChainName(chain)

chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis, "")
_ = genesisBlock // TODO apply if needed here

if _, ok := genesisErr.(*chain2.ConfigCompatError); genesisErr != nil && !ok {
panic(genesisErr)
}
//log.Info("Initialised chain configuration", "config", chainConfig)

// Apply special hacks for BSC params
if chainConfig.Parlia != nil {
params.ApplyBinanceSmartChainParams()
}

var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))

cfg := ethconfig.Defaults
cfg.HistoryV3 = historyV3
cfg.Prune = pm
cfg.BatchSize = batchSize
cfg.DeprecatedTxPool.Disable = true
cfg.Genesis = core.DefaultGenesisBlockByChainName(chain)
//if miningConfig != nil {
// cfg.Miner = *miningConfig
//}
cfg.Dirs = datadir.New(datadirCli)

allSn, agg := allDomains(ctx, db, mode, trie)
cfg.Snapshot = allSn.Cfg()

engine := initConsensusEngine(chainConfig, cfg.Dirs.DataDir, db)
return engine, cfg, allSn, agg
}

func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) (consensus.Engine, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) {
dirs, historyV3, pm := datadir.New(datadirCli), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)

Expand Down
Loading

0 comments on commit 36e3c94

Please sign in to comment.