Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reward: SupplyManager correctly handles block rewinds #67

Merged
merged 7 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
// to low, so it's safe the update in-memory markers directly.
bc.currentFastBlock.Store(newHeadFastBlock)
}

// Rewind the supply checkpoint
newLastSupplyCheckpointNumber := header.Number.Uint64() - (header.Number.Uint64() % params.SupplyCheckpointInterval)
bc.db.WriteLastSupplyCheckpointNumber(newLastSupplyCheckpointNumber)
hyunsooda marked this conversation as resolved.
Show resolved Hide resolved

return bc.CurrentBlock().Number().Uint64(), nil
}

Expand All @@ -629,6 +634,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if bc.Config().Istanbul.ProposerPolicy == params.WeightedRandom && !bc.Config().IsKaiaForkEnabled(new(big.Int).SetUint64(num)) && params.IsStakingUpdateInterval(num) {
bc.db.DeleteStakingInfo(num)
}
bc.db.DeleteSupplyCheckpoint(num)
}

// If SetHead was only called as a chain reparation method, try to skip
Expand Down
2 changes: 2 additions & 0 deletions node/cn/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (b *CNAPIBackend) SetHead(number uint64) error {
b.cn.protocolManager.Downloader().Cancel()
b.cn.protocolManager.SetSyncStop(true)
defer b.cn.protocolManager.SetSyncStop(false)
b.cn.supplyManager.Stop()
defer b.cn.supplyManager.Start()
return doSetHead(b.cn.blockchain, b.cn.engine, b.cn.governance, b.gpo, number)
}

Expand Down
13 changes: 13 additions & 0 deletions node/cn/api_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ func testGov() *governance.MixedEngine {
return governance.NewMixedEngine(config, db)
}

type testSupplyManager struct{}

func (sm *testSupplyManager) Start() {
}

func (sm *testSupplyManager) Stop() {
}

func (sm *testSupplyManager) GetTotalSupply(num uint64) (*reward.TotalSupply, error) {
return &reward.TotalSupply{}, nil
}

func TestCNAPIBackend_SetHead(t *testing.T) {
mockCtrl, mockBlockChain, _, api := newCNAPIBackend(t)
defer mockCtrl.Finish()
Expand All @@ -183,6 +195,7 @@ func TestCNAPIBackend_SetHead(t *testing.T) {
api.cn.protocolManager = pm
api.cn.engine = gxhash.NewFullFaker()
api.cn.governance = testGov()
api.cn.supplyManager = &testSupplyManager{}
api.gpo = gasprice.NewOracle(api, gasprice.Config{}, nil, api.cn.governance)

number := uint64(123)
Expand Down
4 changes: 1 addition & 3 deletions node/cn/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,7 @@ func New(ctx *node.ServiceContext, config *Config) (*CN, error) {
// NewStakingManager is called with proper non-nil parameters
reward.NewStakingManager(cn.blockchain, governance, cn.chainDB)
}
// Note: archive nodes might have TrieBlockInterval == 128, then SupplyManager will store checkpoints every 128 blocks.
// Still it is not a problem since SupplyManager can re-accumulate from the nearest checkpoint.
cn.supplyManager = reward.NewSupplyManager(cn.blockchain, cn.governance, cn.chainDB, config.TrieBlockInterval)
cn.supplyManager = reward.NewSupplyManager(cn.blockchain, cn.governance, cn.chainDB)

// Governance states which are not yet applied to the db remains at in-memory storage
// It disappears during the node restart, so restoration is needed before the sync starts
Expand Down
3 changes: 2 additions & 1 deletion params/governance_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ const (
// The prefix for governance cache
GovernanceCachePrefix = "governance"

CheckpointInterval = 1024
CheckpointInterval = 1024 // For Istanbul snapshot
SupplyCheckpointInterval = 128 // for SupplyManager tracking native token supply
)

const (
Expand Down
56 changes: 19 additions & 37 deletions reward/supply_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ import (
"github.com/kaiachain/kaia/common"
"github.com/kaiachain/kaia/contracts/contracts/system_contracts/rebalance"
"github.com/kaiachain/kaia/event"
"github.com/kaiachain/kaia/params"
"github.com/kaiachain/kaia/storage/database"
)

var (
supplyCacheSize = 86400 // A day; Some total supply consumers might want daily supply.
supplyLogInterval = uint64(102400) // Periodic total supply log.
supplyReaccLimit = uint64(1024) // Re-accumulate from the last accumulated block.
zeroBurnAddress = common.HexToAddress("0x0")
deadBurnAddress = common.HexToAddress("0xdead")

Expand Down Expand Up @@ -104,7 +104,7 @@ type supplyManager struct {

// NewSupplyManager creates a new supply manager.
// The TotalSupply data is stored every checkpointInterval blocks.
func NewSupplyManager(chain blockChain, gov governanceHelper, db database.DBManager, checkpointInterval uint) *supplyManager {
func NewSupplyManager(chain blockChain, gov governanceHelper, db database.DBManager) *supplyManager {
checkpointCache, _ := lru.NewARC(supplyCacheSize)
memoCache, _ := lru.NewARC(10)

Expand All @@ -113,7 +113,7 @@ func NewSupplyManager(chain blockChain, gov governanceHelper, db database.DBMana
chainHeadChan: make(chan blockchain.ChainHeadEvent, chainHeadChanSize),
gov: gov,
db: db,
checkpointInterval: uint64(checkpointInterval),
checkpointInterval: uint64(params.SupplyCheckpointInterval),
checkpointCache: checkpointCache,
memoCache: memoCache,
quitCh: make(chan struct{}, 1), // make sure Stop() doesn't block if catchup() has exited before Stop()
Expand All @@ -139,11 +139,6 @@ func (sm *supplyManager) GetCheckpoint(num uint64) (*database.SupplyCheckpoint,
return checkpoint.(*database.SupplyCheckpoint), nil
}

lastNum := sm.db.ReadLastSupplyCheckpointNumber()
if lastNum < num { // soft deleted
return nil, errNoCheckpoint
}

checkpoint, err := sm.getCheckpointUncached(num)
if err != nil {
return nil, err
Expand Down Expand Up @@ -342,16 +337,18 @@ func (sm *supplyManager) catchup() {
case head := <-sm.chainHeadChan:
headNum = head.Block.NumberU64()

supply, err := sm.accumulateReward(lastNum, headNum, lastCheckpoint, true)
if err != nil {
if err != errSupplyManagerQuit {
logger.Error("Total supply accumulate failed", "from", lastNum, "to", headNum, "err", err)
if lastNum < headNum {
supply, err := sm.accumulateReward(lastNum, headNum, lastCheckpoint, true)
if err != nil {
if err != errSupplyManagerQuit {
logger.Error("Total supply accumulate failed", "from", lastNum, "to", headNum, "err", err)
}
return
}
return
}

lastNum = headNum
lastCheckpoint = supply
lastNum = headNum
lastCheckpoint = supply
}
}
}
}
Expand Down Expand Up @@ -381,36 +378,21 @@ func (sm *supplyManager) totalSupplyFromState(num uint64) (*big.Int, error) {
}

func (sm *supplyManager) getCheckpointUncached(num uint64) (*database.SupplyCheckpoint, error) {
// Read from DB
checkpoint := sm.db.ReadSupplyCheckpoint(num)
if checkpoint != nil {
return checkpoint, nil
}

// Trace back to the last stored supply checkpoint.
var fromNum uint64
var fromAcc *database.SupplyCheckpoint

// Fast path using checkpointInterval
if checkpoint := sm.db.ReadSupplyCheckpoint(num - num%sm.checkpointInterval); checkpoint != nil {
fromNum = num - num%sm.checkpointInterval
fromAcc = checkpoint
} else {
// Slow path in case the checkpoint has changed or checkpoint is missing.
for i := uint64(1); i < supplyReaccLimit; i++ {
checkpoint = sm.db.ReadSupplyCheckpoint(num - i)
if checkpoint != nil {
fromNum = num - i
fromAcc = checkpoint
break
}
}
}
if fromAcc == nil {
// Re-accumulate from the the nearest checkpoint
fromNum := num - (num % sm.checkpointInterval)
fromCheckpoint := sm.db.ReadSupplyCheckpoint(fromNum)
if fromCheckpoint == nil {
return nil, errNoCheckpoint
}

logger.Trace("on-demand reaccumulating rewards", "from", fromNum, "to", num)
return sm.accumulateReward(fromNum, num, fromAcc, false)
return sm.accumulateReward(fromNum, num, fromCheckpoint, false)
}

// accumulateReward calculates the total supply from the last block to the current block.
Expand Down
Loading
Loading