-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add watch and walk commands to index chain during traversal #249
Conversation
fb524bf
to
a1f710a
Compare
a6d91b3
to
705965d
Compare
e3d4865
to
1fe39e2
Compare
Sorry for the far too large PR. When I started I thought I could reuse the existing message, gas economy and actor state change tasks but they are too coupled to the random access indexing mode that we currently use. I ended up extracting most of the message parsing and actor state change detection into new tasks that keep track of the last tipset and state tree they saw so they can perform appropriate diffs. The key areas that need particular review are:
|
I forgot to mention that neither of these these commands use the existing visor processing tables. They don't read the database at all, just write results. |
type ActorStateProcessor struct { | ||
node lens.API | ||
opener lens.APIOpener | ||
closer lens.APICloser |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems a bit scary that each processor holds the closer, since that's often going to be a shared handle that closes it fully for many lenses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The closer here is for the API instance that is returned when opener is called. It's not the same as the closer created when the opener is created. That is deferred in the top level command.
start := time.Now() | ||
|
||
// Run each task concurrently | ||
results := make(chan *ActorStateResult, len(changes)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what closes / cleans up this channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It goes out of scope when the function returns and is garbage collected as normal. There's never any need to close a channel unless it is to signal that readers should stop waiting on it.
if len(errorsDetected) != 0 { | ||
report.ErrorsDetected = errorsDetected | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you append errors on the report directly, rather than in a separate variable you then append to the report here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ErrorsDetected is an interface{} unfortunately
main.go
Outdated
defaultName := "visor_" + version.String() | ||
hostname, err := os.Hostname() | ||
if err == nil { | ||
defaultName += "_" + hostname + "_" + strconv.Itoa(os.Getpid()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider fmt.Sprintf("%s_%s_%d", defaultName, hostname, os.Getpid())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
p.node = node | ||
p.closer = closer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to postpone this until ProcessTipSet
and not init within New
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to re-open the lens whenever it has been closed after an error. This is the reconnection logic for the lotus API.
for inFlight > 0 { | ||
res := <-results | ||
inFlight-- | ||
elapsed := time.Since(start) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This elapsed will not be accurate since the for loop will block receieves for all buffered results. Maybe this is negligble given we're just appending to pre-alloced slices?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove all elapsed times in favour of proper metrics collection in a future change
} | ||
p.node = node | ||
p.closer = closer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why postpone this setup until ProcessTipSet
and not within New
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, it's not a setup it's re-connecting a lens that may have been closed due to an error.
ParentBaseFee: m.BlockHeader.ParentBaseFee.String(), | ||
|
||
// TODO: is SizeBytes really needed here? | ||
SizeBytes: msgSize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: No, it's enough to be in Messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments from my first pass, going to give a second look tomorrow. Looks good so far.
} else { | ||
log.Errorw("out of order tipsets", "height", ts.Height(), "last_height", p.lastTipSet.Height()) | ||
} | ||
} | ||
|
||
p.lastTipSet = ts | ||
p.lastStateTree = stateTree | ||
|
||
if err != nil { | ||
log.Errorw("error received while processing actors, closing lens", "error", err) | ||
if cerr := p.Close(); cerr != nil { | ||
log.Errorw("error received while closing lens", "error", cerr) | ||
} | ||
} | ||
return data, report, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can result in all returned values being nil
, perhaps the error message should be filled with a message similar to the error log above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No error and no data is a valid outcome. The very first tipset will have no data since we wait until we have seen two to perform a diff.
// ProcessTipSet processes a tipset. If error is non-nil then the processor encountered a fatal error. | ||
// Any data returned must be accompanied by a processing report. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If error is non-nil then the processor encountered a fatal error.
It could also be the case that no errors, fatal or not were encountered, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. The only errors that should be returned here are ones that signal that the processor cannot continue.
// TODO: the following closure is in place to handle the potential for panic | ||
// in ipld-prime. Can be removed once fixed upstream. | ||
// tracking issue: https://github.com/ipld/go-ipld-prime/issues/97 | ||
func() { | ||
defer func() { | ||
if r := recover(); r != nil { | ||
err = xerrors.Errorf("recovered panic: %v", r) | ||
} | ||
}() | ||
params, method, err = statediff.ParseParams(m.Params, int(m.Method), actor) | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw ipld/go-ipld-prime#97 has been closed via ipld/go-ipld-prime#99. Perhaps this is no longer needed? cc @willscott since I have a hunch this TODO was from you.
lens/lotus/api.go
Outdated
// No attempt at deduplication of messages is made. | ||
func (aw *APIWrapper) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) { | ||
if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) { | ||
return nil, xerrors.Errorf("child is not on the same chain") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: might be helpful to add some info about the tipset(s) in this message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
lens/lotus/api.go
Outdated
// Get receipts for parent messages | ||
rcpts, err := aw.ChainGetParentReceipts(ctx, ts.Cids()[0]) | ||
if err != nil { | ||
return nil, xerrors.Errorf("get parent messages: %w", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get parent receipts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
lens/util/repo.go
Outdated
// No attempt at deduplication of messages is made. | ||
func GetExecutedMessagesForTipset(ctx context.Context, cs *store.ChainStore, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) { | ||
if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) { | ||
return nil, xerrors.Errorf("child is not on the same chain") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto previous comment about helpful error message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
model/visor/report.go
Outdated
) | ||
|
||
type ProcessingReport struct { | ||
tableName struct{} `pg:"visor_processing_reports"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to pass linting add // nolint: structcheck,unused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
model/visor/report.go
Outdated
if _, err := tx.ModelContext(ctx, &l). | ||
OnConflict("do nothing"). | ||
Insert(); err != nil { | ||
return fmt.Errorf("persisting processing report: %w", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
processing report list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
This adds two new commands to visor to index while performing an in-order traversal of the filecoin chain.
The
watch
command watches and follows the head of the chain with a given confidence levelThe
walk
command walks backwards between two epochsBoth commands can be configured to perform a number of tasks as they encounter tipsets. They can remember the previous tipset and extraction result to make diffing or referring to earlier messages more efficient. Tasks are performed concurrently where possible.
The
watch
command attempts to do everything within a single epoch. Thewalk
command has no deadline.Tasks available to the new commands:
blocks
- extract block header, block parents and drand entry information from each tipset encounteredmessages
- extract messages, receipts, block messages, parsed messages and gas outputs from each parent/child pair of tipsets encounteredchaineconomics
- extracts the circulating supply of FIL from each tipset encountered.actorstatesraw
- extracts raw actor information and JSON of the actor state for each actor that changes state between two tipsetsactorstatesparsed
- parses actor state into separate tables for each actor that changes state between two tipsetsactorstates
- combinesactorstatesraw
andactorstatesparsed
into a single taskThe
actorstates
task is split into two parts.actorstatesparsed
requires much more time to execute thanactorstatesraw
.Test with something like:
or
Results of the
walk
andwatch
command are stored in a newvisor_processing_reports
table. One row is written for each epoch and task combination with one of three statuses:OK
- task completed successfullyINFO
- task completed successfully but some additional information can be found in thestatus_information
column.ERRROR
- task failed to complete fully. More details can be found in theerrors_detected
column.Performance notes:
Currently the speed of requesting blocks from Lotus prevents anything except basic block indexing from completing in a single epoch. With a fast block cache (such as lotus-cpr) in front of the API is it feasible to watch the chain head with the
blocks
,messages
,chaineconomics
andactorstatesraw
tasks enabled. Further optimization of actor state diffing is needed to enable theactorstatesparsed
task to reliably complete within a single epoch.Still to do: