Skip to content

Commit

Permalink
feat: add watch and walk subcommands to index while traversing chain
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Nov 25, 2020
1 parent 2f13ca1 commit 705965d
Show file tree
Hide file tree
Showing 39 changed files with 1,292 additions and 227 deletions.
159 changes: 159 additions & 0 deletions chain/actor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package chain

import (
"context"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/model"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
)

type ActorStateProcessor struct {
asp *actorstate.ActorStateProcessor
node lens.API
opener lens.APIOpener
closer lens.APICloser
lastTipSet *types.TipSet
lastStateTree *state.StateTree
}

func NewActorStateProcessor(opener lens.APIOpener, asp *actorstate.ActorStateProcessor) *ActorStateProcessor {
return &ActorStateProcessor{
asp: asp,
opener: opener,
}
}

func (p *ActorStateProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
return nil, xerrors.Errorf("unable to open lens: %w", err)
}
p.node = node
p.closer = closer
}

var result model.PersistableWithTx
var err error

stateTree, err := state.LoadStateTree(p.node.Store(), ts.ParentState())
if err != nil {
return nil, xerrors.Errorf("failed to load state tree: %w", err)
}

if p.lastTipSet != nil && p.lastStateTree != nil {
if p.lastTipSet.Height() > ts.Height() {
// last tipset seen was the child
result, err = p.processStateChanges(ctx, p.lastTipSet, ts, p.lastStateTree, stateTree)
} else if p.lastTipSet.Height() < ts.Height() {
// last tipset seen was the parent
result, err = p.processStateChanges(ctx, ts, p.lastTipSet, stateTree, p.lastStateTree)
} else {
// TODO: record in database that we were unable to process actors for this tipset
log.Errorw("out of order tipsets", "height", ts.Height(), "last_height", p.lastTipSet.Height())
}
}

p.lastTipSet = ts
p.lastStateTree = stateTree

// TODO: close lens if rpc error
return result, err
}

func (p *ActorStateProcessor) processStateChanges(ctx context.Context, ts *types.TipSet, pts *types.TipSet, stateTree *state.StateTree, parentStateTree *state.StateTree) (model.PersistableWithTx, error) {
log.Debugw("processing state changes", "height", ts.Height(), "parent_height", pts.Height())
if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) {
return nil, xerrors.Errorf("child is not on the same chain")
}

changes, err := state.Diff(parentStateTree, stateTree)
if err != nil {
return nil, xerrors.Errorf("get actor changes: %w", err)
}

ll := log.With("height", int64(ts.Height()))

ll.Debugw("found actor state changes", "count", len(changes))

rawResults := make(PersistableWithTxList, len(changes))
parsedResults := make(PersistableWithTxList, len(changes))

grp, ctx := errgroup.WithContext(ctx)

idx := 0
for str, act := range changes {
lla := ll.With("addr", str, "code", actorstate.ActorNameByCode(act.Code))
lla.Debugw("found actor change")

extracter, ok := actorstate.GetActorStateExtractor(act.Code)
if !ok {
lla.Debugw("skipping change for unsupported actor")
continue
}

addr, err := address.NewFromString(str)
if err != nil {
return nil, xerrors.Errorf("parse address: %w", err)
}

info := actorstate.ActorInfo{
Actor: act,
Address: addr,
ParentStateRoot: pts.ParentState(),
Epoch: ts.Height(),
TipSet: pts.Key(),
ParentTipSet: pts.Parents(),
}

idx := idx // ensure local variable
grp.Go(func() error {
start := time.Now()

// TODO: we have the state trees, can we optimize actor state extraction further?

// Extract raw state
var ae actorstate.ActorExtractor
raw, err := ae.Extract(ctx, info, p.node)
if err != nil {
return xerrors.Errorf("extract raw actor state: %w", err)
}

// Parse state
parsed, err := extracter.Extract(ctx, info, p.node)
if err != nil {
return xerrors.Errorf("extract actor state: %w", err)
}

lla.Debugw("parsed actor change", "time", time.Since(start))
rawResults[idx] = raw
parsedResults[idx] = parsed
return nil
})

}

if err := grp.Wait(); err != nil {
return nil, err
}

return PersistableWithTxList{
rawResults,
parsedResults,
}, nil
}

func (p *ActorStateProcessor) Close() error {
if p.closer != nil {
p.closer()
}
return nil
}
38 changes: 38 additions & 0 deletions chain/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package chain

import (
"context"

"github.com/filecoin-project/lotus/chain/types"

"github.com/filecoin-project/sentinel-visor/model"
"github.com/filecoin-project/sentinel-visor/model/blocks"
)

type BlockProcessor struct {
}

func NewBlockProcessor() *BlockProcessor {
return &BlockProcessor{}
}

func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, error) {
var pl PersistableWithTxList
for _, bh := range ts.Blocks() {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

pl = append(pl, blocks.NewBlockHeader(bh))
pl = append(pl, blocks.NewBlockParents(bh))
pl = append(pl, blocks.NewDrandBlockEntries(bh))
}

return pl, nil
}

func (p *BlockProcessor) Close() error {
return nil
}
52 changes: 52 additions & 0 deletions chain/economics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package chain

import (
"context"

"github.com/filecoin-project/lotus/chain/types"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/model"
"github.com/filecoin-project/sentinel-visor/tasks/chain"
)

type ChainEconomicsProcessor struct {
extracter *chain.ChainEconomicsExtracter
node lens.API
opener lens.APIOpener
closer lens.APICloser
}

func NewChainEconomicsProcessor(opener lens.APIOpener) *ChainEconomicsProcessor {
return &ChainEconomicsProcessor{
opener: opener,
extracter: &chain.ChainEconomicsExtracter{},
}
}

func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
return nil, xerrors.Errorf("unable to open lens: %w", err)
}
p.node = node
p.closer = closer
}
// TODO: close lens if rpc error

ce, err := p.extracter.ProcessTipSet(ctx, p.node, ts)
if err != nil {
return nil, xerrors.Errorf("process tip set: %w", err)
}

return ce, nil
}

func (p *ChainEconomicsProcessor) Close() error {
if p.closer != nil {
p.closer()
}
return nil
}
136 changes: 136 additions & 0 deletions chain/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package chain

import (
"context"
"time"

"github.com/filecoin-project/lotus/chain/types"
"github.com/go-pg/pg/v10"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/model"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
"github.com/filecoin-project/sentinel-visor/tasks/indexer"
)

var log = logging.Logger("chain")

var _ indexer.TipSetObserver = (*TipSetIndexer)(nil)

// A TipSetWatcher waits for tipsets and persists their block data into a database.
type TipSetIndexer struct {
window time.Duration
opener lens.APIOpener
storage Storage
processors map[string]TipSetProcessor
}

func NewTipSetIndexer(o lens.APIOpener, d Storage, window time.Duration, actorCodes []cid.Cid) (*TipSetIndexer, error) {
// TODO: remove the hackiness of having to create a mostly unused processor
asp, err := actorstate.NewActorStateProcessor(nil, nil, 0, 0, 0, 0, actorCodes, false)
if err != nil {
return nil, xerrors.Errorf("new actor state processor: %w", err)
}

return &TipSetIndexer{
storage: d,
processors: map[string]TipSetProcessor{
"blocks": NewBlockProcessor(),
"messages": NewMessageProcessor(o), // does gas outputs too
"actorstate": NewActorStateProcessor(o, asp),
"economics": NewChainEconomicsProcessor(o),
},
window: window,
}, nil
}

func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
if t.window > 0 {
// Do as much indexing as possible in the specified time window (usually one epoch when following head of chain)
// Anything not completed in that time will be marked as incomplete
var cancel func()
ctx, cancel = context.WithTimeout(ctx, t.window)
defer cancel()
}
start := time.Now()

// Run each task concurrently
results := make(chan *TaskResult, len(t.processors))
for name, p := range t.processors {
go t.runProcessor(ctx, p, name, ts, results)
}

data := make(PersistableWithTxList, 0, len(t.processors))

// Gather results
inFlight := len(t.processors)
for inFlight > 0 {
res := <-results
inFlight--
elapsed := time.Since(start)

if res.Error != nil {
log.Errorw("task returned with error", "task", res.Task, "error", res.Error.Error(), "time", elapsed)
continue
}

log.Debugw("task returned with data", "task", res.Task, "time", elapsed)
data = append(data, res.Data)
}

// TODO: persist all returned data asynch

if err := t.storage.Persist(ctx, data); err != nil {
log.Errorw("persistence failed", "error", err)
}

log.Debugw("tipset complete", "total_time", time.Since(start))

return nil
}

func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, name string, ts *types.TipSet, results chan *TaskResult) {
data, err := p.ProcessTipSet(ctx, ts)
if err != nil {
results <- &TaskResult{
Task: name,
Error: ctx.Err(),
}
return
}
results <- &TaskResult{
Task: name,
Data: data,
}
}

type PersistableWithTxList []model.PersistableWithTx

var _ model.PersistableWithTx = (PersistableWithTxList)(nil)

func (pl PersistableWithTxList) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
for _, p := range pl {
if p == nil {
continue
}
if err := p.PersistWithTx(ctx, tx); err != nil {
return err
}
}
return nil
}

// A TaskResult is either some data to persist or an error which indicates that the task did not complete
type TaskResult struct {
Task string
Error error
Data model.PersistableWithTx
}

type TipSetProcessor interface {
ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, error)
Close() error
}
Loading

0 comments on commit 705965d

Please sign in to comment.