Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcher: add batchSubmitter.checkExpectedProgress #12430

Merged
merged 6 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,19 @@ func (s *channelManager) PendingDABytes() int64 {
}
return int64(f)
}

// CheckExpectedProgress walks the channel queue and uses the supplied
// syncStatus to decide whether the node that produced it has advanced its
// safe head as far as the fully submitted channels held in state imply.
//
// A queued channel causes an error only when all of the following hold:
//   - the channel is fully submitted,
//   - the channel has not timed out,
//   - syncStatus.CurrentL1 is strictly greater than the channel's
//     maxInclusionBlock,
//   - syncStatus.SafeL2 is still below the channel's latest L2 block.
//
// It returns nil when no queued channel satisfies all four conditions.
func (s *channelManager) CheckExpectedProgress(syncStatus eth.SyncStatus) error {
	for _, queued := range s.channelQueue {
		// Being fully submitted implies a number of l1 confirmations has
		// passed, depending on how the txmgr was configured.
		if !queued.isFullySubmitted() {
			continue
		}
		if queued.isTimedOut() {
			continue
		}
		if syncStatus.CurrentL1.Number <= queued.maxInclusionBlock {
			continue
		}
		if syncStatus.SafeL2.Number >= queued.LatestL2().Number {
			continue
		}
		return errors.New("safe head did not make expected progress")
	}
	return nil
}
54 changes: 54 additions & 0 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,3 +627,57 @@ func TestChannelManager_ChannelOutFactory(t *testing.T) {

require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co)
}

// TestChannelManager_CheckExpectedProgress puts a single fully submitted
// channel into a channel manager and checks that CheckExpectedProgress
// returns an error only when the reported CurrentL1 is past the channel's
// maxInclusionBlock while the reported safe head is still behind the
// channel's latest L2 block.
func TestChannelManager_CheckExpectedProgress(t *testing.T) {
	const (
		inclusionBlockNum = uint64(3)
		latestBlockNum    = uint64(11)
	)

	logger := testlog.Logger(t, log.LevelCrit)
	cfg := channelManagerTestConfig(100, derive.SingularBatchType)
	cfg.InitNoneCompressor()
	mgr := NewChannelManager(logger, metrics.NoopMetrics, cfg, defaultTestRollupConfig)

	// Build a (dummy) channel holding one block numbered latestBlockNum and
	// give it inclusionBlockNum as its maxInclusionBlock.
	ch, err := newChannelWithChannelOut(logger, metrics.NoopMetrics, cfg, mgr.rollupCfg, 0)
	require.NoError(t, err)
	rng := rand.New(rand.NewSource(123))
	block := derivetest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID)
	block = block.WithSeal(&types.Header{Number: big.NewInt(int64(latestBlockNum))})
	_, err = ch.AddBlock(block)
	require.NoError(t, err)
	ch.maxInclusionBlock = inclusionBlockNum
	ch.Close()
	// Drop the pending frames and reset the cursor so that the channel
	// reports itself as fully submitted (asserted right below).
	ch.channelBuilder.frames = nil
	ch.channelBuilder.frameCursor = 0
	require.True(t, ch.isFullySubmitted())

	mgr.channelQueue = append(mgr.channelQueue, ch)

	scenarios := []struct {
		desc      string
		currentL1 uint64
		safeL2    uint64
		wantErr   bool
	}{
		{
			// The current L1 number implies that the channel should have
			// been derived from, and the safe head did reach the channel's
			// latest block: no error.
			desc:      "current L1 past inclusion block, safe head caught up",
			currentL1: inclusionBlockNum + 1,
			safeL2:    latestBlockNum,
			wantErr:   false,
		},
		{
			// Same current L1, but the safe head is one block short of the
			// channel's latest block: an error is expected.
			desc:      "current L1 past inclusion block, safe head behind",
			currentL1: inclusionBlockNum + 1,
			safeL2:    latestBlockNum - 1,
			wantErr:   true,
		},
		{
			// The safe head is still short, but the current L1 is _equal_ to
			// the inclusion block, which is still being derived from: no
			// error.
			desc:      "current L1 equal to inclusion block, safe head behind",
			currentL1: inclusionBlockNum,
			safeL2:    latestBlockNum - 1,
			wantErr:   false,
		},
	}

	for _, sc := range scenarios {
		status := eth.SyncStatus{
			CurrentL1: eth.L1BlockRef{Number: sc.currentL1},
			SafeL2:    eth.L2BlockRef{Number: sc.safeL2},
		}
		progressErr := mgr.CheckExpectedProgress(status)
		if sc.wantErr {
			require.Error(t, progressErr, sc.desc)
		} else {
			require.NoError(t, progressErr, sc.desc)
		}
	}
}
30 changes: 22 additions & 8 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,17 +471,20 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR

l.state.pruneSafeBlocks(syncStatus.SafeL2)
l.state.pruneChannels(syncStatus.SafeL2)

err = l.state.CheckExpectedProgress(*syncStatus)
if err != nil {
l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
continue
}

if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) {
// Wait for any in flight transactions
// to be ingested by the node before
// we start loading blocks again.
err := l.waitNodeSync()
if err != nil {
l.Log.Warn("error waiting for node sync", "err", err)
}
l.clearState(l.shutdownCtx)
l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
continue
}

l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
case <-ctx.Done():
l.Log.Warn("main loop returning")
Expand Down Expand Up @@ -579,6 +582,17 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
}
}

// waitNodeSyncAndClearState calls waitNodeSync and then clears the batcher
// state. An error from waitNodeSync is only logged as a warning; the state is
// cleared regardless.
func (l *BatchSubmitter) waitNodeSyncAndClearState() {
	// Give any in-flight transactions the chance to be ingested by the node
	// before blocks are loaded again.
	if err := l.waitNodeSync(); err != nil {
		l.Log.Warn("error waiting for node sync", "err", err)
	}
	l.clearState(l.shutdownCtx)
}

// waitNodeSync checks whether a batcher tx was sent recently that
// still needs more block confirmations before being considered finalized.
func (l *BatchSubmitter) waitNodeSync() error {
Expand Down
2 changes: 1 addition & 1 deletion op-service/eth/sync_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package eth
type SyncStatus struct {
// CurrentL1 is the L1 block that the derivation process is last idled at.
// This may not be fully derived into L2 data yet.
// The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block.
// The safe L2 blocks were produced/included fully from the L1 chain up to _but excluding_ this L1 block.
// If the node is synced, this matches the HeadL1, minus the verifier confirmation distance.
CurrentL1 L1BlockRef `json:"current_l1"`
// CurrentL1Finalized is a legacy sync-status attribute. This is deprecated.
Expand Down