Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reward: Fix state regeneration with post-Kaia staking info #43

Merged
merged 5 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions blockchain/system/multicall.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ func (caller *ContractCallerForMultiCall) CallContract(ctx context.Context, call
}

// NewMultiCallContractCaller creates a new instance of ContractCaller for MultiCall contract.
hyeonLewis marked this conversation as resolved.
Show resolved Hide resolved
func NewMultiCallContractCaller(chain backends.BlockChainForCaller, header *types.Header) (*multicall.MultiCallContractCaller, error) {
state, err := chain.StateAt(header.Root)
if err != nil {
return nil, err
}
c := &ContractCallerForMultiCall{state, chain, header}
func NewMultiCallContractCaller(state *state.StateDB, chain backends.BlockChainForCaller, header *types.Header) (*multicall.MultiCallContractCaller, error) {
c := &ContractCallerForMultiCall{state.Copy(), chain, header} // Copy the state to prevent the original state from being modified.
return multicall.NewMultiCallContractCaller(MultiCallAddr, c)
}
4 changes: 2 additions & 2 deletions blockchain/system/multicall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func TestContractCallerForMultiCall(t *testing.T) {
header := backend.BlockChain().CurrentHeader()
chain := backend.BlockChain()

caller, _ := NewMultiCallContractCaller(chain, header)
state, _ := backend.BlockChain().StateAt(header.Root)
caller, _ := NewMultiCallContractCaller(state, chain, header)
ret, err := caller.MultiCallStakingInfo(&bind.CallOpts{BlockNumber: header.Number})
assert.Nil(t, err)

// Does not affect the original state
state, _ := backend.BlockChain().StateAt(header.Root)
assert.Equal(t, []byte(nil), state.GetCode(MultiCallAddr))

// Mock data
Expand Down
53 changes: 53 additions & 0 deletions common/refcntmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package common

import (
"sync"
)

// RefCntMap is a map with reference counting.
type RefCountingMap struct {
mu sync.RWMutex
values map[interface{}]interface{}
counts map[interface{}]int
}

func NewRefCountingMap() *RefCountingMap {
return &RefCountingMap{
values: make(map[interface{}]interface{}),
counts: make(map[interface{}]int),
}
}

// Get returns the value associated with the given key.
func (r *RefCountingMap) Get(key interface{}) (interface{}, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
value, ok := r.values[key]
return value, ok
}

// Add adds an element to the map and increments its reference count.
func (r *RefCountingMap) Add(key interface{}, value interface{}) {
r.mu.Lock()
defer r.mu.Unlock()
r.values[key] = value
r.counts[key]++
}

// Remove decrements the reference count of the element with the given key.
func (r *RefCountingMap) Remove(key interface{}) {
r.mu.Lock()
defer r.mu.Unlock()
if r.counts[key] > 0 {
r.counts[key]--
}
if r.counts[key] == 0 {
delete(r.values, key)
delete(r.counts, key)
}
}

// Len returns the number of elements in the map.
func (r *RefCountingMap) Len() int {
return len(r.values)
}
21 changes: 20 additions & 1 deletion node/cn/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kaiachain/kaia/blockchain/types"
"github.com/kaiachain/kaia/blockchain/vm"
"github.com/kaiachain/kaia/common"
"github.com/kaiachain/kaia/reward"
statedb2 "github.com/kaiachain/kaia/storage/statedb"
)

Expand Down Expand Up @@ -112,17 +113,29 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD
start = time.Now()
logged time.Time
parent common.Hash

preloadedStakingBlockNums = make([]uint64, 0, origin-current.NumberU64())
)
defer func() {
for _, num := range preloadedStakingBlockNums {
reward.UnloadStakingInfo(num)
}
}()
for current.NumberU64() < origin {
// Print progress logs if long enough time elapsed
if report && time.Since(logged) > 8*time.Second {
logger.Info("Regenerating historical state", "block", current.NumberU64()+1, "target", origin, "remaining", origin-block.NumberU64()-1, "elapsed", time.Since(start))
logger.Info("Regenerating historical state", "block", current.NumberU64()+1, "target", origin, "remaining", origin-current.NumberU64()-1, "elapsed", time.Since(start))
logged = time.Now()
}
// Quit the state regeneration if time limit exceeds
if cn.config.DisableUnsafeDebug && time.Since(start) > cn.config.StateRegenerationTimeLimit {
return nil, fmt.Errorf("this request has queried old states too long since it exceeds the state regeneration time limit(%s)", cn.config.StateRegenerationTimeLimit.String())
}
// Preload StakingInfo from the current block and state. Needed for next block's engine.Finalize() post-Kaia.
preloadedStakingBlockNums = append(preloadedStakingBlockNums, current.NumberU64())
if err := reward.PreloadStakingInfoWithState(current.Header(), statedb); err != nil {
return nil, fmt.Errorf("preloading staking info from block %d failed: %v", current.NumberU64(), err)
}
// Retrieve the next block to regenerate and process it
next := current.NumberU64() + 1
if current = cn.blockchain.GetBlockByNumber(next); current == nil {
Expand All @@ -143,6 +156,12 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD
database.TrieDB().ReferenceRoot(root)
if !common.EmptyHash(parent) {
database.TrieDB().Dereference(parent)
if current.Header().Root != root {
err = fmt.Errorf("mistmatching state root block expected %x reexecuted %x", current.Header().Root, root)
// Logging here because something went wrong when the state roots disagree even if the execution was successful.
logger.Error("incorrectly regenerated historical state", "block", current.NumberU64(), "err", err)
return nil, fmt.Errorf("incorrectly regenerated historical state for block %d: %v", current.NumberU64(), err)
}
}
parent = root
}
Expand Down
75 changes: 74 additions & 1 deletion reward/staking_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ type StakingManager struct {
blockchain blockChain
chainHeadChan chan blockchain.ChainHeadEvent
chainHeadSub event.Subscription

// Preloaded stakingInfos are fetched before the GetStakingInfo request.
// This is used when the state is available when preloaded, but not available
// when GetStakingInfo is called. e.g. reexec loop in stateAtBlock.
// Therefore preloaded staking infos must not be evicted,
// and it should be only used temporarily, hence a separate mapping.
preloadedInfo *common.RefCountingMap
}

var (
Expand Down Expand Up @@ -104,6 +111,7 @@ func NewStakingManager(bc blockChain, gh governanceHelper, db stakingInfoDB) *St
governanceHelper: gh,
blockchain: bc,
chainHeadChan: make(chan blockchain.ChainHeadEvent, chainHeadChanSize),
preloadedInfo: common.NewRefCountingMap(),
}

// Before migration, staking information of current and before should be stored in DB.
Expand Down Expand Up @@ -234,6 +242,45 @@ func GetStakingInfoOnStakingBlock(stakingBlockNumber uint64) *StakingInfo {
return calcStakingInfo
}

// PreloadStakingInfoWithState fetches the stakingInfo based on the given state trie
// and then stores it in the preloaded map. Because preloaded map does not have eviction policy,
// it should be removed manually after use. Note that preloaded info is for the next block of the given header.
func PreloadStakingInfoWithState(header *types.Header, statedb *state.StateDB) error {
if stakingManager == nil {
return ErrStakingManagerNotSet
}

if !isKaiaForkEnabled(header.Number.Uint64() + 1) {
return nil // no need to preload staking info before kaia fork because we have it in the database.
}

if header.Root != statedb.IntermediateRoot(false) { // Sanity check
return errors.New("must supply the statedb for the given header") // this should not happen
}

num := header.Number.Uint64()
info, err := getStakingInfoFromMultiCallAtState(num, statedb, header)
if err != nil {
return fmt.Errorf("staking info preload failed. root err: %v", err)
}
if info != nil {
stakingManager.preloadedInfo.Add(num, info)
}
logger.Trace("preloaded staking info", "staking block number", num)
return nil
}

// UnloadStakingInfo removes a stakingInfo from the preloaded map.
// Must be executed after PreloadStakingInfoWithState(Header{num}, state).
func UnloadStakingInfo(num uint64) {
if stakingManager == nil {
logger.Error("unable to GetStakingInfo", "err", ErrStakingManagerNotSet)
return
}

stakingManager.preloadedInfo.Remove(num)
}

// updateKaiaStakingInfo updates kaia staking info in cache created from given block number.
// From Kaia fork, not only the staking block number but also the calculation of staking amounts is changed,
// so we need separate update function for kaia staking info.
Expand Down Expand Up @@ -287,8 +334,17 @@ func getStakingInfoFromMultiCall(blockNum uint64) (*StakingInfo, error) {
return nil, fmt.Errorf("failed to get header by number %d", blockNum)
}

statedb, err := stakingManager.blockchain.StateAt(header.Root)
if err != nil {
return nil, fmt.Errorf("failed to get state at number %d. root err: %s", blockNum, err)
}

return getStakingInfoFromMultiCallAtState(blockNum, statedb, header)
}

func getStakingInfoFromMultiCallAtState(blockNum uint64, statedb *state.StateDB, header *types.Header) (*StakingInfo, error) {
// Get staking info from multicall contract
caller, err := system.NewMultiCallContractCaller(stakingManager.blockchain, header)
caller, err := system.NewMultiCallContractCaller(statedb, stakingManager.blockchain, header)
if err != nil {
return nil, fmt.Errorf("failed to create multicall contract caller. root err: %s", err)
}
Expand Down Expand Up @@ -374,6 +430,16 @@ func getStakingInfoFromCache(blockNum uint64) *StakingInfo {
return cachedStakingInfo.(*StakingInfo)
}

if info, ok := stakingManager.preloadedInfo.Get(blockNum); ok {
info := info.(*StakingInfo)
logger.Debug("preloadedInfo hit.", "staking block number", blockNum, "stakingInfo", info)
// Fill in Gini coeff if not set. Modifies the cached object.
if err := fillMissingGiniCoefficient(info, blockNum); err != nil {
blukat29 marked this conversation as resolved.
Show resolved Hide resolved
logger.Warn("Cannot fill in gini coefficient", "staking block number", blockNum, "err", err)
}
return info
}

return nil
}

Expand Down Expand Up @@ -517,6 +583,7 @@ func SetTestStakingManagerWithChain(bc blockChain, gh governanceHelper, db staki
governanceHelper: gh,
blockchain: bc,
chainHeadChan: make(chan blockchain.ChainHeadEvent, chainHeadChanSize),
preloadedInfo: common.NewRefCountingMap(),
})
}

Expand All @@ -526,6 +593,7 @@ func SetTestStakingManagerWithDB(testDB stakingInfoDB) {
SetTestStakingManager(&StakingManager{
blockchain: &blockchain.BlockChain{},
stakingInfoDB: testDB,
preloadedInfo: common.NewRefCountingMap(),
})
}

Expand All @@ -537,6 +605,7 @@ func SetTestStakingManagerWithStakingInfoCache(testInfo *StakingInfo) {
SetTestStakingManager(&StakingManager{
blockchain: &blockchain.BlockChain{},
stakingInfoCache: cache,
preloadedInfo: common.NewRefCountingMap(),
})
}

Expand All @@ -563,3 +632,7 @@ func SetTestAddressBookAddress(addr common.Address) {
func TestGetStakingCacheSize() int {
return GetStakingManager().stakingInfoCache.Len()
}

func TestGetStakingPreloadSize() int {
return GetStakingManager().preloadedInfo.Len()
}
Loading
Loading