From 111dbb53a624def8438555cf495f0ee8e0ab2f5f Mon Sep 17 00:00:00 2001 From: geoknee Date: Fri, 11 Oct 2024 18:33:14 +0100 Subject: [PATCH 1/6] implement batchSubmitter.checkExpectedProgress --- op-batcher/batcher/driver.go | 56 ++++++++++++++++++++++++++++++------ 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 7b1d6139e265..3271c1cbf23e 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -471,17 +471,18 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR l.state.pruneSafeBlocks(syncStatus.SafeL2) l.state.pruneChannels(syncStatus.SafeL2) + + err = l.checkExpectedProgress(*syncStatus) + if err != nil { + l.waitNodeSyncAndClearState() + continue + } + if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) { - // Wait for any in flight transactions - // to be ingested by the node before - // we start loading blocks again. - err := l.waitNodeSync() - if err != nil { - l.Log.Warn("error waiting for node sync", "err", err) - } - l.clearState(l.shutdownCtx) + l.waitNodeSyncAndClearState() continue } + l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval) case <-ctx.Done(): l.Log.Warn("main loop returning") @@ -579,6 +580,45 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) { } } +func (l *BatchSubmitter) waitNodeSyncAndClearState() { + // Wait for any in flight transactions + // to be ingested by the node before + // we start loading blocks again. + err := l.waitNodeSync() + if err != nil { + l.Log.Warn("error waiting for node sync", "err", err) + } + l.clearState(l.shutdownCtx) +} + +// checkExpectedProgress uses the supplied syncStatus to infer +// whether the node providing the status has made the expected +// safe head progress given fully submitted channels held in +// state. +func (l *BatchSubmitter) checkExpectedProgress(syncStatus eth.SyncStatus) error { + + // Define a buffer: a delay denominated in L1 blocks + // to allow verifier to process blocks. + // Set to 0 to require immediate safe head advancement. + // Set to 1 to allow e.g. 12s for the safe head to advance + // after the L1 inclusion block was ingested. + // TODO extract this into a config variable + // TODO should we also wait numConfirmations (this is a txmgr config variable, not in scope of the batch submitter) + + BUFFER := uint64(1) + + verifierCurrentL1 := syncStatus.CurrentL1 + for _, ch := range l.state.channelQueue { + if ch.isFullySubmitted() && // This implies a number of l1 confirmations has passed, depending on how the txmgr was configured + !ch.isTimedOut() && + verifierCurrentL1.Number > ch.maxInclusionBlock+BUFFER && + syncStatus.SafeL2.Number < ch.LatestL2().Number { + return errors.New("safe head did not make expected progress") + } + } + return nil +} + // waitNodeSync Check to see if there was a batcher tx sent recently that // still needs more block confirmations before being considered finalized func (l *BatchSubmitter) waitNodeSync() error { From da5a02ad4ec3231e5d47f651b7d25b9b4a8e3c77 Mon Sep 17 00:00:00 2001 From: geoknee Date: Mon, 18 Nov 2024 13:44:34 +0000 Subject: [PATCH 2/6] remove buffer variable --- op-batcher/batcher/driver.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 3271c1cbf23e..ab5fc03127ea 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -596,22 +596,11 @@ func (l *BatchSubmitter) waitNodeSyncAndClearState() { // safe head progress given fully submitted channels held in // state. func (l *BatchSubmitter) checkExpectedProgress(syncStatus eth.SyncStatus) error { - - // Define a buffer: a delay denominated in L1 blocks - // to allow verifier to process blocks. - // Set to 0 to require immediate safe head advancement. - // Set to 1 to allow e.g. 12s for the safe head to advance - // after the L1 inclusion block was ingested. - // TODO extract this into a config variable - // TODO should we also wait numConfirmations (this is a txmgr config variable, not in scope of the batch submitter) - - BUFFER := uint64(1) - verifierCurrentL1 := syncStatus.CurrentL1 for _, ch := range l.state.channelQueue { if ch.isFullySubmitted() && // This implies a number of l1 confirmations has passed, depending on how the txmgr was configured !ch.isTimedOut() && - verifierCurrentL1.Number > ch.maxInclusionBlock+BUFFER && + verifierCurrentL1.Number > ch.maxInclusionBlock && syncStatus.SafeL2.Number < ch.LatestL2().Number { return errors.New("safe head did not make expected progress") } From b15ae540d1a472694e20d9113644f4095d9ea062 Mon Sep 17 00:00:00 2001 From: geoknee Date: Mon, 18 Nov 2024 18:36:44 +0000 Subject: [PATCH 3/6] add warning logs when calling waitNodeSyncAndClearState --- op-batcher/batcher/driver.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index ab5fc03127ea..2fce2254f356 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -474,11 +474,13 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR err = l.checkExpectedProgress(*syncStatus) if err != nil { + l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err) l.waitNodeSyncAndClearState() continue } if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) { + l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err) l.waitNodeSyncAndClearState() continue } From 5052dff7478a002ece6d6941c6148ff2616b1c9e Mon Sep 17 00:00:00 2001 From: geoknee Date: Mon, 18 Nov 2024 22:33:09 +0000 Subject: [PATCH 4/6] push method down into channel manager and add test --- op-batcher/batcher/channel_manager.go | 16 ++++++++ op-batcher/batcher/channel_manager_test.go | 43 ++++++++++++++++++++++ op-batcher/batcher/driver.go | 19 +--------- 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 4208a8cd3795..81ee0fb35a51 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -549,3 +549,19 @@ func (s *channelManager) PendingDABytes() int64 { } return int64(f) } + +// CheckExpectedProgress uses the supplied syncStatus to infer +// whether the node providing the status has made the expected +// safe head progress given fully submitted channels held in +// state. +func (m *channelManager) CheckExpectedProgress(syncStatus eth.SyncStatus) error { + for _, ch := range m.channelQueue { + if ch.isFullySubmitted() && // This implies a number of l1 confirmations has passed, depending on how the txmgr was configured + !ch.isTimedOut() && + syncStatus.CurrentL1.Number > ch.maxInclusionBlock && + syncStatus.SafeL2.Number < ch.LatestL2().Number { + return errors.New("safe head did not make expected progress") + } + } + return nil +} diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 1c742207a5f0..4692aafc121e 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -627,3 +627,46 @@ func TestChannelManager_ChannelOutFactory(t *testing.T) { require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co) } + +func TestChannelManager_CheckExpectedProgress(t *testing.T) { + l := testlog.Logger(t, log.LevelCrit) + cfg := channelManagerTestConfig(100, derive.SingularBatchType) + cfg.InitNoneCompressor() + m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) + + // Prepare a (dummy) fully submitted channel + // with maxInclusionBlock = 3 and latest safe block number = 3 + A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) + require.NoError(t, err) + rng := rand.New(rand.NewSource(123)) + a0 := derivetest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID) + a0 = a0.WithSeal(&types.Header{Number: big.NewInt(3)}) + _, err = A.AddBlock(a0) + require.NoError(t, err) + A.maxInclusionBlock = 3 + A.Close() + A.channelBuilder.frames = nil + A.channelBuilder.frameCursor = 0 + require.True(t, A.isFullySubmitted()) + + m.channelQueue = append(m.channelQueue, A) + + // The current L1 number implies that + // channel A above should have been derived + // from, so we expect safe head to progress to 3. + // Since the safe head moved to 4, there is no error: + ss := eth.SyncStatus{ + CurrentL1: eth.L1BlockRef{Number: 4}, + SafeL2: eth.L2BlockRef{Number: 4}, + } + err = m.CheckExpectedProgress(ss) + require.NoError(t, err) + + // If the safe head is less than 3 + // the method should return an error: + ss.SafeL2 = eth.L2BlockRef{Number: 1} + + err = m.CheckExpectedProgress(ss) + require.Error(t, err) + +} diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 2fce2254f356..dde08ac84193 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -472,7 +472,7 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR l.state.pruneSafeBlocks(syncStatus.SafeL2) l.state.pruneChannels(syncStatus.SafeL2) - err = l.checkExpectedProgress(*syncStatus) + err = l.state.CheckExpectedProgress(*syncStatus) if err != nil { l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err) l.waitNodeSyncAndClearState() @@ -593,23 +593,6 @@ func (l *BatchSubmitter) waitNodeSyncAndClearState() { l.clearState(l.shutdownCtx) } -// checkExpectedProgress uses the supplied syncStatus to infer -// whether the node providing the status has made the expected -// safe head progress given fully submitted channels held in -// state. -func (l *BatchSubmitter) checkExpectedProgress(syncStatus eth.SyncStatus) error { - verifierCurrentL1 := syncStatus.CurrentL1 - for _, ch := range l.state.channelQueue { - if ch.isFullySubmitted() && // This implies a number of l1 confirmations has passed, depending on how the txmgr was configured - !ch.isTimedOut() && - verifierCurrentL1.Number > ch.maxInclusionBlock && - syncStatus.SafeL2.Number < ch.LatestL2().Number { - return errors.New("safe head did not make expected progress") - } - } - return nil -} - // waitNodeSync Check to see if there was a batcher tx sent recently that // still needs more block confirmations before being considered finalized func (l *BatchSubmitter) waitNodeSync() error { From ce7fbcf49a4ee4b9c077ebbc51a34041cb0b29ea Mon Sep 17 00:00:00 2001 From: geoknee Date: Tue, 19 Nov 2024 10:41:27 +0000 Subject: [PATCH 5/6] clarify SyncStatus documentation --- op-service/eth/sync_status.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op-service/eth/sync_status.go b/op-service/eth/sync_status.go index f9db1f672b82..e16275920e2b 100644 --- a/op-service/eth/sync_status.go +++ b/op-service/eth/sync_status.go @@ -5,7 +5,7 @@ package eth type SyncStatus struct { // CurrentL1 is the L1 block that the derivation process is last idled at. // This may not be fully derived into L2 data yet. - // The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block. + // The safe L2 blocks were produced/included fully from the L1 chain up to _but excluding_ this L1 block. // If the node is synced, this matches the HeadL1, minus the verifier confirmation distance. CurrentL1 L1BlockRef `json:"current_l1"` // CurrentL1Finalized is a legacy sync-status attribute. This is deprecated. From a2ff417d647aad646548adbb6a13ba6b56404a16 Mon Sep 17 00:00:00 2001 From: geoknee Date: Tue, 19 Nov 2024 13:54:59 +0000 Subject: [PATCH 6/6] improve TestChannelManager_CheckExpectedProgress make parameters "tighter" / more realistic and check an extra case --- op-batcher/batcher/channel_manager_test.go | 31 +++++++++++++++------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 4692aafc121e..32aae1b06dd1 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -634,16 +634,20 @@ func TestChannelManager_CheckExpectedProgress(t *testing.T) { cfg.InitNoneCompressor() m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) + channelMaxInclusionBlockNumber := uint64(3) + channelLatestSafeBlockNumber := uint64(11) + // Prepare a (dummy) fully submitted channel - // with maxInclusionBlock = 3 and latest safe block number = 3 + // with + // maxInclusionBlock and latest safe block number as above A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) require.NoError(t, err) rng := rand.New(rand.NewSource(123)) a0 := derivetest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID) - a0 = a0.WithSeal(&types.Header{Number: big.NewInt(3)}) + a0 = a0.WithSeal(&types.Header{Number: big.NewInt(int64(channelLatestSafeBlockNumber))}) _, err = A.AddBlock(a0) require.NoError(t, err) - A.maxInclusionBlock = 3 + A.maxInclusionBlock = channelMaxInclusionBlockNumber A.Close() A.channelBuilder.frames = nil A.channelBuilder.frameCursor = 0 @@ -653,20 +657,27 @@ func TestChannelManager_CheckExpectedProgress(t *testing.T) { // The current L1 number implies that // channel A above should have been derived - // from, so we expect safe head to progress to 3. - // Since the safe head moved to 4, there is no error: + // from, so we expect safe head to progress to + // the channelLatestSafeBlockNumber. + // Since the safe head moved to 11, there is no error: ss := eth.SyncStatus{ - CurrentL1: eth.L1BlockRef{Number: 4}, - SafeL2: eth.L2BlockRef{Number: 4}, + CurrentL1: eth.L1BlockRef{Number: channelMaxInclusionBlockNumber + 1}, + SafeL2: eth.L2BlockRef{Number: channelLatestSafeBlockNumber}, } err = m.CheckExpectedProgress(ss) require.NoError(t, err) - // If the safe head is less than 3 + // If the currentL1 is as above but the + // safe head is less than channelLatestSafeBlockNumber, // the method should return an error: - ss.SafeL2 = eth.L2BlockRef{Number: 1} - + ss.SafeL2 = eth.L2BlockRef{Number: channelLatestSafeBlockNumber - 1} err = m.CheckExpectedProgress(ss) require.Error(t, err) + // If the safe head is still less than channelLatestSafeBlockNumber + // but the currentL1 is _equal_ to the channelMaxInclusionBlockNumber + // there should be no error as that block is still being derived from: + ss.CurrentL1 = eth.L1BlockRef{Number: channelMaxInclusionBlockNumber} + err = m.CheckExpectedProgress(ss) + require.NoError(t, err) }