feat: add watch and walk commands to index chain during traversal #249

Merged 2 commits on Dec 2, 2020
240 changes: 240 additions & 0 deletions chain/actor.go
@@ -0,0 +1,240 @@
package chain

import (
    "context"
    "fmt"
    "time"

    "github.com/filecoin-project/go-address"
    "github.com/filecoin-project/lotus/chain/state"
    "github.com/filecoin-project/lotus/chain/types"
    "github.com/ipfs/go-cid"
    "golang.org/x/xerrors"

    "github.com/filecoin-project/sentinel-visor/lens"
    "github.com/filecoin-project/sentinel-visor/model"
    visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
    "github.com/filecoin-project/sentinel-visor/tasks/actorstate"
)

type ActorStateProcessor struct {
    node   lens.API
    opener lens.APIOpener
    closer lens.APICloser
Contributor: It seems a bit scary that each processor holds the closer, since that's often going to be a shared handle that closes it fully for many lenses.

Contributor Author: The closer here is for the API instance that is returned when the opener is called. It's not the same as the closer created when the opener itself is created; that one is deferred in the top-level command.

    extractRaw    bool
    extractParsed bool
    lastTipSet    *types.TipSet
    lastStateTree *state.StateTree
}
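A minimal sketch of the ownership split described in the thread above, reusing the types imported in actor.go. The runCommand function and its newOpener parameter are hypothetical stand-ins for however the top-level command actually constructs the opener; this is not code from the PR.

```go
// Hypothetical top-level command (illustration only): it owns the opener and
// defers the opener's own cleanup, while the processor only ever closes the
// API instance returned by a single Open call.
func runCommand(ctx context.Context, newOpener func(context.Context) (lens.APIOpener, func(), error)) error {
    opener, openerCloser, err := newOpener(ctx) // however the command builds its opener
    if err != nil {
        return err
    }
    defer openerCloser() // shared handle closed once, at the top level

    p := NewActorStateProcessor(opener, true, true)
    defer p.Close() // closes only the API instance this processor opened

    // ... drive p.ProcessTipSet for each tipset here ...
    return nil
}
```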

func NewActorStateProcessor(opener lens.APIOpener, extractRaw bool, extractParsed bool) *ActorStateProcessor {
    p := &ActorStateProcessor{
        opener:        opener,
        extractRaw:    extractRaw,
        extractParsed: extractParsed,
    }
    return p
}

func (p *ActorStateProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
    if p.node == nil {
        node, closer, err := p.opener.Open(ctx)
        if err != nil {
            return nil, nil, xerrors.Errorf("unable to open lens: %w", err)
        }
        p.node = node
        p.closer = closer
    }
Contributor: Any reason to postpone this until ProcessTipSet and not init within New?

Contributor Author: We need to re-open the lens whenever it has been closed after an error. This is the reconnection logic for the lotus API.
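A generic sketch of that open-lazily, close-on-error cycle, assuming only the lens.APIOpener, lens.APICloser and lens.API types used above. The lazyLens helper is illustrative and not part of this PR.

```go
// Illustrative reconnection helper (not PR code): open the lens on first use,
// drop the handle on error so the following call opens a fresh connection.
type lazyLens struct {
    opener lens.APIOpener
    closer lens.APICloser
    node   lens.API
}

func (l *lazyLens) do(ctx context.Context, fn func(lens.API) error) error {
    if l.node == nil {
        node, closer, err := l.opener.Open(ctx)
        if err != nil {
            return xerrors.Errorf("unable to open lens: %w", err)
        }
        l.node, l.closer = node, closer
    }
    if err := fn(l.node); err != nil {
        if l.closer != nil {
            l.closer() // close so the next call reconnects
            l.closer = nil
        }
        l.node = nil
        return err
    }
    return nil
}
```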


    var data model.PersistableWithTx
    var report *visormodel.ProcessingReport
    var err error

    stateTree, err := state.LoadStateTree(p.node.Store(), ts.ParentState())
    if err != nil {
        return nil, nil, xerrors.Errorf("failed to load state tree: %w", err)
    }

    if p.lastTipSet != nil && p.lastStateTree != nil {
        if p.lastTipSet.Height() > ts.Height() {
            // last tipset seen was the child
            data, report, err = p.processStateChanges(ctx, p.lastTipSet, ts, p.lastStateTree, stateTree)
        } else if p.lastTipSet.Height() < ts.Height() {
            // last tipset seen was the parent
            data, report, err = p.processStateChanges(ctx, ts, p.lastTipSet, stateTree, p.lastStateTree)
        } else {
            log.Errorw("out of order tipsets", "height", ts.Height(), "last_height", p.lastTipSet.Height())
        }
    }

    p.lastTipSet = ts
    p.lastStateTree = stateTree

    if err != nil {
        log.Errorw("error received while processing actors, closing lens", "error", err)
        if cerr := p.Close(); cerr != nil {
            log.Errorw("error received while closing lens", "error", cerr)
        }
    }
    return data, report, err
Comment on lines +65 to +79

Member: I think this can result in all returned values being nil; perhaps the error message should be filled with a message similar to the error log above?

Contributor Author: No error and no data is a valid outcome. The very first tipset will have no data since we wait until we have seen two tipsets to perform a diff.
}
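A small sketch of how a caller might treat that nil-data, nil-error outcome on the first tipset. The handleTipSet function is a hypothetical caller, not code from this PR.

```go
// Illustrative caller handling (not PR code): the first tipset yields neither
// data nor an error because there is nothing to diff against yet.
func handleTipSet(ctx context.Context, p *ActorStateProcessor, ts *types.TipSet) error {
    data, report, err := p.ProcessTipSet(ctx, ts)
    if err != nil {
        return err
    }
    if data == nil && report == nil {
        // first tipset seen: skip persistence and wait for the next one
        return nil
    }
    // ... persist data and report here ...
    return nil
}
```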

func (p *ActorStateProcessor) processStateChanges(ctx context.Context, ts *types.TipSet, pts *types.TipSet, stateTree *state.StateTree, parentStateTree *state.StateTree) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
    log.Debugw("processing state changes", "height", ts.Height(), "parent_height", pts.Height())

    report := &visormodel.ProcessingReport{
        Height:    int64(ts.Height()),
        StateRoot: ts.ParentState().String(),
        Status:    visormodel.ProcessingStatusOK,
    }

    if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) {
        report.ErrorsDetected = xerrors.Errorf("child tipset (%s) is not on the same chain as parent (%s)", ts.Key(), pts.Key())
        return nil, report, nil
    }

    changes, err := state.Diff(parentStateTree, stateTree)
    if err != nil {
        report.ErrorsDetected = xerrors.Errorf("failed to diff state trees: %w", err)
        return nil, report, nil
    }

    ll := log.With("height", int64(ts.Height()))

    ll.Debugw("found actor state changes", "count", len(changes))

    start := time.Now()

    // Run each task concurrently
    results := make(chan *ActorStateResult, len(changes))
Contributor: What closes / cleans up this channel?

Contributor Author: It goes out of scope when the function returns and is garbage collected as normal. There's never any need to close a channel unless it is to signal that readers should stop waiting on it.
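A self-contained illustration of that point in plain Go, not taken from the PR: a buffered channel sized to the number of senders never needs an explicit close, because the receiver counts in-flight work and the channel is collected once nothing references it.

```go
package main

import "fmt"

// fanOut runs one goroutine per job and gathers results without ever closing
// the channel; counting in-flight work replaces close/range.
func fanOut(jobs []int) []int {
    results := make(chan int, len(jobs)) // buffered so senders never block
    for _, j := range jobs {
        go func(j int) { results <- j * j }(j)
    }

    out := make([]int, 0, len(jobs))
    for inFlight := len(jobs); inFlight > 0; inFlight-- {
        out = append(out, <-results)
    }
    return out // the results channel becomes unreachable and is collected
}

func main() {
    fmt.Println(fanOut([]int{1, 2, 3, 4}))
}
```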

    for addr, act := range changes {
        go p.runActorStateExtraction(ctx, ts, pts, addr, act, results)
    }

    data := make(PersistableWithTxList, 0, len(changes))
    errorsDetected := make([]*ActorStateError, 0, len(changes))
    skippedActors := 0

    // Gather results
    inFlight := len(changes)
    for inFlight > 0 {
        res := <-results
        inFlight--
        elapsed := time.Since(start)
Contributor: This elapsed will not be accurate since the for loop will block receives for all buffered results. Maybe this is negligible given we're just appending to pre-allocated slices?

Contributor Author: Will remove all elapsed times in favour of proper metrics collection in a future change.

        lla := log.With("height", int64(ts.Height()), "actor", actorstate.ActorNameByCode(res.Code), "address", res.Address)

        if res.Error != nil {
            lla.Errorw("actor returned with error", "error", res.Error.Error())
            errorsDetected = append(errorsDetected, &ActorStateError{
                Code:    res.Code.String(),
                Name:    actorstate.ActorNameByCode(res.Code),
                Head:    res.Head.String(),
                Address: res.Address,
                Error:   res.Error.Error(),
            })
            continue
        }

        if res.SkippedParse {
            lla.Debugw("skipped actor without extracter")
            skippedActors++
        }

        lla.Debugw("actor returned with data", "time", elapsed)
        data = append(data, res.Data)
    }

    if skippedActors > 0 {
        report.StatusInformation = fmt.Sprintf("did not parse %d actors", skippedActors)
    }

    if len(errorsDetected) != 0 {
        report.ErrorsDetected = errorsDetected
    }
Comment on lines +151 to +153

Contributor: Can you append errors to the report directly, rather than in a separate variable you then append to the report here?

Contributor Author (@iand, Dec 1, 2020): ErrorsDetected is an interface{}, unfortunately.

    return data, report, nil
}

func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) {
    res := &ActorStateResult{
        Code:    act.Code,
        Head:    act.Head,
        Address: addrStr,
    }
    defer func() {
        results <- res
    }()

    addr, err := address.NewFromString(addrStr)
    if err != nil {
        res.Error = xerrors.Errorf("failed to parse address: %w", err)
        return
    }

    info := actorstate.ActorInfo{
        Actor:           act,
        Address:         addr,
        ParentStateRoot: pts.ParentState(),
        Epoch:           ts.Height(),
        TipSet:          pts.Key(),
        ParentTipSet:    pts.Parents(),
    }

    // TODO: we have the state trees available, can we optimize actor state extraction further?

    var data PersistableWithTxList

    // Extract raw state
    if p.extractRaw {
        var ae actorstate.ActorExtractor
        raw, err := ae.Extract(ctx, info, p.node)
        if err != nil {
            res.Error = xerrors.Errorf("failed to extract raw actor state: %w", err)
            return
        }
        data = append(data, raw)
    }

    if p.extractParsed {
        extracter, ok := actorstate.GetActorStateExtractor(act.Code)
        if !ok {
            res.SkippedParse = true
        } else {
            // Parse state
            parsed, err := extracter.Extract(ctx, info, p.node)
            if err != nil {
                res.Error = xerrors.Errorf("failed to extract parsed actor state: %w", err)
                return
            }

            data = append(data, parsed)
        }
    }
    res.Data = data
}

func (p *ActorStateProcessor) Close() error {
    if p.closer != nil {
        p.closer()
        p.closer = nil
    }
    p.node = nil
    return nil
}

type ActorStateResult struct {
    Code         cid.Cid
    Head         cid.Cid
    Address      string
    Error        error
    SkippedParse bool
    Data         model.PersistableWithTx
}

type ActorStateError struct {
    Code    string
    Name    string
    Head    string
    Address string
    Error   string
}
44 changes: 44 additions & 0 deletions chain/block.go
@@ -0,0 +1,44 @@
package chain

import (
    "context"

    "github.com/filecoin-project/lotus/chain/types"

    "github.com/filecoin-project/sentinel-visor/model"
    "github.com/filecoin-project/sentinel-visor/model/blocks"
    visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
)

type BlockProcessor struct {
}

func NewBlockProcessor() *BlockProcessor {
    return &BlockProcessor{}
}

func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
    var pl PersistableWithTxList
    for _, bh := range ts.Blocks() {
        select {
        case <-ctx.Done():
            return nil, nil, ctx.Err()
        default:
        }

        pl = append(pl, blocks.NewBlockHeader(bh))
        pl = append(pl, blocks.NewBlockParents(bh))
        pl = append(pl, blocks.NewDrandBlockEntries(bh))
    }

    report := &visormodel.ProcessingReport{
        Height:    int64(ts.Height()),
        StateRoot: ts.ParentState().String(),
    }

    return pl, report, nil
}

func (p *BlockProcessor) Close() error {
    return nil
}
71 changes: 71 additions & 0 deletions chain/economics.go
@@ -0,0 +1,71 @@
package chain

import (
    "context"

    "github.com/filecoin-project/lotus/chain/types"
    "golang.org/x/xerrors"

    "github.com/filecoin-project/sentinel-visor/lens"
    "github.com/filecoin-project/sentinel-visor/model"
    chainmodel "github.com/filecoin-project/sentinel-visor/model/chain"
    visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
)

type ChainEconomicsProcessor struct {
    node   lens.API
    opener lens.APIOpener
    closer lens.APICloser
}

func NewChainEconomicsProcessor(opener lens.APIOpener) *ChainEconomicsProcessor {
    return &ChainEconomicsProcessor{
        opener: opener,
    }
}

func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
    if p.node == nil {
        node, closer, err := p.opener.Open(ctx)
        if err != nil {
            return nil, nil, xerrors.Errorf("unable to open lens: %w", err)
        }
        p.node = node
        p.closer = closer
    }
    // TODO: close lens if rpc error

    report := &visormodel.ProcessingReport{
        Height:    int64(ts.Height()),
        StateRoot: ts.ParentState().String(),
    }

    supply, err := p.node.StateVMCirculatingSupplyInternal(ctx, ts.Key())
    if err != nil {
        log.Errorw("error received while fetching circulating supply messages, closing lens", "error", err)
        if cerr := p.Close(); cerr != nil {
            log.Errorw("error received while closing lens", "error", cerr)
        }
        return nil, nil, err
    }

    ce := &chainmodel.ChainEconomics{
        ParentStateRoot: ts.ParentState().String(),
        VestedFil:       supply.FilVested.String(),
        MinedFil:        supply.FilMined.String(),
        BurntFil:        supply.FilBurnt.String(),
        LockedFil:       supply.FilLocked.String(),
        CirculatingFil:  supply.FilCirculating.String(),
    }

    return ce, report, nil
}

func (p *ChainEconomicsProcessor) Close() error {
    if p.closer != nil {
        p.closer()
        p.closer = nil
    }
    p.node = nil
    return nil
}