Skip to content

Commit

Permalink
[AV-1682] State sync server side snapshot optimization (#784)
Browse files Browse the repository at this point in the history
* add account to LeafsRequest

* add snapshot iterators

* step 1: read from the snapshot

* refactor: move FillAccounts to trie package

* add UT

* step 2: partial data from the snapshot

* tests: add snapshot cleanup

* bugfixes and nits

* pr comments: iterator key len and comments

* pr comments: test cleanup and renaming

* leafs_request: adding comments and reordering args

* leafs_request: fix grammar in comment

* refactor: move IncrOne to utils

* leafs_request: refactor and comment improvements

* add storage trie test

* test: move cleanup to test body

* add "done" check to prefixedIterator

* Readability improvements via builder struct

* iterator: add value

* bugfix (on ctx expiry)

* nits

* remove root check optimization

* add stat

* refactor: rb.readLeafsFromSnapshot

* Improve readability via a builder struct (#793)

* test: move cleanup to test body

* add "done" check to prefixedIterator

* Readability improvements via builder struct

* iterator: add value

* bugfix (on ctx expiry)

* nits

* remove root check optimization

* add stat

* refactor: rb.readLeafsFromSnapshot

* nits and more comments

* typo

* test: add assertRangeProofIsValid

* cleaner buffer allocation

* use sync pool for response allocations

* use snapshot.Tree to get iterators to the snapshot (#794)

* use snapshot.Tree to get iterators to the snapshot

* iterator wrappers to clean up code duplication

* minor simplification

* fix iterator Error call

* fix storage iterator creation to be thread safe

* use sync pool for snapKeys/snapVals

* handle late init. of sync'ed client snapshot

* pass proof around explicitly (#795)

* pass proof around explicitly

* close memdb on err paths

* bump min sync version (we changed LeafsRequest)

* Revert "use sync pool for snapKeys/snapVals"

This reverts commit f73138e0d36087962ae30a0b2c0b6a80f54e3a04.

* clear out slices before returning them to pool

* switch to using interface over passing Fn (#796)
  • Loading branch information
darioush authored Jun 3, 2022
1 parent 7b28739 commit 36b069f
Show file tree
Hide file tree
Showing 24 changed files with 991 additions and 328 deletions.
8 changes: 7 additions & 1 deletion core/state/snapshot/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,16 @@ type diskStorageIterator struct {
// is always false.
func (dl *diskLayer) StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool) {
pos := common.TrimRightZeroes(seek[:])

// create prefix to be rawdb.SnapshotStoragePrefix + account[:]
prefix := make([]byte, len(rawdb.SnapshotStoragePrefix)+common.HashLength)
copy(prefix, rawdb.SnapshotStoragePrefix)
copy(prefix[len(rawdb.SnapshotStoragePrefix):], account[:])

return &diskStorageIterator{
layer: dl,
account: account,
it: dl.diskdb.NewIterator(append(rawdb.SnapshotStoragePrefix, account.Bytes()...), pos),
it: dl.diskdb.NewIterator(prefix, pos),
}, false
}

Expand Down
15 changes: 15 additions & 0 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,21 @@ func (t *Tree) DiskRoot() common.Hash {
return t.diskRoot()
}

func (t *Tree) DiskAccountIterator(seek common.Hash) AccountIterator {
t.lock.Lock()
defer t.lock.Unlock()

return t.disklayer().AccountIterator(seek)
}

func (t *Tree) DiskStorageIterator(account common.Hash, seek common.Hash) StorageIterator {
t.lock.Lock()
defer t.lock.Unlock()

it, _ := t.disklayer().StorageIterator(account, seek)
return it
}

// NewTestTree creates a *Tree with a pre-populated diskLayer
func NewTestTree(diskdb ethdb.KeyValueStore, blockHash, root common.Hash) *Tree {
base := &diskLayer{
Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/atomic_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func testAtomicSyncer(t *testing.T, serverTrieDB *trie.Database, targetHeight ui
numLeaves := 0
mockClient := syncclient.NewMockClient(
message.Codec,
handlers.NewLeafsRequestHandler(serverTrieDB, message.Codec, handlerstats.NewNoopHandlerStats()),
handlers.NewLeafsRequestHandler(serverTrieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats()),
nil,
nil,
)
Expand Down
5 changes: 3 additions & 2 deletions plugin/evm/message/leafs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (nt NodeType) String() string {
// NodeType outlines which trie to read from state/atomic.
type LeafsRequest struct {
Root common.Hash `serialize:"true"`
Account common.Hash `serialize:"true"`
Start []byte `serialize:"true"`
End []byte `serialize:"true"`
Limit uint16 `serialize:"true"`
Expand All @@ -50,8 +51,8 @@ type LeafsRequest struct {

func (l LeafsRequest) String() string {
return fmt.Sprintf(
"LeafsRequest(Root=%s, Start=%s, End=%s, Limit=%d, NodeType=%s)",
l.Root, common.Bytes2Hex(l.Start), common.Bytes2Hex(l.End), l.Limit, l.NodeType,
"LeafsRequest(Root=%s, Account=%s, Start=%s, End=%s, Limit=%d, NodeType=%s)",
l.Root, l.Account, common.Bytes2Hex(l.Start), common.Bytes2Hex(l.End), l.Limit, l.NodeType,
)
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/message/leafs_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestMarshalLeafsRequest(t *testing.T) {
NodeType: StateTrieNode,
}

base64LeafsRequest := "AAAAAAAAAAAAAAAAAAAAAABpbSBST09UaW5nIGZvciB5YQAAACBS/fwHIYJlTxY/Xw+aYh1ylWbHTRADfE17uwQH0eLGSQAAACCBhVrYaB0NhtHpHgAWeTnLZpTSxCKs0gigByk5SH9pmQQAAQ=="
base64LeafsRequest := "AAAAAAAAAAAAAAAAAAAAAABpbSBST09UaW5nIGZvciB5YQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIFL9/AchgmVPFj9fD5piHXKVZsdNEAN8TXu7BAfR4sZJAAAAIIGFWthoHQ2G0ekeABZ5OctmlNLEIqzSCKAHKTlIf2mZBAAB"

leafsRequestBytes, err := Codec.Marshal(Version, leafsRequest)
assert.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,13 @@ func (vm *VM) setAppRequestHandlers() {
Cache: vm.config.StateSyncServerTrieCache,
},
)
syncRequestHandler := handlers.NewSyncHandler(vm.chain.BlockChain().GetBlock, evmTrieDB, vm.atomicTrie.TrieDB(), vm.networkCodec, handlerstats.NewHandlerStats(metrics.Enabled))
syncRequestHandler := handlers.NewSyncHandler(
vm.chain.BlockChain(),
evmTrieDB,
vm.atomicTrie.TrieDB(),
vm.networkCodec,
handlerstats.NewHandlerStats(metrics.Enabled),
)
vm.Network.SetRequestHandler(syncRequestHandler)
}

Expand Down
2 changes: 1 addition & 1 deletion sync/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

var (
StateSyncVersion = version.NewDefaultApplication(constants.PlatformName, 1, 7, 11)
StateSyncVersion = version.NewDefaultApplication(constants.PlatformName, 1, 7, 12)
errEmptyResponse = errors.New("empty response")
errTooManyBlocks = errors.New("response contains more blocks than requested")
errHashMismatch = errors.New("hash does not match expected value")
Expand Down
41 changes: 16 additions & 25 deletions sync/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,16 @@ func TestGetBlocks(t *testing.T) {
}
}

func buildGetter(blocks []*types.Block) func(hash common.Hash, height uint64) *types.Block {
return func(blockHash common.Hash, blockHeight uint64) *types.Block {
requestedBlock := blocks[blockHeight]
if requestedBlock.Hash() != blockHash {
fmt.Printf("ERROR height=%d, hash=%s, parentHash=%s, reqHash=%s\n", blockHeight, blockHash, requestedBlock.ParentHash(), requestedBlock.Hash())
return nil
}

return requestedBlock
func buildGetter(blocks []*types.Block) handlers.BlockProvider {
return &handlers.TestBlockProvider{
GetBlockFn: func(blockHash common.Hash, blockHeight uint64) *types.Block {
requestedBlock := blocks[blockHeight]
if requestedBlock.Hash() != blockHash {
fmt.Printf("ERROR height=%d, hash=%s, parentHash=%s, reqHash=%s\n", blockHeight, blockHash, requestedBlock.ParentHash(), requestedBlock.Hash())
return nil
}
return requestedBlock
},
}
}

Expand All @@ -396,7 +397,7 @@ func TestGetLeafs(t *testing.T) {
largeTrieRoot, largeTrieKeys, _ := trie.GenerateTrie(t, trieDB, 100_000, common.HashLength)
smallTrieRoot, _, _ := trie.GenerateTrie(t, trieDB, leafsLimit, common.HashLength)

handler := handlers.NewLeafsRequestHandler(trieDB, message.Codec, handlerstats.NewNoopHandlerStats())
handler := handlers.NewLeafsRequestHandler(trieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats())
client := NewClient(&ClientConfig{
NetworkClient: &mockNetwork{},
Codec: message.Codec,
Expand Down Expand Up @@ -611,21 +612,11 @@ func TestGetLeafs(t *testing.T) {
if _, err := message.Codec.Unmarshal(response, &leafResponse); err != nil {
t.Fatal(err)
}
leafResponse.Keys = leafResponse.Keys[1:]
leafResponse.Vals = leafResponse.Vals[1:]

tr, err := trie.New(largeTrieRoot, trieDB)
if err != nil {
t.Fatal(err)
}
leafResponse.ProofKeys, leafResponse.ProofVals, err = handlers.GenerateRangeProof(tr, leafResponse.Keys[0], leafResponse.Keys[len(leafResponse.Keys)-1])
if err != nil {
t.Fatal(err)
}

modifiedResponse, err := message.Codec.Marshal(message.Version, leafResponse)
modifiedRequest := request
modifiedRequest.Start = leafResponse.Keys[1]
modifiedResponse, err := handler.OnLeafsRequest(context.Background(), ids.GenerateTestNodeID(), 2, modifiedRequest)
if err != nil {
t.Fatal(err)
t.Fatal("unexpected error in calling leafs request handler", err)
}
return modifiedResponse
},
Expand Down Expand Up @@ -791,7 +782,7 @@ func TestGetLeafsRetries(t *testing.T) {
trieDB := trie.NewDatabase(memorydb.New())
root, _, _ := trie.GenerateTrie(t, trieDB, 100_000, common.HashLength)

handler := handlers.NewLeafsRequestHandler(trieDB, message.Codec, handlerstats.NewNoopHandlerStats())
handler := handlers.NewLeafsRequestHandler(trieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats())
mockNetClient := &mockNetwork{}

const maxAttempts = 8
Expand Down
19 changes: 4 additions & 15 deletions sync/client/leaf_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"github.com/ava-labs/coreth/plugin/evm/message"
"github.com/ava-labs/coreth/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -40,6 +41,7 @@ type OnSyncFailure func(error) error
// LeafSyncTask represents a complete task to be completed by the leaf syncer.
type LeafSyncTask struct {
Root common.Hash // Root of the trie to sync
Account common.Hash // Account hash of the trie to sync (only applicable to storage tries)
Start []byte // Starting key to request new leaves
NodeType message.NodeType // Specifies the message type (atomic/state trie) for the leaf syncer to send
OnStart OnStart // Callback when tasks begins, returns true if work can be skipped
Expand Down Expand Up @@ -115,6 +117,7 @@ func (c *CallbackLeafSyncer) syncTask(ctx context.Context, task *LeafSyncTask) e

leafsResponse, err := c.client.GetLeafs(message.LeafsRequest{
Root: root,
Account: task.Account,
Start: start,
End: nil, // will request until the end of the trie
Limit: defaultLeafRequestLimit,
Expand Down Expand Up @@ -147,7 +150,7 @@ func (c *CallbackLeafSyncer) syncTask(ctx context.Context, task *LeafSyncTask) e
// Update start to be one bit past the last returned key for the next request.
// Note: since more was true, this cannot cause an overflow.
start = leafsResponse.Keys[len(leafsResponse.Keys)-1]
IncrOne(start)
utils.IncrOne(start)
}
}

Expand Down Expand Up @@ -222,17 +225,3 @@ func (c *CallbackLeafSyncer) addTasks(ctx context.Context, tasks []*LeafSyncTask
}
return nil
}

// IncrOne increments bytes value by one
func IncrOne(bytes []byte) {
index := len(bytes) - 1
for index >= 0 {
if bytes[index] < 255 {
bytes[index]++
break
} else {
bytes[index] = 0
index--
}
}
}
19 changes: 9 additions & 10 deletions sync/handlers/block_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/ids"

"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/peer"
"github.com/ava-labs/coreth/plugin/evm/message"
"github.com/ava-labs/coreth/sync/handlers/stats"
Expand All @@ -26,17 +25,17 @@ const parentLimit = uint16(64)
// BlockRequestHandler is a peer.RequestHandler for message.BlockRequest
// serving requested blocks starting at specified hash
type BlockRequestHandler struct {
stats stats.BlockRequestHandlerStats
network peer.Network
getter func(common.Hash, uint64) *types.Block
codec codec.Manager
stats stats.BlockRequestHandlerStats
network peer.Network
blockProvider BlockProvider
codec codec.Manager
}

func NewBlockRequestHandler(getter func(common.Hash, uint64) *types.Block, codec codec.Manager, handlerStats stats.BlockRequestHandlerStats) *BlockRequestHandler {
func NewBlockRequestHandler(blockProvider BlockProvider, codec codec.Manager, handlerStats stats.BlockRequestHandlerStats) *BlockRequestHandler {
return &BlockRequestHandler{
getter: getter,
codec: codec,
stats: handlerStats,
blockProvider: blockProvider,
codec: codec,
stats: handlerStats,
}
}

Expand Down Expand Up @@ -75,7 +74,7 @@ func (b *BlockRequestHandler) OnBlockRequest(ctx context.Context, nodeID ids.Nod
break
}

block := b.getter(hash, height)
block := b.blockProvider.GetBlock(hash, height)
if block == nil {
b.stats.IncMissingBlockHash()
break
Expand Down
44 changes: 25 additions & 19 deletions sync/handlers/block_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ func TestBlockRequestHandler(t *testing.T) {
}

mockHandlerStats := &stats.MockHandlerStats{}
blockRequestHandler := NewBlockRequestHandler(func(hash common.Hash, height uint64) *types.Block {
blk, ok := blocksDB[hash]
if !ok || blk.NumberU64() != height {
return nil
}
return blk
}, message.Codec, mockHandlerStats)
blockProvider := &TestBlockProvider{
GetBlockFn: func(hash common.Hash, height uint64) *types.Block {
blk, ok := blocksDB[hash]
if !ok || blk.NumberU64() != height {
return nil
}
return blk
},
}
blockRequestHandler := NewBlockRequestHandler(blockProvider, message.Codec, mockHandlerStats)

tests := []struct {
name string
Expand Down Expand Up @@ -163,18 +166,21 @@ func TestBlockRequestHandlerCtxExpires(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
blockRequestCallCount := 0
blockRequestHandler := NewBlockRequestHandler(func(hash common.Hash, height uint64) *types.Block {
blockRequestCallCount++
// cancel ctx after the 2nd call to simulate ctx expiring due to deadline exceeding
if blockRequestCallCount >= cancelAfterNumRequests {
cancel()
}
blk, ok := blocksDB[hash]
if !ok || blk.NumberU64() != height {
return nil
}
return blk
}, message.Codec, stats.NewNoopHandlerStats())
blockProvider := &TestBlockProvider{
GetBlockFn: func(hash common.Hash, height uint64) *types.Block {
blockRequestCallCount++
// cancel ctx after the 2nd call to simulate ctx expiring due to deadline exceeding
if blockRequestCallCount >= cancelAfterNumRequests {
cancel()
}
blk, ok := blocksDB[hash]
if !ok || blk.NumberU64() != height {
return nil
}
return blk
},
}
blockRequestHandler := NewBlockRequestHandler(blockProvider, message.Codec, stats.NewNoopHandlerStats())

responseBytes, err := blockRequestHandler.OnBlockRequest(ctx, ids.GenerateTestNodeID(), 1, message.BlockRequest{
Hash: blocks[10].Hash(),
Expand Down
22 changes: 18 additions & 4 deletions sync/handlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/coreth/core/state/snapshot"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/plugin/evm/message"
"github.com/ava-labs/coreth/sync/handlers/stats"
Expand All @@ -17,6 +18,19 @@ import (

var _ message.RequestHandler = &syncHandler{}

type BlockProvider interface {
GetBlock(common.Hash, uint64) *types.Block
}

type SnapshotProvider interface {
Snapshots() *snapshot.Tree
}

type SyncDataProvider interface {
BlockProvider
SnapshotProvider
}

type syncHandler struct {
stateTrieLeafsRequestHandler *LeafsRequestHandler
atomicTrieLeafsRequestHandler *LeafsRequestHandler
Expand All @@ -26,16 +40,16 @@ type syncHandler struct {

// NewSyncHandler constructs the handler for serving state sync.
func NewSyncHandler(
getBlock func(common.Hash, uint64) *types.Block,
provider SyncDataProvider,
evmTrieDB *trie.Database,
atomicTrieDB *trie.Database,
networkCodec codec.Manager,
stats stats.HandlerStats,
) message.RequestHandler {
return &syncHandler{
stateTrieLeafsRequestHandler: NewLeafsRequestHandler(evmTrieDB, networkCodec, stats),
atomicTrieLeafsRequestHandler: NewLeafsRequestHandler(atomicTrieDB, networkCodec, stats),
blockRequestHandler: NewBlockRequestHandler(getBlock, networkCodec, stats),
stateTrieLeafsRequestHandler: NewLeafsRequestHandler(evmTrieDB, provider, networkCodec, stats),
atomicTrieLeafsRequestHandler: NewLeafsRequestHandler(atomicTrieDB, nil, networkCodec, stats),
blockRequestHandler: NewBlockRequestHandler(provider, networkCodec, stats),
codeRequestHandler: NewCodeRequestHandler(evmTrieDB.DiskDB(), networkCodec, stats),
}
}
Expand Down
Loading

0 comments on commit 36b069f

Please sign in to comment.