Skip to content

Commit

Permalink
add pipeline from chain to onroad
Browse files Browse the repository at this point in the history
  • Loading branch information
viteshan committed Jul 19, 2022
1 parent 345a369 commit b52bc69
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 17 deletions.
2 changes: 2 additions & 0 deletions interfaces/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,5 @@ type DBStatus struct {
Size uint64
Status string
}

type LoadOnroadFn func(fromAddr, toAddr types.Address, hashHeight core.HashHeight) error
46 changes: 45 additions & 1 deletion ledger/chain/index/onroad.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"github.com/vitelabs/go-vite/v2/common/types"
"github.com/vitelabs/go-vite/v2/interfaces"
ledger "github.com/vitelabs/go-vite/v2/interfaces/core"
"github.com/vitelabs/go-vite/v2/ledger/chain/utils"
chain_utils "github.com/vitelabs/go-vite/v2/ledger/chain/utils"
)

// @Deprecated
func (iDB *IndexDB) Load(addrList []types.Address) (map[types.Address]map[types.Address][]ledger.HashHeight, error) {
onRoadData := make(map[types.Address]map[types.Address][]ledger.HashHeight, len(addrList))
for _, addr := range addrList {
Expand Down Expand Up @@ -54,6 +55,49 @@ func (iDB *IndexDB) Load(addrList []types.Address) (map[types.Address]map[types.
return onRoadData, nil
}

func (iDB *IndexDB) LoadRange(addrList []types.Address, loadFn interfaces.LoadOnroadFn) error {
for _, addr := range addrList {
iter := iDB.store.NewIterator(util.BytesPrefix(append([]byte{chain_utils.OnRoadKeyPrefix}, addr.Bytes()...)))
for iter.Next() {
key := iter.Key()
blockHashBytes := key[len(key)-types.HashSize:]

blockHash, err := types.BytesToHash(blockHashBytes)
if err != nil {
return err
}

fromAddr, height, err := iDB.GetAddrHeightByHash(&blockHash)

if err != nil {
return err
}

if fromAddr == nil {
iDB.log.Error(fmt.Sprintf("block hash is %s, fromAddr is %s, height is %d", blockHash, fromAddr, height), "method", "Load")
continue
}
err = loadFn(*fromAddr, addr, ledger.HashHeight{
Height: height,
Hash: blockHash,
})

if err != nil {
return err
}
}

err := iter.Error()
iter.Release()

if err != nil {
return err
}

}
return nil
}

func (iDB *IndexDB) LoadAllHash() (map[types.Address][]types.Hash, error) {
onRoadListMap := make(map[types.Address][]types.Hash)

Expand Down
2 changes: 2 additions & 0 deletions ledger/chain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ type Chain interface {
// ====== OnRoad ======
LoadOnRoad(gid types.Gid) (map[types.Address]map[types.Address][]ledger.HashHeight, error)

LoadOnRoadRange(gid types.Gid, fn interfaces.LoadOnroadFn) error

DeleteOnRoad(toAddress types.Address, sendBlockHash types.Hash)

GetOnRoadBlocksByAddr(addr types.Address, pageNum, pageSize int) ([]*ledger.AccountBlock, error)
Expand Down
18 changes: 18 additions & 0 deletions ledger/chain/onroad.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"github.com/pkg/errors"

"github.com/vitelabs/go-vite/v2/common/types"
"github.com/vitelabs/go-vite/v2/interfaces"
ledger "github.com/vitelabs/go-vite/v2/interfaces/core"
chain_plugins "github.com/vitelabs/go-vite/v2/ledger/chain/plugins"
)

// @Deprecated
func (c *chain) LoadOnRoad(gid types.Gid) (map[types.Address]map[types.Address][]ledger.HashHeight, error) {
addrList, err := c.GetContractList(gid)
if err != nil {
Expand All @@ -28,6 +30,22 @@ func (c *chain) LoadOnRoad(gid types.Gid) (map[types.Address]map[types.Address][

}

func (c *chain) LoadOnRoadRange(gid types.Gid, loadFn interfaces.LoadOnroadFn) error {
addrList, err := c.GetContractList(gid)
if err != nil {
return err
}

err = c.indexDB.LoadRange(addrList, loadFn)
if err != nil {
cErr := fmt.Errorf("c.indexDB.Load failed, addrList is %+v。 Error: %s", addrList, err)
c.log.Error(cErr.Error(), "method", "LoadOnRoad")
return cErr
}

return nil
}

func (c *chain) GetOnRoadBlocksByAddr(addr types.Address, pageNum, pageSize int) ([]*ledger.AccountBlock, error) {
hashList, err := c.indexDB.GetOnRoadHashList(addr, pageNum, pageSize)
if err != nil {
Expand Down
57 changes: 41 additions & 16 deletions ledger/onroad/pool/contract_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/syndtr/goleveldb/leveldb"
"github.com/vitelabs/go-vite/v2/common/types"
"github.com/vitelabs/go-vite/v2/interfaces/core"
ledger "github.com/vitelabs/go-vite/v2/interfaces/core"
"github.com/vitelabs/go-vite/v2/log15"
)
Expand Down Expand Up @@ -38,25 +39,34 @@ func NewContractOnRoadPool(gid types.Gid, chain chainReader, db *leveldb.DB) OnR
}

func (p *contractOnRoadPool) loadOnRoad() error {
p.log.Info("loadOnRoad from chain")
contractMap, err := p.chain.LoadOnRoad(p.gid)
if err != nil {
return err
}
p.log.Info("start loadOnRoad into pool")
// resort the map
for contract, callerMap := range contractMap {
cc, _ := p.cache.LoadOrStore(contract, NewCallerCache(contract, p.storage))
for caller, orList := range callerMap {
if initErr := cc.(*callerCache).initLoad(p.chain, caller, orList); initErr != nil {
p.log.Error("loadOnRoad failed", "err", initErr, "caller", caller)
return err
}
toAddrStat := make(map[types.Address]uint64)
var lastToAddr *types.Address
p.log.Info("start loadOnRoad from chain into onroad")
err := p.chain.LoadOnRoadRange(p.gid, func(fromAddr, toAddr types.Address, hashHeight core.HashHeight) error {
var cc *callerCache
if value, ok := p.cache.Load(toAddr); ok {
cc = value.(*callerCache)
} else {
raw, _ := p.cache.LoadOrStore(toAddr, NewCallerCache(toAddr, p.storage))
cc = raw.(*callerCache)
}
if cc.(*callerCache).len() > 0 {
p.log.Info(fmt.Sprintf("initLoad one caller, len=%v", cc.(*callerCache).len()), "contract", contract)
if cc == nil {
return fmt.Errorf("error load caller cache for %s", toAddr)
}
if toAddrStat[toAddr] == 0 && lastToAddr != nil {
p.log.Info(fmt.Sprintf("initLoad one caller, len=%v", toAddrStat[*lastToAddr]), "contract", *lastToAddr)
}
toAddrStat[toAddr] = toAddrStat[toAddr] + 1
lastToAddr = &toAddr
return cc.initAdd(fromAddr, toAddr, hashHeight)
})
if lastToAddr != nil {
p.log.Info(fmt.Sprintf("initLoad one caller, len=%v", toAddrStat[*lastToAddr]), "contract", *lastToAddr)
}
if err != nil {
return err
}
p.log.Info("end loadOnRoad from chain into onroad")
p.log.Info("success loadOnRoad")
return nil
}
Expand Down Expand Up @@ -258,6 +268,21 @@ func NewCallerCache(address types.Address, storage *onroadStorage) *callerCache
}
}

func (cc *callerCache) initAdd(fromAddr, toAddr types.Address, hashHeight core.HashHeight) error {
isCallerContract := types.IsContractAddr(fromAddr)
or := &orHashHeight{
Height: hashHeight.Height,
Hash: hashHeight.Hash,
}
if !isCallerContract {
index := uint32(0)
or.SubIndex = &index
}
initLog.Debug(fmt.Sprintf("addTx %s", or.String()))
return cc.addTx(&fromAddr, *or, true)
}

// @Deprecated
func (cc *callerCache) initLoad(chain chainReader, caller types.Address, orList []ledger.HashHeight) error {
isCallerContract := types.IsContractAddr(caller)
orSortedList := make(onRoadList, 0)
Expand Down
2 changes: 2 additions & 0 deletions ledger/onroad/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package onroad_pool

import (
"github.com/vitelabs/go-vite/v2/common/types"
"github.com/vitelabs/go-vite/v2/interfaces"
ledger "github.com/vitelabs/go-vite/v2/interfaces/core"
)

Expand All @@ -18,6 +19,7 @@ type OnRoadPool interface {

type chainReader interface {
LoadOnRoad(gid types.Gid) (map[types.Address]map[types.Address][]ledger.HashHeight, error)
LoadOnRoadRange(gid types.Gid, fn interfaces.LoadOnroadFn) error
GetAccountBlockByHash(blockHash types.Hash) (*ledger.AccountBlock, error)
GetCompleteBlockByHash(blockHash types.Hash) (*ledger.AccountBlock, error)
}
10 changes: 10 additions & 0 deletions ledger/onroad/pool/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package onroad_pool

import (
"fmt"

"github.com/vitelabs/go-vite/v2/common/db/xleveldb/errors"
"github.com/vitelabs/go-vite/v2/common/types"
ledger "github.com/vitelabs/go-vite/v2/interfaces/core"
Expand Down Expand Up @@ -61,6 +63,14 @@ type orHashHeight struct {
cachedBlock *ledger.AccountBlock
}

func (or orHashHeight) String() string {
if or.SubIndex == nil {
return fmt.Sprintf("orHashHeight: hash=%s,height=%d,subIndex=nil", or.Hash, or.Height)
} else {
return fmt.Sprintf("orHashHeight: hash=%s,height=%d,subIndex=%d", or.Hash, or.Height, *or.SubIndex)
}
}

func newOrHashHeightFromOnroadTx(tx *OnroadTx) *orHashHeight {
return &orHashHeight{
Hash: tx.FromHash,
Expand Down

0 comments on commit b52bc69

Please sign in to comment.