Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Address the missing order events edge case #863

Merged
merged 12 commits into from
Jul 27, 2020
16 changes: 16 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,22 @@ func (app *App) Start() error {
}
}()

// NOTE(jalextowle): If we are already more than `MaxBlocksStoredInNonArchiveNode`
// blocks behind, there is no need to check for missing order events. In this
// case, we cannot use the `GetBlockByNumber` RPC call with a non-archival
// Ethereum node, so we already have to revalidate all of the orders in the
// database, and we skip revalidation here to avoid doing redundant work.
preliminaryBlocksElapsed, _, err := app.blockWatcher.GetNumberOfBlocksBehind(innerCtx)
jalextowle marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if preliminaryBlocksElapsed < constants.MaxBlocksStoredInNonArchiveNode {
log.WithField("blocksElapsed", preliminaryBlocksElapsed).Info("Checking for missing order events relating to orders stored (this can take a while)...")
if err := app.orderWatcher.RevalidateOrdersForMissingEvents(innerCtx); err != nil {
return err
}
}

// Note: this is a blocking call so we won't continue set up until its finished.
blocksElapsed, err := app.blockWatcher.FastSyncToLatestBlock(innerCtx)
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions db/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const (
OFIsPinned OrderField = "isPinned"
OFParsedMakerAssetData OrderField = "parsedMakerAssetData"
OFParsedMakerFeeAssetData OrderField = "parsedMakerFeeAssetData"
OFLastValidatedBlockNumber OrderField = "lastValidatedBlockNumber"
)

type OrderQuery struct {
Expand Down Expand Up @@ -205,6 +206,27 @@ type MiniHeaderFilter struct {
Value interface{} `json:"value"`
}

// GetOldestMiniHeader is a helper method for getting the oldest MiniHeader.
// It returns ErrNotFound if there are no MiniHeaders in the database.
func (db *DB) GetOldestMiniHeader() (*types.MiniHeader, error) {
oldestMiniHeaders, err := db.FindMiniHeaders(&MiniHeaderQuery{
Sort: []MiniHeaderSort{
{
Field: MFNumber,
Direction: Ascending,
},
},
Limit: 1,
})
if err != nil {
return nil, err
}
if len(oldestMiniHeaders) == 0 {
return nil, ErrNotFound
}
return oldestMiniHeaders[0], nil
}

// GetLatestMiniHeader is a helper method for getting the latest MiniHeader.
// It returns ErrNotFound if there are no MiniHeaders in the database.
func (db *DB) GetLatestMiniHeader() (*types.MiniHeader, error) {
Expand Down
31 changes: 19 additions & 12 deletions ethereum/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,34 +103,42 @@ func New(retentionLimit int, config Config) *Watcher {
}
}

func (w *Watcher) GetNumberOfBlocksBehind(ctx context.Context) (int, int, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the function needs to be public.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being used in core/core.go

latestBlockProcessed := w.stack.Peek()

// No previously stored block so no blocks have elapsed
if latestBlockProcessed == nil {
return 0, 0, nil
}

latestBlock, err := w.client.HeaderByNumber(nil)
if err != nil {
return 0, 0, err
}
latestBlockProcessedNumber := int(latestBlockProcessed.Number.Int64())
blocksElapsed := int(latestBlock.Number.Int64()) - latestBlockProcessedNumber
return blocksElapsed, latestBlockProcessedNumber, err
}

// FastSyncToLatestBlock checks if the BlockWatcher is behind the latest block, and if so,
// catches it back up. If less than 128 blocks passed, we are able to fetch all missing
// block events and process them. If more than 128 blocks passed, we cannot catch up
// without an archive Ethereum node (see: http://bit.ly/2D11Hr6) so we instead clear
// previously tracked blocks so BlockWatcher starts again from the latest block. This
// function blocks until complete or the context is cancelled.
func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int, err error) {
func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (int, error) {
w.mu.Lock()
if w.wasStartedOnce {
w.mu.Unlock()
return 0, errors.New("Can only fast-sync to latest block before starting BlockWatcher")
}
w.mu.Unlock()

latestBlockProcessed := w.stack.Peek()

// No previously stored block so no blocks have elapsed
if latestBlockProcessed == nil {
return 0, nil
}

latestBlock, err := w.client.HeaderByNumber(nil)
blocksElapsed, latestBlockProcessedNumber, err := w.GetNumberOfBlocksBehind(ctx)
if err != nil {
return 0, err
}

latestBlockProcessedNumber := int(latestBlockProcessed.Number.Int64())
blocksElapsed = int(latestBlock.Number.Int64()) - latestBlockProcessedNumber
if blocksElapsed == 0 {
return blocksElapsed, nil
} else if blocksElapsed < constants.MaxBlocksStoredInNonArchiveNode {
Expand Down Expand Up @@ -310,7 +318,6 @@ func (w *Watcher) SyncToLatestBlock() error {
}
w.blockFeed.Send(allEvents)
}

return syncErr
}

Expand Down
Loading