Skip to content

Commit

Permalink
add prune ancient feature (#862)
Browse files Browse the repository at this point in the history
* add prune ancient feature

* change notes and log for pr suggest

* change StableStateBloc to SafePointBlock and enhanced 'pruneancient' flag hints

* change pruneancient usage

* fix note misspelling

* fix set SafePointBlockNumber by mpt height replace current block number

Co-authored-by: user <[email protected]>
  • Loading branch information
joeylichang and actioncli authored Jun 29, 2022
1 parent ba120d0 commit d6b9bdb
Show file tree
Hide file tree
Showing 24 changed files with 563 additions and 134 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ var (
utils.CachePreimagesFlag,
utils.PersistDiffFlag,
utils.DiffBlockFlag,
utils.PruneAncientDataFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
Expand Down
4 changes: 2 additions & 2 deletions cmd/geth/pruneblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func testOfflineBlockPruneWithAmountReserved(t *testing.T, amountReserved uint64
t.Fatalf("Failed to back up block: %v", err)
}

dbBack, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, newAncientPath, "", false, true, false)
dbBack, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, newAncientPath, "", false, true, false, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func testOfflineBlockPruneWithAmountReserved(t *testing.T, amountReserved uint64

func BlockchainCreator(t *testing.T, chaindbPath, AncientPath string, blockRemain uint64) (ethdb.Database, []*types.Block, []*types.Block, []types.Receipts, []*big.Int, uint64, *core.BlockChain) {
//create a database with ancient freezer
db, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, AncientPath, "", false, false, false)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, AncientPath, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
Expand Down
13 changes: 12 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ var (
Usage: "The number of blocks should be persisted in db (default = 86400)",
Value: uint64(86400),
}
PruneAncientDataFlag = cli.BoolFlag{
Name: "pruneancient",
Usage: "Prune ancient data, recommends to the user who don't care about the ancient data. Note that once be turned on, the ancient data will not be recovered again",
}
// Miner settings
MiningEnabledFlag = cli.BoolFlag{
Name: "mine",
Expand Down Expand Up @@ -1621,6 +1625,13 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(DiffBlockFlag.Name) {
cfg.DiffBlock = ctx.GlobalUint64(DiffBlockFlag.Name)
}
if ctx.GlobalIsSet(PruneAncientDataFlag.Name) {
if cfg.SyncMode == downloader.FullSync {
cfg.PruneAncientData = ctx.GlobalBool(PruneAncientDataFlag.Name)
} else {
log.Crit("pruneancient parameter didn't take effect for current syncmode")
}
}
if gcmode := ctx.GlobalString(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" {
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
Expand Down Expand Up @@ -1914,7 +1925,7 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly, disableFree
chainDb, err = stack.OpenDatabase(name, cache, handles, "", readonly)
} else {
name := "chaindata"
chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "", readonly, disableFreeze, false)
chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "", readonly, disableFreeze, false, false)
}
if err != nil {
Fatalf("Could not open database: %v", err)
Expand Down
12 changes: 12 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, int(bc.cacheConfig.TriesInMemory), head.Root(), !bc.cacheConfig.SnapshotWait, true, recover)
}
// write safe point block number
rawdb.WriteSafePointBlockNumber(bc.db, bc.CurrentBlock().NumberU64())
// do options before start any routine
for _, option := range options {
bc = option(bc)
Expand Down Expand Up @@ -543,6 +545,7 @@ func (bc *BlockChain) loadLastState() error {
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}

// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))
Expand Down Expand Up @@ -927,6 +930,7 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// If the block is on a side chain or an unknown one, force other heads onto it too
// read from kvdb, has nothing to do with ancientdb type
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()

// Add the block to the canonical chain number scheme and mark as the head
Expand Down Expand Up @@ -1258,13 +1262,17 @@ func (bc *BlockChain) Stop() {
log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
} else {
rawdb.WriteSafePointBlockNumber(bc.db, recent.NumberU64())
}
}
}
if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase)
if err := triedb.Commit(snapBase, true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
} else {
rawdb.WriteSafePointBlockNumber(bc.db, bc.CurrentBlock().NumberU64())
}
}
for !bc.triegc.Empty() {
Expand Down Expand Up @@ -1764,6 +1772,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true, nil)
rawdb.WriteSafePointBlockNumber(bc.db, chosen)
lastWrite = chosen
bc.gcproc = 0
}
Expand Down Expand Up @@ -2354,6 +2363,9 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
for i := len(hashes) - 1; i >= 0; i-- {
// Append the next block to our batch
block := bc.GetBlock(hashes[i], numbers[i])
if block == nil {
log.Crit("Importing heavy sidechain block is nil", "hash", hashes[i], "number", numbers[i])
}

blocks = append(blocks, block)
memory += block.Size()
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
}
os.RemoveAll(datadir)

db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false, false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
Expand Down Expand Up @@ -1832,7 +1832,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
db.Close()

// Start a new blockchain back up and see where the repait leads us
db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false, false)
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,7 +1961,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
}
os.RemoveAll(datadir)

db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false, false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
os.RemoveAll(datadir)

db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false, false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) {
db.Close()

// Start a new blockchain back up and see where the repair leads us
newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "", false, false, false)
newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "", false, false, false, false)
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}
Expand Down
18 changes: 9 additions & 9 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func TestFastVsFullChains(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -835,7 +835,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(dir)
db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false)
db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -1702,7 +1702,7 @@ func TestBlockchainRecovery(t *testing.T) {
}
defer os.Remove(frdir)

ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -1759,7 +1759,7 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -1958,7 +1958,7 @@ func testInsertKnownChainData(t *testing.T, typ string) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(dir)
chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false)
chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -2238,7 +2238,7 @@ func TestTransactionIndices(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -2266,7 +2266,7 @@ func TestTransactionIndices(t *testing.T) {
// Init block chain with external ancients, check all needed indices has been indexed.
limit := []uint64{0, 32, 64, 128}
for _, l := range limit {
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand All @@ -2286,7 +2286,7 @@ func TestTransactionIndices(t *testing.T) {
}

// Reconstruct a block chain which only reserves HEAD-64 tx indices
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down Expand Up @@ -2365,7 +2365,7 @@ func TestSkipStaleTxIndicesInFastSync(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func (hc *HeaderChain) writeHeaders(headers []*types.Header) (result *headerWrit
headHeader = hc.GetHeader(headHash, headNumber)
)
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
// backtracking to ancientdb
if frozen, _ := hc.chainDb.Ancients(); frozen == headNumber {
break
}
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func TestAncientStorage(t *testing.T) {
}
defer os.Remove(frdir)

db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
Expand Down
87 changes: 87 additions & 0 deletions core/rawdb/accessors_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rawdb

import (
"encoding/json"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -27,6 +28,12 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)

// FreezerType enumerator
const (
EntireFreezerType uint64 = iota // classic ancient type
PruneFreezerType // prune ancient type
)

// ReadDatabaseVersion retrieves the version number of the database.
func ReadDatabaseVersion(db ethdb.KeyValueReader) *uint64 {
var version uint64
Expand Down Expand Up @@ -138,3 +145,83 @@ func PopUncleanShutdownMarker(db ethdb.KeyValueStore) {
log.Warn("Failed to clear unclean-shutdown marker", "err", err)
}
}

// ReadOffSetOfCurrentAncientFreezer return prune block start
func ReadOffSetOfCurrentAncientFreezer(db ethdb.KeyValueReader) uint64 {
offset, _ := db.Get(offSetOfCurrentAncientFreezer)
if offset == nil {
return 0
}
return new(big.Int).SetBytes(offset).Uint64()
}

// WriteOffSetOfCurrentAncientFreezer write prune block start
func WriteOffSetOfCurrentAncientFreezer(db ethdb.KeyValueWriter, offset uint64) {
if err := db.Put(offSetOfCurrentAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}

// ReadOffSetOfLastAncientFreezer return last prune block start
func ReadOffSetOfLastAncientFreezer(db ethdb.KeyValueReader) uint64 {
offset, _ := db.Get(offSetOfLastAncientFreezer)
if offset == nil {
return 0
}
return new(big.Int).SetBytes(offset).Uint64()
}

// WriteOffSetOfLastAncientFreezer wirte before prune block start
func WriteOffSetOfLastAncientFreezer(db ethdb.KeyValueWriter, offset uint64) {
if err := db.Put(offSetOfLastAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}

// ReadFrozenOfAncientFreezer return freezer block number
func ReadFrozenOfAncientFreezer(db ethdb.KeyValueReader) uint64 {
fozen, _ := db.Get(frozenOfAncientDBKey)
if fozen == nil {
return 0
}
return new(big.Int).SetBytes(fozen).Uint64()
}

// WriteFrozenOfAncientFreezer write freezer block number
func WriteFrozenOfAncientFreezer(db ethdb.KeyValueWriter, frozen uint64) {
if err := db.Put(frozenOfAncientDBKey, new(big.Int).SetUint64(frozen).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}

// ReadSafePointBlockNumber return the number of block that roothash save to disk
func ReadSafePointBlockNumber(db ethdb.KeyValueReader) uint64 {
num, _ := db.Get(LastSafePointBlockKey)
if num == nil {
return 0
}
return new(big.Int).SetBytes(num).Uint64()
}

// WriteSafePointBlockNumber write the number of block that roothash save to disk
func WriteSafePointBlockNumber(db ethdb.KeyValueWriter, number uint64) {
if err := db.Put(LastSafePointBlockKey, new(big.Int).SetUint64(number).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}

// ReadAncientType return freezer type
func ReadAncientType(db ethdb.KeyValueReader) uint64 {
data, _ := db.Get(pruneAncientKey)
if data == nil {
return EntireFreezerType
}
return new(big.Int).SetBytes(data).Uint64()
}

// WriteAncientType write freezer type
func WriteAncientType(db ethdb.KeyValueWriter, flag uint64) {
if err := db.Put(pruneAncientKey, new(big.Int).SetUint64(flag).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}
6 changes: 6 additions & 0 deletions core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
// signal received.
func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
// short circuit for invalid range
if offset := db.AncientOffSet(); offset > from {
from = offset
}
if from >= to {
return
}
Expand Down Expand Up @@ -272,6 +275,9 @@ func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, inte
// signal received.
func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
// short circuit for invalid range
if offset := db.AncientOffSet(); offset > from {
from = offset
}
if from >= to {
return
}
Expand Down
Loading

0 comments on commit d6b9bdb

Please sign in to comment.