From ae6d3a86ca19e28acead2c6b5b3d21cd946a62aa Mon Sep 17 00:00:00 2001 From: Ian Davis Date: Wed, 2 Dec 2020 10:21:19 +0000 Subject: [PATCH] feat: add watch and walk commands to index chain during traversal (#249) * feat: add watch and walk subcommands to index while traversing chain * Updates from pr review --- chain/actor.go | 240 ++++++++++++++ chain/block.go | 44 +++ chain/economics.go | 71 ++++ chain/indexer.go | 225 +++++++++++++ chain/message.go | 307 ++++++++++++++++++ chain/storage.go | 39 +++ commands/migrate.go | 2 +- commands/run.go | 30 +- commands/setup.go | 74 +++-- commands/walk.go | 131 ++++++++ commands/watch.go | 114 +++++++ go.mod | 5 +- go.sum | 16 +- lens/carrepo/carrepo.go | 6 +- lens/interface.go | 15 + lens/lotus/api.go | 96 +++++- lens/lotusrepo/repo.go | 24 +- lens/sqlrepo/repo.go | 3 +- lens/util/repo.go | 135 +++++++- main.go | 15 + model/actors/common/task.go | 10 + model/actors/market/task.go | 18 +- model/visor/report.go | 64 ++++ .../migrations/22_visor_processing_reports.go | 30 ++ storage/sql.go | 15 +- tasks/actorstate/actor.go | 3 +- tasks/actorstate/actorstate.go | 15 +- tasks/actorstate/actorstatechange.go | 6 +- tasks/actorstate/init.go | 2 +- tasks/actorstate/market.go | 2 +- tasks/actorstate/miner.go | 4 +- tasks/actorstate/multisig.go | 2 +- tasks/actorstate/power.go | 2 +- tasks/actorstate/reward.go | 2 +- tasks/indexer/chainheadindexer.go | 49 ++- tasks/indexer/chainheadindexer_test.go | 4 +- tasks/indexer/chainhistoryindexer.go | 135 ++------ tasks/indexer/chainhistoryindexer_test.go | 6 +- testutil/lens.go | 5 + 39 files changed, 1746 insertions(+), 220 deletions(-) create mode 100644 chain/actor.go create mode 100644 chain/block.go create mode 100644 chain/economics.go create mode 100644 chain/indexer.go create mode 100644 chain/message.go create mode 100644 chain/storage.go create mode 100644 commands/walk.go create mode 100644 commands/watch.go create mode 100644 model/visor/report.go create mode 100644 storage/migrations/22_visor_processing_reports.go diff --git a/chain/actor.go b/chain/actor.go new file mode 100644 index 000000000..567809f0e --- /dev/null +++ b/chain/actor.go @@ -0,0 +1,240 @@ +package chain + +import ( + "context" + "fmt" + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/model" + visormodel "github.com/filecoin-project/sentinel-visor/model/visor" + "github.com/filecoin-project/sentinel-visor/tasks/actorstate" +) + +type ActorStateProcessor struct { + node lens.API + opener lens.APIOpener + closer lens.APICloser + extractRaw bool + extractParsed bool + lastTipSet *types.TipSet + lastStateTree *state.StateTree +} + +func NewActorStateProcessor(opener lens.APIOpener, extractRaw bool, extractParsed bool) *ActorStateProcessor { + p := &ActorStateProcessor{ + opener: opener, + extractRaw: extractRaw, + extractParsed: extractParsed, + } + return p +} + +func (p *ActorStateProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) { + if p.node == nil { + node, closer, err := p.opener.Open(ctx) + if err != nil { + return nil, nil, xerrors.Errorf("unable to open lens: %w", err) + } + p.node = node + p.closer = closer + } + + var data model.PersistableWithTx + var report 
*visormodel.ProcessingReport + var err error + + stateTree, err := state.LoadStateTree(p.node.Store(), ts.ParentState()) + if err != nil { + return nil, nil, xerrors.Errorf("failed to load state tree: %w", err) + } + + if p.lastTipSet != nil && p.lastStateTree != nil { + if p.lastTipSet.Height() > ts.Height() { + // last tipset seen was the child + data, report, err = p.processStateChanges(ctx, p.lastTipSet, ts, p.lastStateTree, stateTree) + } else if p.lastTipSet.Height() < ts.Height() { + // last tipset seen was the parent + data, report, err = p.processStateChanges(ctx, ts, p.lastTipSet, stateTree, p.lastStateTree) + } else { + log.Errorw("out of order tipsets", "height", ts.Height(), "last_height", p.lastTipSet.Height()) + } + } + + p.lastTipSet = ts + p.lastStateTree = stateTree + + if err != nil { + log.Errorw("error received while processing actors, closing lens", "error", err) + if cerr := p.Close(); cerr != nil { + log.Errorw("error received while closing lens", "error", cerr) + } + } + return data, report, err +} + +func (p *ActorStateProcessor) processStateChanges(ctx context.Context, ts *types.TipSet, pts *types.TipSet, stateTree *state.StateTree, parentStateTree *state.StateTree) (model.PersistableWithTx, *visormodel.ProcessingReport, error) { + log.Debugw("processing state changes", "height", ts.Height(), "parent_height", pts.Height()) + + report := &visormodel.ProcessingReport{ + Height: int64(ts.Height()), + StateRoot: ts.ParentState().String(), + Status: visormodel.ProcessingStatusOK, + } + + if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) { + report.ErrorsDetected = xerrors.Errorf("child tipset (%s) is not on the same chain as parent (%s)", ts.Key(), pts.Key()) + return nil, report, nil + } + + changes, err := state.Diff(parentStateTree, stateTree) + if err != nil { + report.ErrorsDetected = xerrors.Errorf("failed to diff state trees: %w", err) + return nil, report, nil + } + + ll := log.With("height", int64(ts.Height())) + + ll.Debugw("found actor state changes", "count", len(changes)) + + start := time.Now() + + // Run each task concurrently + results := make(chan *ActorStateResult, len(changes)) + for addr, act := range changes { + go p.runActorStateExtraction(ctx, ts, pts, addr, act, results) + } + + data := make(PersistableWithTxList, 0, len(changes)) + errorsDetected := make([]*ActorStateError, 0, len(changes)) + skippedActors := 0 + + // Gather results + inFlight := len(changes) + for inFlight > 0 { + res := <-results + inFlight-- + elapsed := time.Since(start) + lla := log.With("height", int64(ts.Height()), "actor", actorstate.ActorNameByCode(res.Code), "address", res.Address) + + if res.Error != nil { + lla.Errorw("actor returned with error", "error", res.Error.Error()) + report.ErrorsDetected = append(errorsDetected, &ActorStateError{ + Code: res.Code.String(), + Name: actorstate.ActorNameByCode(res.Code), + Head: res.Head.String(), + Address: res.Address, + Error: res.Error.Error(), + }) + continue + } + + if res.SkippedParse { + lla.Debugw("skipped actor without extracter") + skippedActors++ + } + + lla.Debugw("actor returned with data", "time", elapsed) + data = append(data, res.Data) + } + + if skippedActors > 0 { + report.StatusInformation = fmt.Sprintf("did not parse %d actors", skippedActors) + } + + if len(errorsDetected) != 0 { + report.ErrorsDetected = errorsDetected + } + + return data, report, nil +} + +func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act 
types.Actor, results chan *ActorStateResult) { + res := &ActorStateResult{ + Code: act.Code, + Head: act.Head, + Address: addrStr, + } + defer func() { + results <- res + }() + + addr, err := address.NewFromString(addrStr) + if err != nil { + res.Error = xerrors.Errorf("failed to parse address: %w", err) + return + } + + info := actorstate.ActorInfo{ + Actor: act, + Address: addr, + ParentStateRoot: pts.ParentState(), + Epoch: ts.Height(), + TipSet: pts.Key(), + ParentTipSet: pts.Parents(), + } + + // TODO: we have the state trees available, can we optimize actor state extraction further? + + var data PersistableWithTxList + + // Extract raw state + if p.extractRaw { + var ae actorstate.ActorExtractor + raw, err := ae.Extract(ctx, info, p.node) + if err != nil { + res.Error = xerrors.Errorf("failed to extract raw actor state: %w", err) + return + } + data = append(data, raw) + } + + if p.extractParsed { + extracter, ok := actorstate.GetActorStateExtractor(act.Code) + if !ok { + res.SkippedParse = true + } else { + // Parse state + parsed, err := extracter.Extract(ctx, info, p.node) + if err != nil { + res.Error = xerrors.Errorf("failed to extract parsed actor state: %w", err) + return + } + + data = append(data, parsed) + } + } + res.Data = data +} + +func (p *ActorStateProcessor) Close() error { + if p.closer != nil { + p.closer() + p.closer = nil + } + p.node = nil + return nil +} + +type ActorStateResult struct { + Code cid.Cid + Head cid.Cid + Address string + Error error + SkippedParse bool + Data model.PersistableWithTx +} + +type ActorStateError struct { + Code string + Name string + Head string + Address string + Error string +} diff --git a/chain/block.go b/chain/block.go new file mode 100644 index 000000000..45cdd7ce0 --- /dev/null +++ b/chain/block.go @@ -0,0 +1,44 @@ +package chain + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/sentinel-visor/model" + "github.com/filecoin-project/sentinel-visor/model/blocks" + visormodel "github.com/filecoin-project/sentinel-visor/model/visor" +) + +type BlockProcessor struct { +} + +func NewBlockProcessor() *BlockProcessor { + return &BlockProcessor{} +} + +func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) { + var pl PersistableWithTxList + for _, bh := range ts.Blocks() { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } + + pl = append(pl, blocks.NewBlockHeader(bh)) + pl = append(pl, blocks.NewBlockParents(bh)) + pl = append(pl, blocks.NewDrandBlockEntries(bh)) + } + + report := &visormodel.ProcessingReport{ + Height: int64(ts.Height()), + StateRoot: ts.ParentState().String(), + } + + return pl, report, nil +} + +func (p *BlockProcessor) Close() error { + return nil +} diff --git a/chain/economics.go b/chain/economics.go new file mode 100644 index 000000000..c49c7290f --- /dev/null +++ b/chain/economics.go @@ -0,0 +1,71 @@ +package chain + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/types" + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/model" + chainmodel "github.com/filecoin-project/sentinel-visor/model/chain" + visormodel "github.com/filecoin-project/sentinel-visor/model/visor" +) + +type ChainEconomicsProcessor struct { + node lens.API + opener lens.APIOpener + closer lens.APICloser +} + +func NewChainEconomicsProcessor(opener 
lens.APIOpener) *ChainEconomicsProcessor { + return &ChainEconomicsProcessor{ + opener: opener, + } +} + +func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) { + if p.node == nil { + node, closer, err := p.opener.Open(ctx) + if err != nil { + return nil, nil, xerrors.Errorf("unable to open lens: %w", err) + } + p.node = node + p.closer = closer + } + // TODO: close lens if rpc error + + report := &visormodel.ProcessingReport{ + Height: int64(ts.Height()), + StateRoot: ts.ParentState().String(), + } + + supply, err := p.node.StateVMCirculatingSupplyInternal(ctx, ts.Key()) + if err != nil { + log.Errorw("error received while fetching circulating supply messages, closing lens", "error", err) + if cerr := p.Close(); cerr != nil { + log.Errorw("error received while closing lens", "error", cerr) + } + return nil, nil, err + } + + ce := &chainmodel.ChainEconomics{ + ParentStateRoot: ts.ParentState().String(), + VestedFil: supply.FilVested.String(), + MinedFil: supply.FilMined.String(), + BurntFil: supply.FilBurnt.String(), + LockedFil: supply.FilLocked.String(), + CirculatingFil: supply.FilCirculating.String(), + } + + return ce, report, nil +} + +func (p *ChainEconomicsProcessor) Close() error { + if p.closer != nil { + p.closer() + p.closer = nil + } + p.node = nil + return nil +} diff --git a/chain/indexer.go b/chain/indexer.go new file mode 100644 index 000000000..dcd85a3c3 --- /dev/null +++ b/chain/indexer.go @@ -0,0 +1,225 @@ +package chain + +import ( + "context" + "sync" + "time" + + "github.com/filecoin-project/lotus/chain/types" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/model" + visormodel "github.com/filecoin-project/sentinel-visor/model/visor" + "github.com/filecoin-project/sentinel-visor/tasks/indexer" +) + +const ( + ActorStateTask = "actorstates" // task that extracts both raw and parsed actor states + ActorRawStateTask = "actorstatesraw" // task that only extracts raw actor state + ActorParsedStateTask = "actorstatesparsed" // task that only extracts parsed actor state + BlocksTask = "blocks" // task that extracts block data + MessagesTask = "messages" // task that extracts message data + ChainEconomicsTask = "chaineconomics" // task that extracts chain economics data +) + +var log = logging.Logger("chain") + +var _ indexer.TipSetObserver = (*TipSetIndexer)(nil) + +// A TipSetWatcher waits for tipsets and persists their block data into a database. +type TipSetIndexer struct { + window time.Duration + opener lens.APIOpener + storage Storage + processors map[string]TipSetProcessor + name string + persistSlot chan struct{} +} + +// A TipSetIndexer extracts block, message and actor state data from a tipset and persists it to storage. Extraction +// and persistence are concurrent. Extraction of the a tipset can proceed while data from the previous extraction is +// being persisted. The indexer may be given a time window in which to complete data extraction. The name of the +// indexer is used as the reporter in the visor_processing_reports table. 
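The "one persistence job in flight" behaviour described in the comment above is realised below with a buffered channel of capacity one used as a slot (persistSlot). A minimal, self-contained sketch of that pattern, for illustration only and not part of this patch (all names in it are placeholders):

package main

import (
	"fmt"
	"time"
)

func main() {
	persistSlot := make(chan struct{}, 1) // at most one persistence job in flight

	for height := 1; height <= 3; height++ {
		data := fmt.Sprintf("tipset %d", height) // stand-in for extracted tipset data

		persistSlot <- struct{}{} // blocks until the previous persistence job frees the slot
		go func(d string) {
			defer func() { <-persistSlot }()   // free the slot when done
			time.Sleep(100 * time.Millisecond) // stand-in for the database write
			fmt.Println("persisted", d)
		}(data)
		// extraction of the next tipset can begin here while the goroutine above persists d
	}

	persistSlot <- struct{}{} // wait for the final persistence job before exiting
}
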
+func NewTipSetIndexer(o lens.APIOpener, d Storage, window time.Duration, name string, tasks []string) (*TipSetIndexer, error) { + tsi := &TipSetIndexer{ + storage: d, + window: window, + name: name, + persistSlot: make(chan struct{}, 1), // allow one concurrent persistence job + processors: map[string]TipSetProcessor{}, + } + + for _, task := range tasks { + switch task { + case BlocksTask: + tsi.processors[BlocksTask] = NewBlockProcessor() + case MessagesTask: + tsi.processors[MessagesTask] = NewMessageProcessor(o) + case ChainEconomicsTask: + tsi.processors[ChainEconomicsTask] = NewChainEconomicsProcessor(o) + case ActorStateTask: + tsi.processors[ActorStateTask] = NewActorStateProcessor(o, true, true) + case ActorRawStateTask: + tsi.processors[ActorRawStateTask] = NewActorStateProcessor(o, true, false) + case ActorParsedStateTask: + tsi.processors[ActorRawStateTask] = NewActorStateProcessor(o, false, true) + default: + return nil, xerrors.Errorf("unknown task: %s", task) + } + } + return tsi, nil +} + +// TipSet is called when a new tipset has been discovered +func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error { + var cancel func() + var tctx context.Context // cancellable context for the task + if t.window > 0 { + // Do as much indexing as possible in the specified time window (usually one epoch when following head of chain) + // Anything not completed in that time will be marked as incomplete + tctx, cancel = context.WithTimeout(ctx, t.window) + } else { + // Ensure all goroutines are stopped when we exit + tctx, cancel = context.WithCancel(ctx) + } + defer cancel() + + start := time.Now() + + // Run each task concurrently + results := make(chan *TaskResult, len(t.processors)) + for name, p := range t.processors { + go t.runProcessor(tctx, p, name, ts, results) + } + + ll := log.With("height", int64(ts.Height())) + + // A map to gather the persistable outputs from each task + taskOutputs := make(map[string]PersistableWithTxList, len(t.processors)) + + // Wait for all tasks to complete + inFlight := len(t.processors) + for inFlight > 0 { + res := <-results + inFlight-- + + llt := ll.With("task", res.Task) + + // Was there a fatal error? 
+ if res.Error != nil { + llt.Errorw("task returned with error", "error", res.Error.Error()) + return res.Error + } + + if res.Report == nil { + // Nothing was done for this tipset + llt.Debugw("task returned with no report") + continue + } + + // Fill in some report metadata + res.Report.Reporter = t.name + res.Report.Task = res.Task + res.Report.StartedAt = start + res.Report.CompletedAt = time.Now() + + if res.Report.ErrorsDetected != nil { + res.Report.Status = visormodel.ProcessingStatusError + } else if res.Report.StatusInformation != "" { + res.Report.Status = visormodel.ProcessingStatusInfo + } else { + res.Report.Status = visormodel.ProcessingStatusOK + } + + llt.Infow("task report", "status", res.Report.Status, "time", res.Report.CompletedAt.Sub(res.Report.StartedAt)) + + // Persist the processing report and the data in a single transaction + taskOutputs[res.Task] = PersistableWithTxList{res.Report, res.Data} + } + + if len(taskOutputs) == 0 { + // Nothing to persist + ll.Debugw("tipset complete, nothing to persist", "total_time", time.Since(start)) + return nil + } + + // wait until there is an empty slot before persisting + ll.Debugw("waiting to persist data", "time", time.Since(start)) + select { + case <-ctx.Done(): + return ctx.Err() + case t.persistSlot <- struct{}{}: + // Slot is free so we can continue + } + + // Persist all results + go func() { + // free up the slot when done + defer func() { + <-t.persistSlot + }() + + ll.Debugw("persisting data", "time", time.Since(start)) + var wg sync.WaitGroup + wg.Add(len(taskOutputs)) + + // Persist each processor's data concurrently since they don't overlap + for task, p := range taskOutputs { + go func(task string, p model.PersistableWithTx) { + defer wg.Done() + if err := t.storage.Persist(ctx, p); err != nil { + ll.Errorw("persistence failed", "task", task, "error", err) + return + } + ll.Debugw("task data persisted", "task", task, "time", time.Since(start)) + }(task, p) + } + wg.Wait() + ll.Debugw("tipset complete", "total_time", time.Since(start)) + }() + + return nil +} + +func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, name string, ts *types.TipSet, results chan *TaskResult) { + data, report, err := p.ProcessTipSet(ctx, ts) + if err != nil { + results <- &TaskResult{ + Task: name, + Error: err, + } + return + } + results <- &TaskResult{ + Task: name, + Report: report, + Data: data, + } +} + +func (t *TipSetIndexer) Close() error { + for name, p := range t.processors { + if err := p.Close(); err != nil { + log.Errorw("error received while closing task processor", "error", err, "task", name) + } + } + return nil +} + +// A TaskResult is either some data to persist or an error which indicates that the task did not complete. Partial +// completions are possible provided the Data contains a persistable log of the results. +type TaskResult struct { + Task string + Error error + Report *visormodel.ProcessingReport + Data model.PersistableWithTx +} + +type TipSetProcessor interface { + // ProcessTipSet processes a tipset. If error is non-nil then the processor encountered a fatal error. + // Any data returned must be accompanied by a processing report. 
+ ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) + Close() error +} diff --git a/chain/message.go b/chain/message.go new file mode 100644 index 000000000..6ab8254e7 --- /dev/null +++ b/chain/message.go @@ -0,0 +1,307 @@ +package chain + +import ( + "bytes" + "context" + "fmt" + "math" + "math/big" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/statediff" + "github.com/filecoin-project/statediff/codec/fcjson" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/model" + derivedmodel "github.com/filecoin-project/sentinel-visor/model/derived" + messagemodel "github.com/filecoin-project/sentinel-visor/model/messages" + visormodel "github.com/filecoin-project/sentinel-visor/model/visor" +) + +type MessageProcessor struct { + node lens.API + opener lens.APIOpener + closer lens.APICloser + lastTipSet *types.TipSet +} + +func NewMessageProcessor(opener lens.APIOpener) *MessageProcessor { + return &MessageProcessor{ + opener: opener, + } +} + +func (p *MessageProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) { + if p.node == nil { + node, closer, err := p.opener.Open(ctx) + if err != nil { + return nil, nil, xerrors.Errorf("unable to open lens: %w", err) + } + p.node = node + p.closer = closer + } + + var data model.PersistableWithTx + var report *visormodel.ProcessingReport + var err error + + if p.lastTipSet != nil { + if p.lastTipSet.Height() > ts.Height() { + // last tipset seen was the child + data, report, err = p.processExecutedMessages(ctx, p.lastTipSet, ts) + } else if p.lastTipSet.Height() < ts.Height() { + // last tipset seen was the parent + data, report, err = p.processExecutedMessages(ctx, ts, p.lastTipSet) + } else { + log.Errorw("out of order tipsets", "height", ts.Height(), "last_height", p.lastTipSet.Height()) + } + } + + p.lastTipSet = ts + + if err != nil { + log.Errorw("error received while processing messages, closing lens", "error", err) + if cerr := p.Close(); cerr != nil { + log.Errorw("error received while closing lens", "error", cerr) + } + } + return data, report, err +} + +// Note that all this processing is in the context of the parent tipset. 
The child is only used for receipts +func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) { + report := &visormodel.ProcessingReport{ + Height: int64(pts.Height()), + StateRoot: pts.ParentState().String(), + } + + emsgs, err := p.node.GetExecutedMessagesForTipset(ctx, ts, pts) + if err != nil { + report.ErrorsDetected = xerrors.Errorf("failed to get executed messages: %w", err) + return nil, report, nil + } + + var ( + messageResults = make(messagemodel.Messages, 0, len(emsgs)) + receiptResults = make(messagemodel.Receipts, 0, len(emsgs)) + blockMessageResults = make(messagemodel.BlockMessages, 0, len(emsgs)) + parsedMessageResults = make(messagemodel.ParsedMessages, 0, len(emsgs)) + gasOutputsResults = make(derivedmodel.GasOutputsList, 0, len(emsgs)) + errorsDetected = make([]*MessageError, 0, len(emsgs)) + ) + + var ( + seen = make(map[cid.Cid]bool, len(emsgs)) + + totalGasLimit int64 + totalUniqGasLimit int64 + ) + + for _, m := range emsgs { + // Stop processing if we have been told to cancel + select { + case <-ctx.Done(): + return nil, nil, xerrors.Errorf("context done: %w", ctx.Err()) + default: + } + + // Record which blocks had which messages, regardless of duplicates + for _, blockCid := range m.Blocks { + blockMessageResults = append(blockMessageResults, &messagemodel.BlockMessage{ + Height: int64(m.Height), + Block: blockCid.String(), + Message: m.Cid.String(), + }) + totalGasLimit += m.Message.GasLimit + } + + if seen[m.Cid] { + continue + } + seen[m.Cid] = true + totalUniqGasLimit += m.Message.GasLimit + + var msgSize int + if b, err := m.Message.Serialize(); err == nil { + msgSize = len(b) + } else { + errorsDetected = append(errorsDetected, &MessageError{ + Cid: m.Cid, + Error: xerrors.Errorf("failed to serialize message: %w", err).Error(), + }) + } + + // record all unique messages + msg := &messagemodel.Message{ + Height: int64(m.Height), + Cid: m.Cid.String(), + From: m.Message.From.String(), + To: m.Message.To.String(), + Value: m.Message.Value.String(), + GasFeeCap: m.Message.GasFeeCap.String(), + GasPremium: m.Message.GasPremium.String(), + GasLimit: m.Message.GasLimit, + SizeBytes: msgSize, + Nonce: m.Message.Nonce, + Method: uint64(m.Message.Method), + } + messageResults = append(messageResults, msg) + + rcpt := &messagemodel.Receipt{ + Height: int64(ts.Height()), // this is the child height + Message: msg.Cid, + StateRoot: ts.ParentState().String(), + Idx: int(m.Index), + ExitCode: int64(m.Receipt.ExitCode), + GasUsed: m.Receipt.GasUsed, + } + receiptResults = append(receiptResults, rcpt) + + outputs := p.node.ComputeGasOutputs(m.Receipt.GasUsed, m.Message.GasLimit, m.BlockHeader.ParentBaseFee, m.Message.GasFeeCap, m.Message.GasPremium) + gasOutput := &derivedmodel.GasOutputs{ + // TODO: add Height and ActorName as per https://github.com/filecoin-project/sentinel-visor/pull/270 + // Height: msg.Height, + Cid: msg.Cid, + From: msg.From, + To: msg.To, + Value: msg.Value, + GasFeeCap: msg.GasFeeCap, + GasPremium: msg.GasPremium, + GasLimit: msg.GasLimit, + Nonce: msg.Nonce, + Method: msg.Method, + StateRoot: m.BlockHeader.ParentStateRoot.String(), + ExitCode: rcpt.ExitCode, + GasUsed: rcpt.GasUsed, + ParentBaseFee: m.BlockHeader.ParentBaseFee.String(), + + // TODO: is SizeBytes really needed here? 
+ SizeBytes: msgSize, + BaseFeeBurn: outputs.BaseFeeBurn.String(), + OverEstimationBurn: outputs.OverEstimationBurn.String(), + MinerPenalty: outputs.MinerPenalty.String(), + MinerTip: outputs.MinerTip.String(), + Refund: outputs.Refund.String(), + GasRefund: outputs.GasRefund, + GasBurned: outputs.GasBurned, + } + gasOutputsResults = append(gasOutputsResults, gasOutput) + + method, params, err := p.parseMessageParams(m.Message, m.ToActorCode) + if err == nil { + pm := &messagemodel.ParsedMessage{ + Height: msg.Height, + Cid: msg.Cid, + From: msg.From, + To: msg.To, + Value: msg.Value, + Method: method, + Params: params, + } + parsedMessageResults = append(parsedMessageResults, pm) + } else { + errorsDetected = append(errorsDetected, &MessageError{ + Cid: m.Cid, + Error: xerrors.Errorf("failed to parse message params: %w", err).Error(), + }) + } + } + + newBaseFee := store.ComputeNextBaseFee(pts.Blocks()[0].ParentBaseFee, totalUniqGasLimit, len(pts.Blocks()), pts.Height()) + baseFeeRat := new(big.Rat).SetFrac(newBaseFee.Int, new(big.Int).SetUint64(build.FilecoinPrecision)) + baseFee, _ := baseFeeRat.Float64() + + baseFeeChange := new(big.Rat).SetFrac(newBaseFee.Int, pts.Blocks()[0].ParentBaseFee.Int) + baseFeeChangeF, _ := baseFeeChange.Float64() + + messageGasEconomyResult := &messagemodel.MessageGasEconomy{ + Height: int64(pts.Height()), + StateRoot: pts.ParentState().String(), + GasLimitTotal: totalGasLimit, + GasLimitUniqueTotal: totalUniqGasLimit, + BaseFee: baseFee, + BaseFeeChangeLog: math.Log(baseFeeChangeF) / math.Log(1.125), + GasFillRatio: float64(totalGasLimit) / float64(len(pts.Blocks())*build.BlockGasTarget), + GasCapacityRatio: float64(totalUniqGasLimit) / float64(len(pts.Blocks())*build.BlockGasTarget), + GasWasteRatio: float64(totalGasLimit-totalUniqGasLimit) / float64(len(pts.Blocks())*build.BlockGasTarget), + } + + if len(errorsDetected) != 0 { + report.ErrorsDetected = errorsDetected + } + + return PersistableWithTxList{ + messageResults, + receiptResults, + blockMessageResults, + parsedMessageResults, + gasOutputsResults, + messageGasEconomyResult, + }, report, nil +} + +func (p *MessageProcessor) parseMessageParams(m *types.Message, destCode cid.Cid) (string, string, error) { + actor, ok := statediff.LotusActorCodes[destCode.String()] + if !ok { + actor = statediff.LotusTypeUnknown + } + var params ipld.Node + var method string + var err error + + // TODO: the following closure is in place to handle the potential for panic + // in ipld-prime. Can be removed once fixed upstream. + // tracking issue: https://github.com/ipld/go-ipld-prime/issues/97 + func() { + defer func() { + if r := recover(); r != nil { + err = xerrors.Errorf("recovered panic: %v", r) + } + }() + params, method, err = statediff.ParseParams(m.Params, int(m.Method), actor) + }() + if err != nil && actor != statediff.LotusTypeUnknown { + // fall back to generic cbor->json conversion. 
+ actor = statediff.LotusTypeUnknown + params, method, err = statediff.ParseParams(m.Params, int(m.Method), actor) + } + if method == "Unknown" { + method = fmt.Sprintf("%s.%d", actor, m.Method) + } + if err != nil { + log.Warnf("failed to parse parameters of message %s: %v", m.Cid, err) + // this can occur when the message is not valid cbor + return method, "", err + } + if params == nil { + return method, "", nil + } + + buf := bytes.NewBuffer(nil) + if err := fcjson.Encoder(params, buf); err != nil { + return "", "", xerrors.Errorf("json encode: %w", err) + } + + encoded := string(bytes.ReplaceAll(bytes.ToValidUTF8(buf.Bytes(), []byte{}), []byte{0x00}, []byte{})) + + return method, encoded, nil +} + +func (p *MessageProcessor) Close() error { + if p.closer != nil { + p.closer() + p.closer = nil + } + p.node = nil + return nil +} + +type MessageError struct { + Cid cid.Cid + Error string +} diff --git a/chain/storage.go b/chain/storage.go new file mode 100644 index 000000000..0457a09d5 --- /dev/null +++ b/chain/storage.go @@ -0,0 +1,39 @@ +package chain + +import ( + "context" + + "github.com/go-pg/pg/v10" + + "github.com/filecoin-project/sentinel-visor/model" +) + +type Storage interface { + Persist(ctx context.Context, p model.PersistableWithTx) error +} + +var _ Storage = (*NullStorage)(nil) + +type NullStorage struct { +} + +func (*NullStorage) Persist(ctx context.Context, p model.PersistableWithTx) error { + log.Debugw("Not persisting data") + return nil +} + +type PersistableWithTxList []model.PersistableWithTx + +var _ model.PersistableWithTx = (PersistableWithTxList)(nil) + +func (pl PersistableWithTxList) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + for _, p := range pl { + if p == nil { + continue + } + if err := p.PersistWithTx(ctx, tx); err != nil { + return err + } + } + return nil +} diff --git a/commands/migrate.go b/commands/migrate.go index b951ed607..a5bb038f5 100644 --- a/commands/migrate.go +++ b/commands/migrate.go @@ -28,7 +28,7 @@ var Migrate = &cli.Command{ ctx := cctx.Context - db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size")) + db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name")) if err != nil { return xerrors.Errorf("connect database: %w", err) } diff --git a/commands/run.go b/commands/run.go index fbdc728d5..cb61adc02 100644 --- a/commands/run.go +++ b/commands/run.go @@ -7,6 +7,7 @@ import ( "math" "os" "os/signal" + "strings" "syscall" "time" @@ -111,15 +112,15 @@ var Run = &cli.Command{ Usage: "Number of actor state processors to start", EnvVars: []string{"VISOR_ACTORSTATE_WORKERS"}, }, - &cli.StringSliceFlag{ + &cli.StringFlag{ Name: "actorstate-include", - Usage: "List of actor codes that should be procesed by actor state processors", + Usage: "Comma separated list of actor codes that should have their state extracted by actor state processors. Use 'all' to include all supported.", DefaultText: "all supported", EnvVars: []string{"VISOR_ACTORSTATE_INCLUDE"}, }, - &cli.StringSliceFlag{ + &cli.StringFlag{ Name: "actorstate-exclude", - Usage: "List of actor codes that should be not be procesed by actor state processors", + Usage: "Comma separated list of actor codes that should not have their state extracted by actor state processors. 
Use 'all' to exclude all actors.", DefaultText: "none", EnvVars: []string{"VISOR_ACTORSTATE_EXCLUDE"}, }, @@ -280,9 +281,11 @@ var Run = &cli.Command{ // Add one indexing task to follow the chain head if cctx.Bool("indexhead") { + blockIndexer := indexer.NewTipSetBlockIndexer(rctx.DB) + scheduler.Add(schedule.TaskConfig{ Name: "ChainHeadIndexer", - Task: indexer.NewChainHeadIndexer(rctx.DB, rctx.Opener, cctx.Int("indexhead-confidence")), + Task: indexer.NewChainHeadIndexer(blockIndexer, rctx.Opener, cctx.Int("indexhead-confidence")), Locker: NewGlobalSingleton(ChainHeadIndexerLockID, rctx.DB), // only want one forward indexer anywhere to be running RestartOnFailure: false, RestartOnCompletion: false, @@ -292,9 +295,10 @@ var Run = &cli.Command{ // Add one indexing task to walk the chain history if cctx.Bool("indexhistory") { + blockIndexer := indexer.NewTipSetBlockIndexer(rctx.DB) scheduler.Add(schedule.TaskConfig{ Name: "ChainHistoryIndexer", - Task: indexer.NewChainHistoryIndexer(rctx.DB, rctx.Opener, cctx.Int("indexhistory-batch"), heightFrom, heightTo), + Task: indexer.NewChainHistoryIndexer(blockIndexer, rctx.Opener, heightFrom, heightTo), Locker: NewGlobalSingleton(ChainHistoryIndexerLockID, rctx.DB), // only want one history indexer anywhere to be running RestartOnFailure: false, RestartOnCompletion: false, @@ -446,13 +450,21 @@ var Run = &cli.Command{ // getActorCodes parses the cli flags to obtain a list of actor codes for the actor state processor. We support some // common short names for actors or the cid of the actor code. func getActorCodes(cctx *cli.Context) ([]cid.Cid, error) { - include := cctx.StringSlice("actorstate-include") - exclude := cctx.StringSlice("actorstate-exclude") - if len(include) == 0 && len(exclude) == 0 { + includeStr := cctx.String("actorstate-include") + excludeStr := cctx.String("actorstate-exclude") + if (includeStr == "" || includeStr == "all") && excludeStr == "" { // By default we process all supported actor types return actorstate.SupportedActorCodes(), nil } + if includeStr == "" && excludeStr == "all" { + // Excluding all actors + return []cid.Cid{}, nil + } + + include := strings.Split(includeStr, ",") + exclude := strings.Split(excludeStr, ",") + if len(include) != 0 && len(exclude) != 0 { return nil, fmt.Errorf("cannot specify both actorstate-include and actorstate-exclude") } diff --git a/commands/setup.go b/commands/setup.go index 081cdc820..25456ab86 100644 --- a/commands/setup.go +++ b/commands/setup.go @@ -31,6 +31,7 @@ import ( sqlapi "github.com/filecoin-project/sentinel-visor/lens/sqlrepo" "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/storage" + "github.com/filecoin-project/sentinel-visor/version" ) var log = logging.Logger("visor") @@ -45,36 +46,35 @@ type RunContext struct { // SetupStorageAndAPI setups of the sentinel database and returns a // ready-to-use RunContext with Openers and Closers using the chosen lens. 
func SetupStorageAndAPI(cctx *cli.Context) (context.Context, *RunContext, error) { - var opener lens.APIOpener // the api opener that is used by tasks - var closer lens.APICloser // a closer that cleans up the opener when exiting the application - var err error - ctx := cctx.Context - - if cctx.String("lens") == "lotus" { - opener, closer, err = vapi.NewAPIOpener(cctx, 10_000) - } else if cctx.String("lens") == "lotusrepo" { - opener, closer, err = repoapi.NewAPIOpener(cctx) - } else if cctx.String("lens") == "carrepo" { - opener, closer, err = carapi.NewAPIOpener(cctx) - } else if cctx.String("lens") == "sql" { - opener, closer, err = sqlapi.NewAPIOpener(cctx) - } else if cctx.String("lens") == "s3" { - opener, closer, err = s3api.NewAPIOpener(cctx) - } + opener, closer, err := setupLens(cctx) if err != nil { - return nil, nil, xerrors.Errorf("get node api: %w", err) + return nil, nil, xerrors.Errorf("setup lens: %w", err) } - db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size")) + db, err := setupDatabase(cctx) if err != nil { closer() - return nil, nil, xerrors.Errorf("new database: %w", err) + return nil, nil, xerrors.Errorf("setup database: %w", err) + } + + return ctx, &RunContext{ + Opener: opener, + Closer: closer, + DB: db, + }, nil +} + +func setupDatabase(cctx *cli.Context) (*storage.Database, error) { + ctx := cctx.Context + db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name")) + if err != nil { + return nil, xerrors.Errorf("new database: %w", err) } if err := db.Connect(ctx); err != nil { if !errors.Is(err, storage.ErrSchemaTooOld) || !cctx.Bool("allow-schema-migration") { - return nil, nil, xerrors.Errorf("connect database: %w", err) + return nil, xerrors.Errorf("connect database: %w", err) } log.Infof("connect database: %v", err.Error()) @@ -83,29 +83,39 @@ func SetupStorageAndAPI(cctx *cli.Context) (context.Context, *RunContext, error) log.Info("Migrating schema to latest version") err := db.MigrateSchema(ctx) if err != nil { - closer() - return nil, nil, xerrors.Errorf("migrate schema: %w", err) + return nil, xerrors.Errorf("migrate schema: %w", err) } // Try to connect again if err := db.Connect(ctx); err != nil { - closer() - return nil, nil, xerrors.Errorf("connect database: %w", err) + return nil, xerrors.Errorf("connect database: %w", err) } } // Make sure the schema is a compatible with what this version of Visor requires if err := db.VerifyCurrentSchema(ctx); err != nil { - closer() db.Close(ctx) - return nil, nil, xerrors.Errorf("verify schema: %w", err) + return nil, xerrors.Errorf("verify schema: %w", err) } - return ctx, &RunContext{ - Opener: opener, - Closer: closer, - DB: db, - }, nil + return db, nil +} + +func setupLens(cctx *cli.Context) (lens.APIOpener, lens.APICloser, error) { + switch cctx.String("lens") { + case "lotus": + return vapi.NewAPIOpener(cctx, 10_000) + case "lotusrepo": + return repoapi.NewAPIOpener(cctx) + case "carrepo": + return carapi.NewAPIOpener(cctx) + case "sql": + return sqlapi.NewAPIOpener(cctx) + case "s3": + return s3api.NewAPIOpener(cctx) + default: + return nil, nil, xerrors.Errorf("unsupported lens type: %s", cctx.String("lens")) + } } func setupTracing(cctx *cli.Context) (func(), error) { @@ -186,6 +196,8 @@ func setupLogging(cctx *cli.Context) error { } + log.Infof("Visor version:%s", version.String()) + return nil } diff --git a/commands/walk.go b/commands/walk.go new file mode 100644 index 000000000..6fd568b0d --- /dev/null +++ b/commands/walk.go @@ 
-0,0 +1,131 @@ +package commands + +import ( + "context" + "errors" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/filecoin-project/sentinel-visor/schedule" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/chain" + "github.com/filecoin-project/sentinel-visor/tasks/indexer" +) + +var Walk = &cli.Command{ + Name: "walk", + Usage: "Walk a range of the filecoin blockchain and process blocks as they are discovered.", + Flags: []cli.Flag{ + &cli.Int64Flag{ + Name: "from", + Usage: "Limit actor and message processing to tipsets at or above `HEIGHT`", + EnvVars: []string{"VISOR_HEIGHT_FROM"}, + }, + &cli.Int64Flag{ + Name: "to", + Usage: "Limit actor and message processing to tipsets at or below `HEIGHT`", + Value: estimateCurrentEpoch(), + DefaultText: "MaxInt64", + EnvVars: []string{"VISOR_HEIGHT_TO"}, + }, + &cli.StringFlag{ + Name: "tasks", + Usage: "Comma separated list of tasks to run.", + Value: strings.Join([]string{chain.BlocksTask, chain.MessagesTask, chain.ChainEconomicsTask, chain.ActorStateTask}, ","), + EnvVars: []string{"VISOR_WALK_TASKS"}, + }, + }, + Action: walk, +} + +func walk(cctx *cli.Context) error { + // Validate flags + heightFrom := cctx.Int64("from") + heightTo := cctx.Int64("to") + + if heightFrom > heightTo { + return xerrors.Errorf("--from must not be greater than --to") + } + + tasks := strings.Split(cctx.String("tasks"), ",") + + if err := setupLogging(cctx); err != nil { + return xerrors.Errorf("setup logging: %w", err) + } + + if err := setupMetrics(cctx); err != nil { + return xerrors.Errorf("setup metrics: %w", err) + } + + tcloser, err := setupTracing(cctx) + if err != nil { + return xerrors.Errorf("setup tracing: %w", err) + } + defer tcloser() + + lensOpener, lensCloser, err := setupLens(cctx) + if err != nil { + return xerrors.Errorf("setup lens: %w", err) + } + defer func() { + lensCloser() + }() + + var storage chain.Storage = &chain.NullStorage{} + if cctx.String("db") == "" { + log.Warnw("database not specified, data will not be persisted") + } else { + db, err := setupDatabase(cctx) + if err != nil { + return xerrors.Errorf("setup database: %w", err) + } + storage = db + } + + // Set up a context that is canceled when the command is interrupted + ctx, cancel := context.WithCancel(cctx.Context) + defer cancel() + + // Set up a signal handler to cancel the context + go func() { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT) + select { + case <-interrupt: + cancel() + case <-ctx.Done(): + } + }() + + scheduler := schedule.NewScheduler(cctx.Duration("task-delay")) + + tsIndexer, err := chain.NewTipSetIndexer(lensOpener, storage, 0, cctx.String("name"), tasks) + if err != nil { + return xerrors.Errorf("setup indexer: %w", err) + } + defer func() { + if err := tsIndexer.Close(); err != nil { + log.Errorw("failed to close tipset indexer cleanly", "error", err) + } + }() + + scheduler.Add(schedule.TaskConfig{ + Name: "ChainHistoryIndexer", + Task: indexer.NewChainHistoryIndexer(tsIndexer, lensOpener, heightFrom, heightTo), + RestartOnFailure: true, + RestartOnCompletion: false, + RestartDelay: time.Minute, + }) + + // Start the scheduler and wait for it to complete or to be cancelled. 
+ err = scheduler.Run(ctx) + if !errors.Is(err, context.Canceled) { + return err + } + return nil +} diff --git a/commands/watch.go b/commands/watch.go new file mode 100644 index 000000000..3577a7c78 --- /dev/null +++ b/commands/watch.go @@ -0,0 +1,114 @@ +package commands + +import ( + "context" + "errors" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/filecoin-project/sentinel-visor/schedule" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/chain" + "github.com/filecoin-project/sentinel-visor/tasks/indexer" +) + +var Watch = &cli.Command{ + Name: "watch", + Usage: "Watch the head of the filecoin blockchain and process blocks as they arrive.", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "indexhead-confidence", + Usage: "Sets the size of the cache used to hold tipsets for possible reversion before being committed to the database", + Value: 2, + EnvVars: []string{"VISOR_INDEXHEAD_CONFIDENCE"}, + }, + &cli.StringFlag{ + Name: "tasks", + Usage: "Comma separated list of tasks to run.", + Value: strings.Join([]string{chain.BlocksTask, chain.MessagesTask, chain.ChainEconomicsTask, chain.ActorRawStateTask}, ","), + EnvVars: []string{"VISOR_WATCH_TASKS"}, + }, + }, + Action: watch, +} + +func watch(cctx *cli.Context) error { + tasks := strings.Split(cctx.String("tasks"), ",") + + if err := setupLogging(cctx); err != nil { + return xerrors.Errorf("setup logging: %w", err) + } + + if err := setupMetrics(cctx); err != nil { + return xerrors.Errorf("setup metrics: %w", err) + } + + tcloser, err := setupTracing(cctx) + if err != nil { + return xerrors.Errorf("setup tracing: %w", err) + } + defer tcloser() + + lensOpener, lensCloser, err := setupLens(cctx) + if err != nil { + return xerrors.Errorf("setup lens: %w", err) + } + defer func() { + lensCloser() + }() + + var storage chain.Storage = &chain.NullStorage{} + if cctx.String("db") == "" { + log.Warnw("database not specified, data will not be persisted") + } else { + db, err := setupDatabase(cctx) + if err != nil { + return xerrors.Errorf("setup database: %w", err) + } + storage = db + } + + // Set up a context that is canceled when the command is interrupted + ctx, cancel := context.WithCancel(cctx.Context) + defer cancel() + + // Set up a signal handler to cancel the context + go func() { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT) + select { + case <-interrupt: + cancel() + case <-ctx.Done(): + } + }() + + tsIndexer, err := chain.NewTipSetIndexer(lensOpener, storage, builtin.EpochDurationSeconds*time.Second, cctx.String("name"), tasks) + if err != nil { + return xerrors.Errorf("setup indexer: %w", err) + } + + scheduler := schedule.NewScheduler(cctx.Duration("task-delay")) + scheduler.Add(schedule.TaskConfig{ + Name: "ChainHeadIndexer", + Task: indexer.NewChainHeadIndexer(tsIndexer, lensOpener, cctx.Int("indexhead-confidence")), + // TODO: add locker + // Locker: NewGlobalSingleton(ChainHeadIndexerLockID, rctx.db), // only want one forward indexer anywhere to be running + RestartOnFailure: true, + RestartOnCompletion: true, // we always want the indexer to be running + RestartDelay: time.Minute, + }) + + // Start the scheduler and wait for it to complete or to be cancelled. 
+ err = scheduler.Run(ctx) + if !errors.Is(err, context.Canceled) { + return err + } + return nil +} diff --git a/go.mod b/go.mod index 94db09352..dd0d965c1 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.14 require ( contrib.go.opencensus.io/exporter/prometheus v0.1.0 github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect + github.com/OneOfOne/xxhash v1.2.8 // indirect github.com/fatih/color v1.10.0 // indirect github.com/filecoin-project/go-address v0.0.5-0.20201103152444-f2023ef3f5bb github.com/filecoin-project/go-bitfield v0.2.3-0.20201110211213-fe2c1862e816 @@ -22,12 +23,14 @@ require ( github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 + github.com/ipfs/go-ipfs-blockstore v1.0.3 // indirect github.com/ipfs/go-ipld-cbor v0.0.5 github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4 github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018 github.com/jackc/pgx/v4 v4.9.0 github.com/lib/pq v1.8.0 github.com/libp2p/go-libp2p-core v0.7.0 + github.com/libp2p/go-libp2p-peer v0.2.0 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multibase v0.0.3 @@ -36,7 +39,7 @@ require ( github.com/raulk/clock v1.1.0 github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stretchr/testify v1.6.1 - github.com/urfave/cli/v2 v2.2.0 + github.com/urfave/cli/v2 v2.3.0 github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163 github.com/willscott/carbs v0.0.3 go.opencensus.io v0.22.4 diff --git a/go.sum b/go.sum index f273352e6..7f930089b 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,9 @@ github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee h1:8doiS7ib3zi6/K1 github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee/go.mod h1:W0GbEAA4uFNYOGG2cJpmFJ04E6SD1NLELPYZB57/7AY= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= +github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= @@ -161,7 +162,6 @@ github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 
v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= @@ -588,7 +588,6 @@ github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0 github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM= github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10= -github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo= github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0= github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM= @@ -600,8 +599,9 @@ github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2Is github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ= github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU= github.com/ipfs/go-ipfs-blockstore v1.0.1/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= -github.com/ipfs/go-ipfs-blockstore v1.0.2 h1:Z8nUlBHK7wVKPKliQCQR9tLgUtz4J2QRbqFcJrqzM+E= github.com/ipfs/go-ipfs-blockstore v1.0.2/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= +github.com/ipfs/go-ipfs-blockstore v1.0.3 h1:RDhK6fdg5YsonkpMuMpdvk/pRtOQlrIRIybuQfkvB2M= +github.com/ipfs/go-ipfs-blockstore v1.0.3/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw= @@ -714,7 +714,6 @@ github.com/ipld/go-ipld-prime-proto v0.1.0 h1:j7gjqrfwbT4+gXpHwEx5iMssma3mnctC7Y github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= -github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -733,7 +732,6 @@ github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye47 github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= 
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= @@ -1216,7 +1214,6 @@ github.com/mdlayher/netlink v0.0.0-20190828143259-340058475d09/go.mod h1:KxeJAFO github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M= github.com/mdlayher/netlink v1.1.0/go.mod h1:H4WCitaheIsdF9yOYu8CFmCgQthAPIWZmcKp9uZHgmY= github.com/mdlayher/wifi v0.0.0-20190303161829-b1436901ddee/go.mod h1:Evt/EIne46u9PtQbeTx2NTcqURpr5K4SvKtGmBuDPN8= -github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -1433,7 +1430,6 @@ github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= -github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -1543,11 +1539,11 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/ github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= -github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4= github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= +github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= diff --git a/lens/carrepo/carrepo.go b/lens/carrepo/carrepo.go index 70f214c1d..ff29f1551 100644 --- a/lens/carrepo/carrepo.go +++ b/lens/carrepo/carrepo.go @@ -4,13 +4,13 @@ import ( "context" "fmt" - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/urfave/cli/v2" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/sentinel-visor/lens/util" + "github.com/urfave/cli/v2" "github.com/willscott/carbs" + + 
"github.com/filecoin-project/sentinel-visor/lens" ) func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) { diff --git a/lens/interface.go b/lens/interface.go index 4b2376e51..da78fcb10 100644 --- a/lens/interface.go +++ b/lens/interface.go @@ -5,14 +5,17 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/specs-actors/actors/util/adt" + "github.com/ipfs/go-cid" ) type API interface { Store() adt.Store api.FullNode ComputeGasOutputs(gasUsed, gasLimit int64, baseFee, feeCap, gasPremium abi.TokenAmount) vm.GasOutputs + GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*ExecutedMessage, error) } type APICloser func() @@ -20,3 +23,15 @@ type APICloser func() type APIOpener interface { Open(context.Context) (API, APICloser, error) } + +type ExecutedMessage struct { + Cid cid.Cid + Height abi.ChainEpoch + Message *types.Message + Receipt *types.MessageReceipt + BlockHeader *types.BlockHeader + Blocks []cid.Cid // blocks this message appeared in + Index uint64 // Message and receipt sequence in tipset + FromActorCode cid.Cid // code of the actor the message is from + ToActorCode cid.Cid // code of the actor the message is to +} diff --git a/lens/lotus/api.go b/lens/lotus/api.go index 223587b17..3ac205e05 100644 --- a/lens/lotus/api.go +++ b/lens/lotus/api.go @@ -3,18 +3,19 @@ package lotus import ( "context" - cid "github.com/ipfs/go-cid" - "go.opencensus.io/tag" - "go.opentelemetry.io/otel/api/global" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/specs-actors/actors/util/adt" + cid "github.com/ipfs/go-cid" + "go.opencensus.io/tag" + "go.opentelemetry.io/otel/api/global" + "golang.org/x/xerrors" "github.com/filecoin-project/sentinel-visor/lens" "github.com/filecoin-project/sentinel-visor/metrics" @@ -188,3 +189,90 @@ func (aw *APIWrapper) StateVMCirculatingSupplyInternal(ctx context.Context, tsk defer span.End() return aw.FullNode.StateVMCirculatingSupplyInternal(ctx, tsk) } + +// GetExecutedMessagesForTipset returns a list of messages sent as part of pts (parent) with receipts found in ts (child). +// No attempt at deduplication of messages is made. 
+func (aw *APIWrapper) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) {
+	if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) {
+		return nil, xerrors.Errorf("child tipset (%s) is not on the same chain as parent (%s)", ts.Key(), pts.Key())
+	}
+
+	stateTree, err := state.LoadStateTree(aw.Store(), ts.ParentState())
+	if err != nil {
+		return nil, xerrors.Errorf("load state tree: %w", err)
+	}
+
+	// Build a lookup of actor codes
+	actorCodes := map[address.Address]cid.Cid{}
+	if err := stateTree.ForEach(func(a address.Address, act *types.Actor) error {
+		actorCodes[a] = act.Code
+		return nil
+	}); err != nil {
+		return nil, xerrors.Errorf("iterate actors: %w", err)
+	}
+
+	getActorCode := func(a address.Address) cid.Cid {
+		c, ok := actorCodes[a]
+		if ok {
+			return c
+		}
+
+		return cid.Undef
+	}
+
+	// Build a lookup of block headers indexed by their CID
+	blockHeaders := map[cid.Cid]*types.BlockHeader{}
+	for _, bh := range pts.Blocks() {
+		blockHeaders[bh.Cid()] = bh
+	}
+
+	// Build a lookup of which blocks each message appears in
+	messageBlocks := map[cid.Cid][]cid.Cid{}
+
+	for _, blkCid := range pts.Cids() {
+		blkMsgs, err := aw.ChainGetBlockMessages(ctx, blkCid)
+		if err != nil {
+			return nil, xerrors.Errorf("get block messages: %w", err)
+		}
+
+		for _, mcid := range blkMsgs.Cids {
+			messageBlocks[mcid] = append(messageBlocks[mcid], blkCid)
+		}
+	}
+
+	// Get messages that were processed in the parent tipset
+	msgs, err := aw.ChainGetParentMessages(ctx, ts.Cids()[0])
+	if err != nil {
+		return nil, xerrors.Errorf("get parent messages: %w", err)
+	}
+
+	// Get receipts for parent messages
+	rcpts, err := aw.ChainGetParentReceipts(ctx, ts.Cids()[0])
+	if err != nil {
+		return nil, xerrors.Errorf("get parent receipts: %w", err)
+	}
+
+	if len(rcpts) != len(msgs) {
+		// logic error somewhere
+		return nil, xerrors.Errorf("mismatching number of receipts: got %d wanted %d", len(rcpts), len(msgs))
+	}
+
+	// Start building a list of executed messages, each paired with its receipt
+	emsgs := make([]*lens.ExecutedMessage, 0, len(msgs))
+
+	for index, m := range msgs {
+		emsgs = append(emsgs, &lens.ExecutedMessage{
+			Cid:           m.Cid,
+			Height:        pts.Height(),
+			Message:       m.Message,
+			Receipt:       rcpts[index],
+			BlockHeader:   blockHeaders[messageBlocks[m.Cid][0]],
+			Blocks:        messageBlocks[m.Cid],
+			Index:         uint64(index),
+			FromActorCode: getActorCode(m.Message.From),
+			ToActorCode:   getActorCode(m.Message.To),
+		})
+	}
+
+	return emsgs, nil
+}
diff --git a/lens/lotusrepo/repo.go b/lens/lotusrepo/repo.go
index 10f596f6f..3e8ec62a7 100644
--- a/lens/lotusrepo/repo.go
+++ b/lens/lotusrepo/repo.go
@@ -9,16 +9,13 @@ import (
 	"github.com/filecoin-project/go-fil-markets/storagemarket"
 	"github.com/filecoin-project/go-multistore"
 	"github.com/filecoin-project/go-state-types/abi"
-	"github.com/filecoin-project/lotus/journal"
-	"github.com/filecoin-project/sentinel-visor/lens"
-	peer "github.com/libp2p/go-libp2p-core/peer"
-	"github.com/urfave/cli/v2"
-
 	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/chain/stmgr"
 	"github.com/filecoin-project/lotus/chain/store"
+	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/chain/vm"
 	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
+	"github.com/filecoin-project/lotus/journal"
 	"github.com/filecoin-project/lotus/lib/bufbstore"
 	"github.com/filecoin-project/lotus/lib/ulimit"
 	marketevents 
"github.com/filecoin-project/lotus/markets/loggers" @@ -29,6 +26,11 @@ import ( "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" + peer "github.com/libp2p/go-libp2p-peer" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/lens/util" ) type APIOpener struct { @@ -66,6 +68,10 @@ func NewAPIOpener(c *cli.Context) (*APIOpener, lens.APICloser, error) { return nil, nil, err } + sf := func() { + lr.Close() + } + bs, err := lr.Blockstore(repo.BlockstoreChain) if err != nil { return nil, nil, err @@ -89,10 +95,6 @@ func NewAPIOpener(c *cli.Context) (*APIOpener, lens.APICloser, error) { rapi.FullNodeAPI.StateAPI.StateManager = sm rapi.FullNodeAPI.StateAPI.StateModuleAPI = &full.StateModule{Chain: cs, StateManager: sm} - sf := func() { - lr.Close() - } - rapi.Context = c.Context rapi.cacheSize = c.Int("lens-cache-hint") return &APIOpener{rapi: &rapi}, sf, nil @@ -112,6 +114,10 @@ func (ra *RepoAPI) ComputeGasOutputs(gasUsed, gasLimit int64, baseFee, feeCap, g return vm.ComputeGasOutputs(gasUsed, gasLimit, baseFee, feeCap, gasPremium) } +func (ra *RepoAPI) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) { + return util.GetExecutedMessagesForTipset(ctx, ra.FullNodeAPI.ChainAPI.Chain, ts, pts) +} + func (ra *RepoAPI) Store() adt.Store { bs := ra.FullNodeAPI.ChainAPI.Chain.Blockstore() cachedStore := bufbstore.NewBufferedBstore(bs) diff --git a/lens/sqlrepo/repo.go b/lens/sqlrepo/repo.go index dd9e540e4..082edaecb 100644 --- a/lens/sqlrepo/repo.go +++ b/lens/sqlrepo/repo.go @@ -1,9 +1,10 @@ package sqlrepo import ( + "github.com/urfave/cli/v2" + "github.com/filecoin-project/sentinel-visor/lens" "github.com/filecoin-project/sentinel-visor/lens/util" - "github.com/urfave/cli/v2" ) func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) { diff --git a/lens/util/repo.go b/lens/util/repo.go index 794afc718..0050b81dd 100644 --- a/lens/util/repo.go +++ b/lens/util/repo.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -30,6 +31,7 @@ import ( cbor "github.com/ipfs/go-ipld-cbor" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/urfave/cli/v2" + "golang.org/x/xerrors" ) type APIOpener struct { @@ -45,7 +47,6 @@ func NewAPIOpener(c *cli.Context, bs blockstore.Blockstore, head HeadMthd) (*API if _, _, err := ulimit.ManageFdLimit(); err != nil { return nil, nil, fmt.Errorf("setting file descriptor limit: %s", err) } - r := repo.NewMemory(nil) lr, err := r.Lock(repo.FullNode) @@ -100,12 +101,17 @@ type LensAPI struct { impl.FullNodeAPI context.Context cacheSize int + cs *store.ChainStore } func (ra *LensAPI) ComputeGasOutputs(gasUsed, gasLimit int64, baseFee, feeCap, gasPremium abi.TokenAmount) vm.GasOutputs { return vm.ComputeGasOutputs(gasUsed, gasLimit, baseFee, feeCap, gasPremium) } +func (ra *LensAPI) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) { + return GetExecutedMessagesForTipset(ctx, ra.cs, ts, pts) +} + func (ra *LensAPI) Store() adt.Store { bs := 
ra.FullNodeAPI.ChainAPI.Chain.Blockstore()
+	bufferedStore := bufbstore.NewBufferedBstore(bs)
@@ -214,3 +220,130 @@ func (m fakeVerifier) VerifyWindowPoSt(ctx context.Context, info proof.WindowPoS
 func (m fakeVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proof abi.RegisteredPoStProof, id abi.ActorID, randomness abi.PoStRandomness, u uint64) ([]uint64, error) {
 	panic("GenerateWinningPoStSectorChallenge not supported")
 }
+
+// GetExecutedMessagesForTipset returns a list of messages sent as part of pts (parent) with receipts found in ts (child).
+// No attempt at deduplication of messages is made.
+func GetExecutedMessagesForTipset(ctx context.Context, cs *store.ChainStore, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) {
+	if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) {
+		return nil, xerrors.Errorf("child tipset (%s) is not on the same chain as parent (%s)", ts.Key(), pts.Key())
+	}
+
+	stateTree, err := state.LoadStateTree(cs.Store(ctx), ts.ParentState())
+	if err != nil {
+		return nil, xerrors.Errorf("load state tree: %w", err)
+	}
+
+	// Build a lookup of actor codes
+	actorCodes := map[address.Address]cid.Cid{}
+	if err := stateTree.ForEach(func(a address.Address, act *types.Actor) error {
+		actorCodes[a] = act.Code
+		return nil
+	}); err != nil {
+		return nil, xerrors.Errorf("iterate actors: %w", err)
+	}
+
+	getActorCode := func(a address.Address) cid.Cid {
+		c, ok := actorCodes[a]
+		if ok {
+			return c
+		}
+
+		return cid.Undef
+	}
+
+	// Build a lookup of which blocks each message appears in
+	messageBlocks := map[cid.Cid][]cid.Cid{}
+	for blockIdx, bh := range pts.Blocks() {
+		blscids, secpkcids, err := cs.ReadMsgMetaCids(bh.Messages)
+		if err != nil {
+			return nil, xerrors.Errorf("read messages for block: %w", err)
+		}
+
+		for _, c := range blscids {
+			messageBlocks[c] = append(messageBlocks[c], pts.Cids()[blockIdx])
+		}
+
+		for _, c := range secpkcids {
+			messageBlocks[c] = append(messageBlocks[c], pts.Cids()[blockIdx])
+		}
+
+	}
+
+	bmsgs, err := cs.BlockMsgsForTipset(pts)
+	if err != nil {
+		return nil, xerrors.Errorf("block messages for tipset: %w", err)
+	}
+
+	pblocks := pts.Blocks()
+	if len(bmsgs) != len(pblocks) {
+		// logic error somewhere
+		return nil, xerrors.Errorf("mismatching number of blocks returned from block messages, got %d wanted %d", len(bmsgs), len(pblocks))
+	}
+
+	count := 0
+	for _, bm := range bmsgs {
+		count += len(bm.BlsMessages) + len(bm.SecpkMessages)
+	}
+
+	// Start building a list of executed messages, each paired with its receipt
+	emsgs := make([]*lens.ExecutedMessage, 0, count)
+
+	// bmsgs is ordered by block
+	var index uint64
+	for blockIdx, bm := range bmsgs {
+		for _, blsm := range bm.BlsMessages {
+			msg := blsm.VMMessage()
+			emsgs = append(emsgs, &lens.ExecutedMessage{
+				Cid:           blsm.Cid(),
+				Height:        pts.Height(),
+				Message:       msg,
+				BlockHeader:   pblocks[blockIdx],
+				Blocks:        messageBlocks[blsm.Cid()],
+				Index:         index,
+				FromActorCode: getActorCode(msg.From),
+				ToActorCode:   getActorCode(msg.To),
+			})
+			index++
+		}
+
+		for _, secm := range bm.SecpkMessages {
+			msg := secm.VMMessage()
+			emsgs = append(emsgs, &lens.ExecutedMessage{
+				Cid:           secm.Cid(),
+				Height:        pts.Height(),
+				Message:       secm.VMMessage(),
+				BlockHeader:   pblocks[blockIdx],
+				Blocks:        messageBlocks[secm.Cid()],
+				Index:         index,
+				FromActorCode: getActorCode(msg.From),
+				ToActorCode:   getActorCode(msg.To),
+			})
+			index++
+		}
+
+	}
+
+	// Retrieve receipts using a block from the child tipset
+	rs, err := adt.AsArray(cs.Store(ctx), ts.Blocks()[0].ParentMessageReceipts)
+	if err != 
nil { + return nil, xerrors.Errorf("amt load: %w", err) + } + + if rs.Length() != uint64(len(emsgs)) { + // logic error somewhere + return nil, xerrors.Errorf("mismatching number of receipts: got %d wanted %d", rs.Length(), len(emsgs)) + } + + // Receipts are in same order as BlockMsgsForTipset + for _, em := range emsgs { + var r types.MessageReceipt + if found, err := rs.Get(em.Index, &r); err != nil { + return nil, err + } else if !found { + return nil, xerrors.Errorf("failed to find receipt %d", em.Index) + } + em.Receipt = &r + } + + return emsgs, nil +} diff --git a/main.go b/main.go index 85191ddca..d86b75d1d 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "os" logging "github.com/ipfs/go-log/v2" @@ -17,6 +18,12 @@ func main() { log.Fatal(err) } + defaultName := "visor_" + version.String() + hostname, err := os.Hostname() + if err == nil { + defaultName = fmt.Sprintf("%s_%s_%d", defaultName, hostname, os.Getpid()) + } + app := &cli.App{ Name: "visor", Usage: "Filecoin Chain Monitoring Utility", @@ -70,6 +77,12 @@ func main() { Value: "", Usage: "A comma delimited list of named loggers and log levels formatted as name:level, for example 'logger1:debug,logger2:info'", }, + &cli.StringFlag{ + Name: "name", + EnvVars: []string{"VISOR_NAME"}, + Value: defaultName, + Usage: "A name that helps to identify this instance of visor.", + }, &cli.BoolFlag{ Name: "tracing", EnvVars: []string{"VISOR_TRACING"}, @@ -115,6 +128,8 @@ func main() { commands.Migrate, commands.Run, commands.Debug, + commands.Watch, + commands.Walk, }, } diff --git a/model/actors/common/task.go b/model/actors/common/task.go index d0a6d67ac..b4a5bed8c 100644 --- a/model/actors/common/task.go +++ b/model/actors/common/task.go @@ -31,3 +31,13 @@ func (a *ActorTaskResult) Persist(ctx context.Context, db *pg.DB) error { return nil }) } + +func (a *ActorTaskResult) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if err := a.Actor.PersistWithTx(ctx, tx); err != nil { + return err + } + if err := a.State.PersistWithTx(ctx, tx); err != nil { + return err + } + return nil +} diff --git a/model/actors/market/task.go b/model/actors/market/task.go index d47fd79b2..1550124a9 100644 --- a/model/actors/market/task.go +++ b/model/actors/market/task.go @@ -23,12 +23,16 @@ func (mtr *MarketTaskResult) Persist(ctx context.Context, db *pg.DB) error { defer stop() return db.RunInTransaction(ctx, func(tx *pg.Tx) error { - if err := mtr.Proposals.PersistWithTx(ctx, tx); err != nil { - return fmt.Errorf("persisting market deal proposal: %w", err) - } - if err := mtr.States.PersistWithTx(ctx, tx); err != nil { - return fmt.Errorf("persisting market deal state: %w", err) - } - return nil + return mtr.PersistWithTx(ctx, tx) }) } + +func (mtr *MarketTaskResult) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if err := mtr.Proposals.PersistWithTx(ctx, tx); err != nil { + return fmt.Errorf("persisting market deal proposal: %w", err) + } + if err := mtr.States.PersistWithTx(ctx, tx); err != nil { + return fmt.Errorf("persisting market deal state: %w", err) + } + return nil +} diff --git a/model/visor/report.go b/model/visor/report.go new file mode 100644 index 000000000..fdf3752da --- /dev/null +++ b/model/visor/report.go @@ -0,0 +1,64 @@ +package visor + +import ( + "context" + "fmt" + "time" + + "github.com/go-pg/pg/v10" + "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/trace" + "go.opentelemetry.io/otel/label" +) + +const ( + ProcessingStatusOK = "OK" + ProcessingStatusInfo 
= "INFO" // Processing was successful but the task reported information in the StatusInformation column + ProcessingStatusError = "ERROR" // one or more errors were encountered, data may be incomplete +) + +type ProcessingReport struct { + tableName struct{} `pg:"visor_processing_reports"` // nolint: structcheck,unused + + Height int64 `pg:",pk,use_zero"` + StateRoot string `pg:",pk,notnull"` + + // Reporter is the name of the instance that is reporting the result + Reporter string `pg:",pk,notnull"` + + // Task is the name of the sub task that generated the report + Task string `pg:",pk,notnull"` + + StartedAt time.Time `pg:",pk,use_zero"` + CompletedAt time.Time `pg:",use_zero"` + + Status string `pg:",notnull"` + StatusInformation string + ErrorsDetected interface{} `pg:",type:jsonb"` +} + +func (s *ProcessingReport) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if _, err := tx.ModelContext(ctx, s). + OnConflict("do nothing"). + Insert(); err != nil { + return fmt.Errorf("persisting processing report: %w", err) + } + return nil +} + +type ProcessingReportList []*ProcessingReport + +func (l ProcessingReportList) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if len(l) == 0 { + return nil + } + ctx, span := global.Tracer("").Start(ctx, "ProcessingReportList.PersistWithTx", trace.WithAttributes(label.Int("count", len(l)))) + defer span.End() + + if _, err := tx.ModelContext(ctx, &l). + OnConflict("do nothing"). + Insert(); err != nil { + return fmt.Errorf("persisting processing report list: %w", err) + } + return nil +} diff --git a/storage/migrations/22_visor_processing_reports.go b/storage/migrations/22_visor_processing_reports.go new file mode 100644 index 000000000..900bb0ec0 --- /dev/null +++ b/storage/migrations/22_visor_processing_reports.go @@ -0,0 +1,30 @@ +package migrations + +import ( + "github.com/go-pg/migrations/v8" +) + +// Schema version 22 adds the visor_processing_reports table + +func init() { + up := batch(` +CREATE TABLE IF NOT EXISTS public.visor_processing_reports ( + "height" bigint, + "state_root" text, + "reporter" text, + "task" text, + "started_at" timestamptz NOT NULL, + "completed_at" timestamptz NOT NULL, + "status" text, + "status_information" text, + "errors_detected" jsonb, + PRIMARY KEY ("height","state_root","reporter", "task","started_at") +); +`) + + down := batch(` + DROP TABLE IF EXISTS public.visor_processing_reports; +`) + + migrations.MustRegisterTx(up, down) +} diff --git a/storage/sql.go b/storage/sql.go index eaa086c14..18e0a76cd 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -15,6 +15,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/sentinel-visor/metrics" + "github.com/filecoin-project/sentinel-visor/model" "github.com/filecoin-project/sentinel-visor/model/actors/common" init_ "github.com/filecoin-project/sentinel-visor/model/actors/init" "github.com/filecoin-project/sentinel-visor/model/actors/market" @@ -27,7 +28,6 @@ import ( "github.com/filecoin-project/sentinel-visor/model/derived" "github.com/filecoin-project/sentinel-visor/model/messages" "github.com/filecoin-project/sentinel-visor/model/visor" - "github.com/filecoin-project/sentinel-visor/version" ) var models = []interface{}{ @@ -85,14 +85,14 @@ var ( ErrSchemaTooNew = errors.New("database schema is too new for this version of visor") ) -func NewDatabase(ctx context.Context, url string, poolSize int) (*Database, error) { +func NewDatabase(ctx context.Context, url string, poolSize int, name string) (*Database, 
error) { opt, err := pg.ParseURL(url) if err != nil { return nil, xerrors.Errorf("parse database URL: %w", err) } opt.PoolSize = poolSize if opt.ApplicationName == "" { - opt.ApplicationName = "visor-" + version.String() + opt.ApplicationName = name } return &Database{ @@ -698,3 +698,12 @@ func (d *Database) MarkTipSetEconomicsComplete(ctx context.Context, tipset strin return nil } + +func (d *Database) Persist(ctx context.Context, p model.PersistableWithTx) error { + stop := metrics.Timer(ctx, metrics.PersistDuration) + defer stop() + + return d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { + return p.PersistWithTx(ctx, tx) + }) +} diff --git a/tasks/actorstate/actor.go b/tasks/actorstate/actor.go index d99ddaaac..86ba6be77 100644 --- a/tasks/actorstate/actor.go +++ b/tasks/actorstate/actor.go @@ -16,7 +16,7 @@ import ( // ActorExtractor extracts common actor state type ActorExtractor struct{} -func (ActorExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) { +func (ActorExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) { ctx, span := global.Tracer("").Start(ctx, "ActorExtractor") defer span.End() @@ -32,7 +32,6 @@ func (ActorExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateA if err != nil { return nil, err } - log.Debugw("read full actor state", "addr", a.Address.String(), "size", len(state), "code", ActorNameByCode(a.Actor.Code)) return &commonmodel.ActorTaskResult{ Actor: &commonmodel.Actor{ diff --git a/tasks/actorstate/actorstate.go b/tasks/actorstate/actorstate.go index 8b002bc39..c81537453 100644 --- a/tasks/actorstate/actorstate.go +++ b/tasks/actorstate/actorstate.go @@ -65,7 +65,7 @@ type ActorStateAPI interface { // An ActorStateExtractor extracts actor state into a persistable format type ActorStateExtractor interface { - Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) + Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) } // All supported actor state extractors @@ -95,6 +95,13 @@ func SupportedActorCodes() []cid.Cid { return codes } +func GetActorStateExtractor(code cid.Cid) (ActorStateExtractor, bool) { + extractorsMu.Lock() + defer extractorsMu.Unlock() + ase, ok := extractors[code] + return ase, ok +} + func NewActorStateProcessor(d *storage.Database, opener lens.APIOpener, leaseLength time.Duration, batchSize int, minHeight, maxHeight int64, actorCodes []cid.Cid, useLeases bool) (*ActorStateProcessor, error) { p := &ActorStateProcessor{ opener: opener, @@ -307,7 +314,8 @@ func (p *ActorStateProcessor) processActor(ctx context.Context, node lens.API, i if err != nil { return xerrors.Errorf("extract actor state: %w", err) } - if err := data.Persist(ctx, p.storage.DB); err != nil { + + if err := p.storage.Persist(ctx, data); err != nil { return xerrors.Errorf("persisting raw state: %w", err) } @@ -327,10 +335,9 @@ func (p *ActorStateProcessor) processActor(ctx context.Context, node lens.API, i log.Debugw("persisting extracted state", "addr", info.Address.String()) - if err := data.Persist(ctx, p.storage.DB); err != nil { + if err := p.storage.Persist(ctx, data); err != nil { return xerrors.Errorf("persisting extracted state: %w", err) } - return nil } diff --git a/tasks/actorstate/actorstatechange.go b/tasks/actorstate/actorstatechange.go index 78ea683ce..bded41d28 100644 --- a/tasks/actorstate/actorstatechange.go +++ b/tasks/actorstate/actorstatechange.go @@ -8,7 +8,6 
@@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" - "github.com/go-pg/pg/v10" "github.com/raulk/clock" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -216,10 +215,7 @@ func (p *ActorStateChangeProcessor) processTipSet(ctx context.Context, node lens }) } - ll.Debugw("persisting tipset", "state_changes", len(palist)) - if err := p.storage.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { - return palist.PersistWithTx(ctx, tx) - }); err != nil { + if err := p.storage.Persist(ctx, palist); err != nil { return xerrors.Errorf("persist: %w", err) } diff --git a/tasks/actorstate/init.go b/tasks/actorstate/init.go index bd4d5cf58..bd71a0c13 100644 --- a/tasks/actorstate/init.go +++ b/tasks/actorstate/init.go @@ -25,7 +25,7 @@ func init() { Register(sa2builtin.InitActorCodeID, InitExtractor{}) } -func (InitExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) { +func (InitExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) { // genesis state. if a.Epoch == 0 { initActorState, err := init_.Load(node.Store(), &a.Actor) diff --git a/tasks/actorstate/market.go b/tasks/actorstate/market.go index 198e2cf7e..b9a1503a2 100644 --- a/tasks/actorstate/market.go +++ b/tasks/actorstate/market.go @@ -26,7 +26,7 @@ func init() { Register(sa2builtin.StorageMarketActorCodeID, StorageMarketExtractor{}) } -func (m StorageMarketExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) { +func (m StorageMarketExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) { ctx, span := global.Tracer("").Start(ctx, "StorageMarketExtractor") defer span.End() diff --git a/tasks/actorstate/miner.go b/tasks/actorstate/miner.go index 878d58cb6..ff72ffbaa 100644 --- a/tasks/actorstate/miner.go +++ b/tasks/actorstate/miner.go @@ -32,7 +32,7 @@ func init() { Register(sa2builtin.StorageMinerActorCodeID, StorageMinerExtractor{}) } -func (m StorageMinerExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) { +func (m StorageMinerExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) { // TODO all processing below can, and probably should, be done in parallel. 
ctx, span := global.Tracer("").Start(ctx, "StorageMinerExtractor") defer span.End() @@ -42,7 +42,7 @@ func (m StorageMinerExtractor) Extract(ctx context.Context, a ActorInfo, node Ac ec, err := NewMinerStateExtractionContext(ctx, a, node) if err != nil { - return nil, err + return nil, xerrors.Errorf("creating miner state extraction context: %w", err) } minerInfoModel, err := ExtractMinerInfo(a, ec) diff --git a/tasks/actorstate/multisig.go b/tasks/actorstate/multisig.go index b60df59c9..bb89487b9 100644 --- a/tasks/actorstate/multisig.go +++ b/tasks/actorstate/multisig.go @@ -22,7 +22,7 @@ func init() { type MultiSigActorExtractor struct{} -func (m MultiSigActorExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) { +func (m MultiSigActorExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) { ctx, span := global.Tracer("").Start(ctx, "MultiSigActor") defer span.End() diff --git a/tasks/actorstate/power.go b/tasks/actorstate/power.go index 473e35822..0299e7271 100644 --- a/tasks/actorstate/power.go +++ b/tasks/actorstate/power.go @@ -72,7 +72,7 @@ func (p *PowerStateExtractionContext) IsGenesis() bool { return 0 == p.CurrTs.Height() } -func (StoragePowerExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) { +func (StoragePowerExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) { ctx, span := global.Tracer("").Start(ctx, "StoragePowerExtractor") defer span.End() diff --git a/tasks/actorstate/reward.go b/tasks/actorstate/reward.go index 0328b240b..fdef1dd1a 100644 --- a/tasks/actorstate/reward.go +++ b/tasks/actorstate/reward.go @@ -23,7 +23,7 @@ func init() { Register(sa2builtin.RewardActorCodeID, RewardExtractor{}) } -func (RewardExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.Persistable, error) { +func (RewardExtractor) Extract(ctx context.Context, a ActorInfo, node ActorStateAPI) (model.PersistableWithTx, error) { ctx, span := global.Tracer("").Start(ctx, "RewardExtractor") defer span.End() diff --git a/tasks/indexer/chainheadindexer.go b/tasks/indexer/chainheadindexer.go index a73aacae0..d1905603e 100644 --- a/tasks/indexer/chainheadindexer.go +++ b/tasks/indexer/chainheadindexer.go @@ -10,6 +10,7 @@ import ( lotus_api "github.com/filecoin-project/lotus/api" store "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/sentinel-visor/lens" "github.com/filecoin-project/sentinel-visor/metrics" @@ -18,13 +19,18 @@ import ( var log = logging.Logger("indexer") +// A TipSetObserver waits for notifications of new tipsets. +type TipSetObserver interface { + TipSet(ctx context.Context, ts *types.TipSet) error +} + // NewChainHeadIndexer creates a new ChainHeadIndexer. confidence sets the number of tipsets that will be held // in a cache awaiting possible reversion. Tipsets will be written to the database when they are evicted from // the cache due to incoming later tipsets. 
-func NewChainHeadIndexer(d *storage.Database, opener lens.APIOpener, confidence int) *ChainHeadIndexer { +func NewChainHeadIndexer(obs TipSetObserver, opener lens.APIOpener, confidence int) *ChainHeadIndexer { return &ChainHeadIndexer{ opener: opener, - storage: d, + obs: obs, confidence: confidence, cache: NewTipSetCache(confidence), } @@ -33,7 +39,7 @@ func NewChainHeadIndexer(d *storage.Database, opener lens.APIOpener, confidence // ChainHeadIndexer is a task that indexes blocks by following the chain head. type ChainHeadIndexer struct { opener lens.APIOpener - storage *storage.Database + obs TipSetObserver confidence int // size of tipset cache cache *TipSetCache // caches tipsets for possible reversion } @@ -74,8 +80,6 @@ func (c *ChainHeadIndexer) index(ctx context.Context, headEvents []*lotus_api.He ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "indexheadblock")) - data := NewUnindexedBlockData() - for _, ch := range headEvents { switch ch.Type { case store.HCCurrent: @@ -85,9 +89,9 @@ func (c *ChainHeadIndexer) index(ctx context.Context, headEvents []*lotus_api.He log.Errorw("tipset cache set current", "error", err.Error()) } - // If we have a zero confidence window then we need to index every tipset we see + // If we have a zero confidence window then we need to notify every tipset we see if c.confidence == 0 { - data.AddTipSet(ch.Val) + c.obs.TipSet(ctx, ch.Val) } case store.HCApply: log.Debugw("add tipset", "height", ch.Val.Height(), "tipset", ch.Val.Key().String()) @@ -96,9 +100,9 @@ func (c *ChainHeadIndexer) index(ctx context.Context, headEvents []*lotus_api.He log.Errorw("tipset cache add", "error", err.Error()) } - // Send the tipset that fell out of the confidence window to the database + // Send the tipset that fell out of the confidence window to the observer if tail != nil { - data.AddTipSet(tail) + c.obs.TipSet(ctx, tail) } case store.HCRevert: @@ -112,12 +116,33 @@ func (c *ChainHeadIndexer) index(ctx context.Context, headEvents []*lotus_api.He log.Debugw("tipset cache", "height", c.cache.Height(), "tail_height", c.cache.TailHeight(), "length", c.cache.Len()) - if data.Size() > 0 { + return nil +} + +var _ TipSetObserver = (*TipSetBlockIndexer)(nil) + +// A TipSetBlockIndexer waits for tipsets and persists their block data into a database. 
+type TipSetBlockIndexer struct { + data *UnindexedBlockData + storage *storage.Database +} + +func NewTipSetBlockIndexer(d *storage.Database) *TipSetBlockIndexer { + return &TipSetBlockIndexer{ + data: NewUnindexedBlockData(), + storage: d, + } +} + +func (t *TipSetBlockIndexer) TipSet(ctx context.Context, ts *types.TipSet) error { + t.data.AddTipSet(ts) + if t.data.Size() > 0 { // persist the blocks to storage - log.Debugw("persisting batch", "count", data.Size(), "height", data.Height()) - if err := data.Persist(ctx, c.storage.DB); err != nil { + log.Debugw("persisting batch", "count", t.data.Size(), "height", t.data.Height()) + if err := t.data.Persist(ctx, t.storage.DB); err != nil { return xerrors.Errorf("persist: %w", err) } } + return nil } diff --git a/tasks/indexer/chainheadindexer_test.go b/tasks/indexer/chainheadindexer_test.go index fec5fbe70..09b82d540 100644 --- a/tasks/indexer/chainheadindexer_test.go +++ b/tasks/indexer/chainheadindexer_test.go @@ -63,9 +63,9 @@ func TestChainHeadIndexer(t *testing.T) { apitest.MineUntilBlock(ctx, t, node, sn[0], nil) - d := &storage.Database{DB: db} + blockIndexer := NewTipSetBlockIndexer(&storage.Database{DB: db}) t.Logf("initializing indexer") - idx := NewChainHeadIndexer(d, opener, 0) + idx := NewChainHeadIndexer(blockIndexer, opener, 0) newHeads, err := node.ChainNotify(ctx) require.NoError(t, err, "chain notify") diff --git a/tasks/indexer/chainhistoryindexer.go b/tasks/indexer/chainhistoryindexer.go index 912fce411..c06b793c9 100644 --- a/tasks/indexer/chainhistoryindexer.go +++ b/tasks/indexer/chainhistoryindexer.go @@ -1,12 +1,10 @@ package indexer import ( - "container/list" "context" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/types" - pg "github.com/go-pg/pg/v10" - "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/trace" @@ -15,15 +13,13 @@ import ( "github.com/filecoin-project/sentinel-visor/lens" "github.com/filecoin-project/sentinel-visor/metrics" - "github.com/filecoin-project/sentinel-visor/storage" ) -func NewChainHistoryIndexer(d *storage.Database, opener lens.APIOpener, batchSize int, minHeight, maxHeight int64) *ChainHistoryIndexer { +func NewChainHistoryIndexer(obs TipSetObserver, opener lens.APIOpener, minHeight, maxHeight int64) *ChainHistoryIndexer { return &ChainHistoryIndexer{ opener: opener, - storage: d, + obs: obs, finality: 900, - batchSize: batchSize, minHeight: minHeight, maxHeight: maxHeight, } @@ -32,9 +28,8 @@ func NewChainHistoryIndexer(d *storage.Database, opener lens.APIOpener, batchSiz // ChainHistoryIndexer is a task that indexes blocks by following the chain history. 
type ChainHistoryIndexer struct { opener lens.APIOpener - storage *storage.Database + obs TipSetObserver finality int // epochs after which chain state is considered final - batchSize int // number of blocks to persist in a batch minHeight int64 // limit persisting to tipsets equal to or above this height maxHeight int64 // limit persisting to tipsets equal to or below this height} } @@ -48,125 +43,55 @@ func (c *ChainHistoryIndexer) Run(ctx context.Context) error { } defer closer() - height, err := c.mostRecentlySyncedBlockHeight(ctx) + ts, err := node.ChainHead(ctx) if err != nil { - return xerrors.Errorf("get synced block height: %w", err) + return xerrors.Errorf("get chain head: %w", err) + } + + if int64(ts.Height()) < c.minHeight { + return xerrors.Errorf("cannot walk history, chain head (%d) is earlier than minimum height (%d)", int64(ts.Height()), c.minHeight) } - if err := c.WalkChain(ctx, node, height); err != nil { - return xerrors.Errorf("collect blocks: %w", err) + if int64(ts.Height()) > c.maxHeight { + ts, err = node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(c.maxHeight), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get tipset by height: %w", err) + } + } + + if err := c.WalkChain(ctx, node, ts); err != nil { + return xerrors.Errorf("walk chain: %w", err) } return nil } -func (c *ChainHistoryIndexer) WalkChain(ctx context.Context, node lens.API, maxHeight int64) error { - ctx, span := global.Tracer("").Start(ctx, "ChainHistoryIndexer.WalkChain", trace.WithAttributes(label.Int64("height", maxHeight))) +func (c *ChainHistoryIndexer) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) error { + ctx, span := global.Tracer("").Start(ctx, "ChainHistoryIndexer.WalkChain", trace.WithAttributes(label.Int64("height", c.maxHeight))) defer span.End() ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, "indexhistoryblock")) - // get at most finality tipsets not exceeding maxHeight. These are blocks we have in the database but have not processed. - // Now we are going to walk down the chain from `head` until we have visited all blocks not in the database. 
- initialTipSets, err := c.storage.UnprocessedIndexedTipSets(ctx, int(maxHeight), c.finality) - if err != nil { - return xerrors.Errorf("get unprocessed blocks: %w", err) - } - log.Debugw("collect initial unprocessed tipsets", "count", len(initialTipSets)) - - // Data extracted from tipsets and block headers awaiting persistence - blockData := NewUnindexedBlockData() - - // A queue of tipsets that are yet to be visited - toVisit := list.New() + log.Debugw("found tipset", "height", ts.Height()) + c.obs.TipSet(ctx, ts) - // Mark all visited blocks from the database as already seen - for _, t := range initialTipSets { - tsk, err := t.TipSetKey() - if err != nil { - return xerrors.Errorf("decode tipsetkey: %w", err) - } - blockData.MarkSeen(tsk) - } - - // walk backwards from head until we find a block that we have - head, err := node.ChainHead(ctx) - if err != nil { - return xerrors.Errorf("get chain head: %w", err) - } - - log.Debugw("head", "height", head.Height()) - toVisit.PushBack(head) - - // TODO: revisit this loop which was designed to collect blocks but could now be a lot simpler since we are - // just walking the chain - for toVisit.Len() > 0 { + var err error + for int64(ts.Height()) >= c.minHeight && ts.Height() > 0 { select { case <-ctx.Done(): return ctx.Err() default: } - ts := toVisit.Remove(toVisit.Back()).(*types.TipSet) - stats.Record(ctx, metrics.EpochsToSync.M(int64(ts.Height()))) - - if ts.Height() != 0 { - // TODO: Look for websocket connection closed error and retry after a delay to avoid hot loop - pts, err := node.ChainGetTipSet(ctx, ts.Parents()) - if err != nil { - return xerrors.Errorf("get tipset: %w", err) - } - - toVisit.PushBack(pts) - } - - if blockData.Seen(ts.Key()) { - continue - } - - if int64(ts.Height()) > c.maxHeight { - log.Debugw("skipping tipset, height above configured maximum", "current_height", ts.Height()) - continue - } - - if int64(ts.Height()) < c.minHeight { - log.Debugw("finishing walk, height below configured minimumm", "current_height", ts.Height()) - break - } - - blockData.AddTipSet(ts) - - if blockData.Size() >= c.batchSize { - log.Debugw("persisting batch", "count", blockData.Size(), "current_height", ts.Height()) - // persist the batch of blocks to storage - - if err := blockData.Persist(ctx, c.storage.DB); err != nil { - return xerrors.Errorf("persist: %w", err) - } - stats.Record(ctx, metrics.HistoricalIndexerHeight.M(int64(blockData.Size()))) - blockData.Reset() + ts, err = node.ChainGetTipSet(ctx, ts.Parents()) + if err != nil { + return xerrors.Errorf("get tipset: %w", err) } - } + log.Debugw("found tipset", "height", ts.Height()) + c.obs.TipSet(ctx, ts) - log.Debugw("persisting final batch", "count", blockData.Size(), "height", blockData.Height()) - if err := blockData.Persist(ctx, c.storage.DB); err != nil { - return xerrors.Errorf("persist: %w", err) } return nil } - -func (c *ChainHistoryIndexer) mostRecentlySyncedBlockHeight(ctx context.Context) (int64, error) { - ctx, span := global.Tracer("").Start(ctx, "ChainHistoryIndexer.mostRecentlySyncedBlockHeight") - defer span.End() - - recent, err := c.storage.MostRecentAddedTipSet(ctx) - if err != nil { - if err == pg.ErrNoRows { - return 0, nil - } - return 0, xerrors.Errorf("query recent synced: %w", err) - } - return recent.Height, nil -} diff --git a/tasks/indexer/chainhistoryindexer_test.go b/tasks/indexer/chainhistoryindexer_test.go index 9de8424dc..6987c786a 100644 --- a/tasks/indexer/chainhistoryindexer_test.go +++ b/tasks/indexer/chainhistoryindexer_test.go @@ -56,12 
+56,12 @@ func TestChainHistoryIndexer(t *testing.T) { cids := bhs.Cids() rounds := bhs.Rounds() - d := &storage.Database{DB: db} + blockIndexer := NewTipSetBlockIndexer(&storage.Database{DB: db}) t.Logf("initializing indexer") - idx := NewChainHistoryIndexer(d, opener, 1, 0, 1000) + idx := NewChainHistoryIndexer(blockIndexer, opener, 0, int64(head.Height())) t.Logf("indexing chain") - err = idx.WalkChain(ctx, openedAPI, int64(head.Height())) + err = idx.WalkChain(ctx, openedAPI, head) require.NoError(t, err, "WalkChain") t.Run("block_headers", func(t *testing.T) { diff --git a/testutil/lens.go b/testutil/lens.go index 30afa2e0e..da87d5ab5 100644 --- a/testutil/lens.go +++ b/testutil/lens.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" apitest "github.com/filecoin-project/lotus/api/test" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/specs-actors/actors/util/adt" cid "github.com/ipfs/go-cid" @@ -43,6 +44,10 @@ func (aw *APIWrapper) ComputeGasOutputs(gasUsed, gasLimit int64, baseFee, feeCap return vm.ComputeGasOutputs(gasUsed, gasLimit, baseFee, feeCap, gasPremium) } +func (aw *APIWrapper) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) { + return nil, xerrors.Errorf("GetExecutedMessagesForTipset is not implemented") +} + func (aw *APIWrapper) Get(ctx context.Context, c cid.Cid, out interface{}) error { cu, ok := out.(cbg.CBORUnmarshaler) if !ok {
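The indexers in this patch no longer write to the database directly; they hand tipsets to anything implementing the TipSetObserver interface. A minimal sketch, not part of the patch, of plugging a custom observer into the history walker; loggingObserver and newLoggingWalker are illustrative names, while TipSetObserver, NewChainHistoryIndexer and lens.APIOpener come from the code above:

package example

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/chain/types"

	"github.com/filecoin-project/sentinel-visor/lens"
	"github.com/filecoin-project/sentinel-visor/tasks/indexer"
)

// loggingObserver satisfies indexer.TipSetObserver by printing each tipset it receives.
type loggingObserver struct{}

func (o *loggingObserver) TipSet(ctx context.Context, ts *types.TipSet) error {
	fmt.Printf("observed tipset height=%d blocks=%d\n", ts.Height(), len(ts.Blocks()))
	return nil
}

// newLoggingWalker wires the observer into a ChainHistoryIndexer that walks the chain
// from maxHeight down to minHeight, mirroring the constructor added in this patch.
func newLoggingWalker(opener lens.APIOpener, minHeight, maxHeight int64) *indexer.ChainHistoryIndexer {
	return indexer.NewChainHistoryIndexer(&loggingObserver{}, opener, minHeight, maxHeight)
}

Any type with such a TipSet method can be passed to NewChainHeadIndexer or NewChainHistoryIndexer in place of the TipSetBlockIndexer that the watch and walk commands use.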