Skip to content

Commit

Permalink
core, core/rawdb, ethdb: add transfer logs in freezer
Browse files Browse the repository at this point in the history
  • Loading branch information
CaraWang committed Sep 19, 2024
1 parent 7fca5f1 commit 6cb4bec
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 41 deletions.
30 changes: 18 additions & 12 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
exlru "github.com/hashicorp/golang-lru"
"golang.org/x/crypto/sha3"

"golang.org/x/exp/slices"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/mclock"
Expand All @@ -56,7 +58,6 @@ import (
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -121,6 +122,8 @@ const (
maxBeyondBlocks = 2048
prefetchTxNumber = 100

transferLogsCacheLimit = 32

diffLayerFreezerRecheckInterval = 3 * time.Second
maxDiffForkDist = 11 // Maximum allowed backward distance from the chain head

Expand Down Expand Up @@ -295,6 +298,8 @@ type BlockChain struct {
// Cache for the blocks that failed to pass MPT root verification
badBlockCache *lru.Cache[common.Hash, time.Time]

transferLogsCache *lru.Cache[common.Hash, []*types.TransferLog]

// trusted diff layers
diffLayerCache *exlru.Cache // Cache for the diffLayers
diffLayerChanCache *exlru.Cache // Cache for the difflayer channel
Expand Down Expand Up @@ -383,6 +388,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
vmConfig: vmConfig,
diffQueue: prque.New[int64, *types.DiffLayer](nil),
diffQueueBuffer: make(chan *types.DiffLayer),

transferLogsCache: lru.NewCache[common.Hash, []*types.TransferLog](transferLogsCacheLimit),
}
bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
bc.forker = NewForkChoice(bc, shouldPreserve)
Expand Down Expand Up @@ -1128,6 +1135,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteBlobSidecars(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
rawdb.DeleteTransferLogs(db, hash, num)
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
Expand All @@ -1153,6 +1161,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.sidecarsCache.Purge()
bc.transferLogsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()
Expand Down Expand Up @@ -1561,7 +1570,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if first.NumberU64() == 1 {
if frozen, _ := bc.db.BlockStore().Ancients(); frozen == 0 {
td := bc.genesisBlock.Difficulty()
writeSize, err := rawdb.WriteAncientBlocks(bc.db.BlockStore(), []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
tfLogs, err := rawdb.ReadTransferLogs(bc.db, bc.genesisBlock.Hash(), frozen)
if err != nil {
return 0, err
}
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td, tfLogs)
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
Expand All @@ -1579,7 +1592,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db.BlockStore(), blockChain, receiptChain, td)
writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td, nil)
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
Expand Down Expand Up @@ -1661,6 +1674,8 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
rawdb.WriteBlobSidecars(blockBatch, block.Hash(), block.NumberU64(), block.Sidecars())
}
// We don't have transfer logs for fast sync blocks
rawdb.WriteMissingTransferLogs(batch, block.Hash(), block.NumberU64())

// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts)
Expand Down Expand Up @@ -3273,12 +3288,3 @@ func (bc *BlockChain) WriteTransferLogs(hash common.Hash, number uint64, transfe
defer bc.wg.Done()
rawdb.WriteTransferLogs(bc.db, hash, number, transferLogs)
}

// GetTransferLogs retrieves the transfer logs for all transactions in a given block.
// NOTE(review): this commit removes this blockchain.go variant in favour of an
// error-returning version in blockchain_reader.go — confirm no callers rely on
// the nil-on-missing behaviour here.
func (bc *BlockChain) GetTransferLogs(hash common.Hash) []*types.TransferLog {
	// Resolve the block number from the hash; a nil result means the header
	// is unknown, in which case no logs can exist.
	number := rawdb.ReadHeaderNumber(bc.db, hash)
	if number == nil {
		return nil
	}
	return rawdb.ReadTransferLogs(bc.db, hash, *number)
}
14 changes: 14 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"math/big"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -534,3 +535,16 @@ func (bc *BlockChain) AncientTail() (uint64, error) {
}
return tail, nil
}

// GetTransferLogs retrieves the transfer logs for all transactions in the
// block identified by hash. It returns ethereum.NotFound when the hash does
// not resolve to a known header. Results are memoized in transferLogsCache.
func (bc *BlockChain) GetTransferLogs(hash common.Hash) ([]*types.TransferLog, error) {
	// Serve from the cache first; without this lookup the cache below is
	// write-only and every call pays a full database/freezer read.
	if transferLogs, ok := bc.transferLogsCache.Get(hash); ok {
		return transferLogs, nil
	}
	number := rawdb.ReadHeaderNumber(bc.db, hash)
	if number == nil {
		return nil, ethereum.NotFound
	}
	transferLogs, err := rawdb.ReadTransferLogs(bc.db, hash, *number)
	if err != nil {
		return nil, err
	}
	bc.transferLogsCache.Add(hash, transferLogs)
	return transferLogs, nil
}
1 change: 1 addition & 0 deletions core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ func (g *Genesis) Commit(db ethdb.Database, triedb *triedb.Database) (*types.Blo
rawdb.WriteTd(db.BlockStore(), block.Hash(), block.NumberU64(), block.Difficulty())
rawdb.WriteBlock(db.BlockStore(), block)
rawdb.WriteReceipts(db.BlockStore(), block.Hash(), block.NumberU64(), nil)
rawdb.WriteTransferLogs(db.BlockStore(), block.Hash(), block.NumberU64(), nil)
rawdb.WriteCanonicalHash(db.BlockStore(), block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(db.BlockStore(), block.Hash())
rawdb.WriteHeadFastBlockHash(db.BlockStore(), block.Hash())
Expand Down
98 changes: 80 additions & 18 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ import (
// Write and Delete operations should carefully ensure that the database being used is BlockStore.
// 3) Ancient Data: When using a multi-database, Ancient data will use the BlockStore.

var (
	// errNotFound is returned when the requested transfer logs cannot be
	// located in either the ancient store or the key-value database.
	errNotFound = errors.New("not found")
	// errMissingTransferLogs marks blocks for which transfer logs were never
	// recorded (e.g. fast-synced blocks); its text is stored verbatim as a
	// sentinel payload in place of the RLP-encoded logs.
	errMissingTransferLogs = errors.New("missing transfer logs")
)

// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
var data []byte
Expand Down Expand Up @@ -799,38 +804,80 @@ func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64) [][]*types.Log {
return logs
}

// ReadTransferLogsRLP retrieves all the transfer logs belonging to a block in
// RLP encoding. It returns nil when the data cannot be found anywhere.
func ReadTransferLogsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
	// The ancient store only keeps canonical data, so a payload is trusted
	// only when the stored hash for that height matches the requested one.
	fromAncients := func() rlp.RawValue {
		payload, _ := db.Ancient(ChainFreezerTransferLogTable, number)
		if len(payload) == 0 {
			return nil
		}
		stored, _ := db.Ancient(ChainFreezerHashTable, number)
		if common.BytesToHash(stored) != hash {
			return nil
		}
		return payload
	}
	if payload := fromAncients(); payload != nil {
		return payload
	}
	// Not frozen (yet) — fall back to the key-value store.
	if payload, _ := db.Get(blockTransferLogsKey(number, hash)); len(payload) > 0 {
		return payload
	}
	// The background freezer may have migrated the entry from the key-value
	// store into the ancient store between the two lookups above, so consult
	// the ancients one final time before reporting a miss.
	return fromAncients()
}

// ReadTransferLogs retrieves all the transfer logs belonging to a block.
func ReadTransferLogs(db ethdb.KeyValueReader, hash common.Hash, number uint64) []*types.TransferLog {
func ReadTransferLogs(db ethdb.Reader, hash common.Hash, number uint64) ([]*types.TransferLog, error) {
// Retrieve the flattened transfer log slice
data, _ := db.Get(append(append(blockTranferLogsPrefix, encodeBlockNumber(number)...), hash.Bytes()...))
data := ReadTransferLogsRLP(db, hash, number)
if len(data) == 0 {
return nil
return nil, errNotFound
}
transferLogs := []*types.TransferLog{}
if err := rlp.DecodeBytes(data, &transferLogs); err != nil {
log.Error("Invalid transfer log array RLP", "hash", hash, "err", err)
return nil
if string(data) == errMissingTransferLogs.Error() {
return nil, errMissingTransferLogs
}
log.Error("Invalid transfer log array RLP", "hash", hash, "number", number, "err", err)
return nil, err
}
return transferLogs
return transferLogs, nil
}

// WriteTransferLogs stores all the transfer logs belonging to a block in the
// key-value store, keyed by block number and hash. Encoding or storage
// failures are fatal (log.Crit), matching the other chain accessors.
//
// Note: the original span interleaved the pre- and post-diff log.Crit lines;
// this is the reconstructed post-image.
func WriteTransferLogs(db ethdb.KeyValueWriter, hash common.Hash, number uint64, transferLogs []*types.TransferLog) {
	bytes, err := rlp.EncodeToBytes(transferLogs)
	if err != nil {
		log.Crit("Failed to encode block transfer logs", "hash", hash, "number", number, "err", err)
	}
	// Store the flattened transfer log slice
	if err := db.Put(blockTransferLogsKey(number, hash), bytes); err != nil {
		log.Crit("Failed to store block transfer logs", "hash", hash, "number", number, "err", err)
	}
}

// WriteMissingTransferLogs stores missing transfer logs message for a block.
func WriteMissingTransferLogs(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
bytes := []byte(errMissingTransferLogs.Error())
// Store the flattened transfer log slice
key := append(append(blockTranferLogsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
if err := db.Put(key, bytes); err != nil {
log.Crit("Failed to store block transfer logs", "err", err)
if err := db.Put(blockTransferLogsKey(number, hash), bytes); err != nil {
log.Crit("Failed to store block transfer logs", "hash", hash, "number", number, "err", err)
}
}

// DeleteTransferLogs removes all transfer logs associated with a block hash.
func DeleteTransferLogs(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(append(append(blockTranferLogsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)); err != nil {
log.Crit("Failed to delete block transfer logs", "err", err)
if err := db.Delete(blockTransferLogsKey(number, hash)); err != nil {
log.Crit("Failed to delete block transfer logs", "hash", hash, "number", number, "err", err)
}
}

Expand Down Expand Up @@ -859,7 +906,7 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
}

// WriteAncientBlocksWithBlobs writes entire block data with blobs into ancient store and returns the total written size.
func WriteAncientBlocksWithBlobs(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
func WriteAncientBlocksWithBlobs(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int, transferLogs []*types.TransferLog) (int64, error) {
// find cancun index, it's used for new added blob ancient table
cancunIndex := -1
for i, block := range blocks {
Expand All @@ -876,7 +923,7 @@ func WriteAncientBlocksWithBlobs(db ethdb.AncientWriter, blocks []*types.Block,
err error
)
if cancunIndex > 0 {
preSize, err = WriteAncientBlocks(db, blocks[:cancunIndex], receipts[:cancunIndex], td)
preSize, err = WriteAncientBlocks(db, blocks[:cancunIndex], receipts[:cancunIndex], td, transferLogs)
if err != nil {
return preSize, err
}
Expand All @@ -896,12 +943,12 @@ func WriteAncientBlocksWithBlobs(db ethdb.AncientWriter, blocks []*types.Block,
blocks = blocks[cancunIndex:]
receipts = receipts[cancunIndex:]
}
postSize, err := WriteAncientBlocks(db, blocks, receipts, tdSum)
postSize, err := WriteAncientBlocks(db, blocks, receipts, tdSum, transferLogs)
return preSize + postSize, err
}

// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int, transferLogs []*types.TransferLog) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
stReceipts []*types.ReceiptForStorage
Expand All @@ -917,7 +964,7 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
if i > 0 {
tdSum.Add(tdSum, header.Difficulty)
}
if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil {
if err := writeAncientBlock(op, block, header, stReceipts, tdSum, transferLogs); err != nil {
return err
}
}
Expand Down Expand Up @@ -975,7 +1022,7 @@ func DeleteBlobSidecars(db ethdb.KeyValueWriter, hash common.Hash, number uint64
}
}

func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int, transferLogs []*types.TransferLog) error {
num := block.NumberU64()
if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
return fmt.Errorf("can't add block %d hash: %v", num, err)
Expand All @@ -997,6 +1044,21 @@ func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *type
return fmt.Errorf("can't append block %d blobs: %v", num, err)
}
}
// Transfer logs might be nil when fast sync.
// To keep complete ancient table, we append the specific string to indicate nil transfer logs.
var transferLogBlob []byte
if transferLogs != nil {
var err error
transferLogBlob, err = rlp.EncodeToBytes(transferLogs)
if err != nil {
log.Crit("Failed to RLP encode block transfer logs", "err", err)
}
} else {
transferLogBlob = []byte(errMissingTransferLogs.Error())
}
if err := op.AppendRaw(ChainFreezerTransferLogTable, num, transferLogBlob); err != nil {
return fmt.Errorf("can't append block %d transfer logs: %v", num, err)
}
return nil
}

Expand Down
Loading

0 comments on commit 6cb4bec

Please sign in to comment.