From 2ab3e004e22fc35c4125a5a0a2c4b9b91fe5ab8c Mon Sep 17 00:00:00 2001
From: Roberto Bayardo
Date: Wed, 17 Jul 2024 11:35:53 -0700
Subject: [PATCH] use a txRef type with an isCancel indicator instead of a
 magic channel id indicator

---
 op-batcher/batcher/driver.go   | 55 ++++++++++----------
 op-batcher/batcher/service.go  | 12 ++---
 op-e2e/eip4844_test.go         | 10 ++--
 op-e2e/system_fpp_test.go      |  5 +-
 op-e2e/system_test.go          |  8 +--
 op-service/txmgr/test_txmgr.go | 94 ++++++++++++++++++++++++++++++++++
 op-service/txmgr/txmgr.go      | 84 ------------------------------
 7 files changed, 136 insertions(+), 132 deletions(-)
 create mode 100644 op-service/txmgr/test_txmgr.go

diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 583334b615640..af1624bb74764 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -28,22 +28,20 @@ import (
 var (
 	ErrBatcherNotRunning = errors.New("batcher is not running")
-
-	// cancelID is the channel ID used to indicate this transaction is a cancellation transaction only
-	cancelID     = derive.ChannelID([derive.ChannelIDLength]byte{0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF})
-	cancelTxData = txData{
+	emptyTxData = txData{
 		frames: []frameData{
 			frameData{
 				data: []byte{},
-				id: frameID{
-					chID:        cancelID,
-					frameNumber: 0,
-				},
 			},
 		},
 	}
 )

+type txRef struct {
+	id       txID
+	isCancel bool
+}
+
 type L1Client interface {
 	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
 	NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
@@ -299,8 +297,8 @@ func (l *BatchSubmitter) loop() {
 		}
 	}

-	receiptsCh := make(chan txmgr.TxReceipt[txID])
-	queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
+	receiptsCh := make(chan txmgr.TxReceipt[txRef])
+	queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)

 	// start the receipt/result processing loop
 	receiptLoopDone := make(chan struct{})
@@ -314,8 +312,7 @@ func (l *BatchSubmitter) loop() {
 			case r := <-receiptsCh:
 				if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
 					l.Log.Info("incompatible tx in txpool")
-				} else if r.ID[0].chID == cancelID &&
-					txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
+				} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
 					// Set state to TxpoolGood even if the cancellation transaction ended in error
 					// since the stuck transaction could have cleared while we were waiting.
 					l.Log.Info("txpool may no longer be blocked", "err", r.Err)
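The hunk above is where the patch pays off: the receipt loop now recognizes cancellation receipts by the typed `r.ID.isCancel` flag instead of comparing frame IDs against the magic `cancelID`. The loop drives a small compare-and-swap state machine over the txpool status. Below is a minimal standalone sketch of those transitions; the `TxpoolBlocked` to `TxpoolCancelPending` step is assumed from context (that hunk is not shown in this diff), and the constant values are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical stand-ins for the batcher's txpool states; only the
// transitions below mirror the patched receipt loop.
const (
	TxpoolGood int32 = iota
	TxpoolBlocked
	TxpoolCancelPending
)

func main() {
	var txpoolState atomic.Int32 // zero value == TxpoolGood

	// Receipt loop: an ErrAlreadyReserved receipt means an incompatible tx
	// type holds our account's pool slot, so mark the pool blocked.
	if txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
		fmt.Println("txpool blocked")
	}

	// Main loop (assumed step): claim the blocked state before sending the
	// cancellation tx, so only one cancellation is ever in flight.
	if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
		fmt.Println("sending cancellation tx")
	}

	// Receipt loop: when a receipt arrives with r.ID.isCancel set, flip back
	// to good even on error, since the stuck tx may have cleared meanwhile.
	if txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
		fmt.Println("txpool assumed unblocked")
	}
}
```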
@@ -425,7 +422,7 @@ func (l *BatchSubmitter) waitNodeSync() error {

 // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
 // no more data to queue for publishing or if there was an error queueing the data.
-func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
+func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
 	for {
 		// if the txmgr is closed, we stop the transaction sending
 		if l.Txmgr.IsClosed() {
@@ -482,7 +479,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 }

 // publishTxToL1 submits a single state tx to the L1
-func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
 	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
@@ -535,21 +532,21 @@ func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error)
 // cancelBlockingTx creates an empty transaction of appropriate type to cancel out the incompatible
 // transaction stuck in the txpool. In the future we might send an actual batch transaction instead
 // of an empty one to avoid wasting the tx fee.
-func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
+func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
 	var candidate *txmgr.TxCandidate
 	var err error
 	if l.Config.UseBlobs {
-		candidate = l.calldataTxCandidate(cancelTxData.CallData())
-	} else if candidate, err = l.blobTxCandidate(cancelTxData); err != nil {
+		candidate = l.calldataTxCandidate([]byte{})
+	} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
 		panic(err) // this error should not happen
 	}
 	l.Log.Warn("sending a cancellation transaction to unblock txpool")
-	l.queueTx(cancelTxData, candidate, queue, receiptsCh)
+	l.queueTx(txData{}, true, candidate, queue, receiptsCh)
 }

 // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
 // The method will block if the queue's MaxPendingTransactions is exceeded.
-func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
+func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
 	var err error
 	// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
@@ -584,11 +581,11 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
 		candidate = l.calldataTxCandidate(data)
 	}

-	l.queueTx(txdata, candidate, queue, receiptsCh)
+	l.queueTx(txdata, false, candidate, queue, receiptsCh)
 	return nil
 }

-func (l *BatchSubmitter) queueTx(txdata txData, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
+func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
 	intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
 	if err != nil {
 		// we log instead of return an error here because txmgr can do its own gas estimation
@@ -597,7 +594,7 @@ func (l *BatchSubmitter) queueTx(txdata txData, candidate *txmgr.TxCandidate, qu
 		candidate.GasLimit = intrinsicGas
 	}

-	queue.Send(txdata.ID(), *candidate, receiptsCh)
+	queue.Send(txRef{txdata.ID(), isCancel}, *candidate, receiptsCh)
 }

 func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
@@ -624,12 +621,12 @@ func (l *BatchSubmitter) calldataTxCandidate(data []byte) *txmgr.TxCandidate {
 	}
 }

-func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txID]) {
+func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
 	// Record TX Status
 	if r.Err != nil {
-		l.recordFailedTx(r.ID, r.Err)
+		l.recordFailedTx(r.ID.id, r.Err)
 	} else {
-		l.recordConfirmedTx(r.ID, r.Receipt)
+		l.recordConfirmedTx(r.ID.id, r.Receipt)
 	}
 }

@@ -681,7 +678,7 @@ func logFields(xs ...any) (fs []any) {
 }

 type TestBatchSubmitter struct {
-	BatchSubmitter
+	*BatchSubmitter
 	ttm *txmgr.TestTxManager
 }

@@ -697,8 +694,8 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
 	var candidate *txmgr.TxCandidate
 	var err error
 	if l.Config.UseBlobs {
-		candidate = l.calldataTxCandidate(cancelTxData.CallData())
-	} else if candidate, err = l.blobTxCandidate(cancelTxData); err != nil {
+		candidate = l.calldataTxCandidate([]byte{})
+	} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
 		return err
 	}
 	if candidate.GasLimit, err = core.IntrinsicGas(candidate.TxData, nil, false, true, true, false); err != nil {
@@ -706,7 +703,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
 	}

 	l.ttm = &txmgr.TestTxManager{
-		SimpleTxManager: *l.Txmgr, //nolint:all
+		SimpleTxManager: l.Txmgr,
	}
 	l.Log.Info("sending txpool blocking test tx")
 	if err := l.ttm.JamTxPool(ctx, *candidate); err != nil {
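With the driver fully converted, the queue's generic ID type is `txRef`, and receipt handling reads the flag and the wrapped `txID` directly (`r.ID.isCancel`, `r.ID.id`) rather than digging a sentinel channel ID out of frame data. A self-contained sketch of that dispatch, with `txID` and the receipt type reduced to hypothetical stand-ins for the real batcher types:

```go
package main

import (
	"errors"
	"fmt"
)

// txID stands in for the batcher's real frame-based ID type.
type txID string

// txRef mirrors the type introduced by this patch: the transaction ID plus
// an explicit cancellation marker, replacing the magic cancelID channel.
type txRef struct {
	id       txID
	isCancel bool
}

// txReceipt is a reduced stand-in for txmgr.TxReceipt[txRef].
type txReceipt struct {
	ID  txRef
	Err error
}

func handleReceipt(r txReceipt) {
	switch {
	case r.ID.isCancel:
		// Cancellation receipts are not recorded as batch tx outcomes.
		fmt.Println("cancel tx receipt, err:", r.Err)
	case r.Err != nil:
		fmt.Println("recordFailedTx:", r.ID.id, r.Err)
	default:
		fmt.Println("recordConfirmedTx:", r.ID.id)
	}
}

func main() {
	handleReceipt(txReceipt{ID: txRef{id: "ch0:frame0"}})
	handleReceipt(txReceipt{ID: txRef{isCancel: true}, Err: errors.New("replaced")})
}
```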
diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go
index c84322fe867d4..71dd7fa4b6d1e 100644
--- a/op-batcher/batcher/service.go
+++ b/op-batcher/batcher/service.go
@@ -426,16 +426,10 @@ func (bs *BatcherService) Stop(ctx context.Context) error {

 var _ cliapp.Lifecycle = (*BatcherService)(nil)

-// Driver returns the handler on the batch-submitter driver element,
-// to start/stop/restart the batch-submission work, for use in testing.
-func (bs *BatcherService) Driver() *BatchSubmitter {
-	return bs.driver
-}
-
-// Driver returns the handler on the batch-submitter driver element,
-// to start/stop/restart the batch-submission work, for use in testing.
+// TestDriver returns a handler for the batch-submitter driver element, to start/stop/restart the
+// batch-submission work, for use only in testing.
 func (bs *BatcherService) TestDriver() *TestBatchSubmitter {
 	return &TestBatchSubmitter{
-		BatchSubmitter: *bs.driver, //nolint:all
+		BatchSubmitter: bs.driver,
 	}
 }
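Alongside the rename, `TestDriver` now hands out a `TestBatchSubmitter` that embeds the `*bs.driver` pointer rather than a copy of the struct, matching the `*BatchSubmitter` and `*SimpleTxManager` embedding changes in the driver. A plausible reason the old value embedding needed `//nolint:all` is `go vet`'s copylocks check: copying a struct that contains a mutex is flagged, and the copy would also diverge from the live instance. A small illustration under that assumption, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// manager stands in for SimpleTxManager: it carries a mutex, so copying a
// value of this type trips go vet's copylocks check and forks its state.
type manager struct {
	mu      sync.Mutex
	pending int
}

func (m *manager) add() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pending++
}

// testManager embeds *manager, mirroring the patch's switch from value to
// pointer embedding: no copy, no nolint, and both views share one state.
type testManager struct {
	*manager
}

func main() {
	m := &manager{}
	tm := testManager{manager: m}
	tm.add()
	fmt.Println(m.pending) // 1: the embedded pointer shares state with m
}
```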
diff --git a/op-e2e/eip4844_test.go b/op-e2e/eip4844_test.go
index 7f529e0d2143e..66fe08e48106f 100644
--- a/op-e2e/eip4844_test.go
+++ b/op-e2e/eip4844_test.go
@@ -68,17 +68,17 @@ func testSystem4844E2E(t *testing.T, multiBlob bool, daType batcherFlags.DataAva
 	action := SystemConfigOption{
 		key: "beforeBatcherStart",
 		action: func(cfg *SystemConfig, s *System) {
-			ttm := s.BatchSubmitter.TestDriver()
-			err := ttm.JamTxPool(jamCtx)
-			require.Nil(t, err)
+			driver := s.BatchSubmitter.TestDriver()
+			err := driver.JamTxPool(jamCtx)
+			require.NoError(t, err)
 			go func() {
-				jamChan <- ttm.WaitOnJammingTx(jamCtx)
+				jamChan <- driver.WaitOnJammingTx(jamCtx)
 			}()
 		},
 	}

 	defer func() {
 		jamCancel()
-		require.Nil(t, <-jamChan)
+		require.NoError(t, <-jamChan, "jam tx error")
 	}()
 	sys, err := cfg.Start(t, action)
diff --git a/op-e2e/system_fpp_test.go b/op-e2e/system_fpp_test.go
index 635276fe159a8..ece63d388698c 100644
--- a/op-e2e/system_fpp_test.go
+++ b/op-e2e/system_fpp_test.go
@@ -134,7 +134,8 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi
 	l2OutputRoot := agreedL2Output.OutputRoot

 	t.Log("=====Stopping batch submitter=====")
-	err = sys.BatchSubmitter.Driver().StopBatchSubmitting(ctx)
+	driver := sys.BatchSubmitter.TestDriver()
+	err = driver.StopBatchSubmitting(ctx)
 	require.NoError(t, err, "could not stop batch submitter")

 	// Wait for the sequencer to catch up with the current L1 head so we know all submitted batches are processed
@@ -162,7 +163,7 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi
 	l2Claim := l2Output.OutputRoot

 	t.Log("=====Restarting batch submitter=====")
-	err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
+	err = driver.StartBatchSubmitting()
 	require.NoError(t, err, "could not start batch submitter")

 	t.Log("Add a transaction to the next batch after sequence of empty blocks")
diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go
index d4e147c529dec..d7f969872638a 100644
--- a/op-e2e/system_test.go
+++ b/op-e2e/system_test.go
@@ -1456,8 +1456,9 @@ func StopStartBatcher(t *testing.T, deltaTimeOffset *hexutil.Uint64) {
 	require.NoError(t, err)
 	require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance")

+	driver := sys.BatchSubmitter.TestDriver()
 	// stop the batch submission
-	err = sys.BatchSubmitter.Driver().StopBatchSubmitting(context.Background())
+	err = driver.StopBatchSubmitting(context.Background())
 	require.NoError(t, err)

 	// wait for any old safe blocks being submitted / derived
@@ -1477,7 +1478,7 @@ func StopStartBatcher(t *testing.T, deltaTimeOffset *hexutil.Uint64) {
 	require.Equal(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain advanced while batcher was stopped")

 	// start the batch submission
-	err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
+	err = driver.StartBatchSubmitting()
 	require.NoError(t, err)

 	time.Sleep(safeBlockInclusionDuration)
@@ -1519,7 +1520,8 @@ func TestBatcherMultiTx(t *testing.T) {
 	require.NoError(t, err)

 	// start batch submission
-	err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
+	driver := sys.BatchSubmitter.TestDriver()
+	err = driver.StartBatchSubmitting()
 	require.NoError(t, err)

 	totalTxCount := 0
diff --git a/op-service/txmgr/test_txmgr.go b/op-service/txmgr/test_txmgr.go
new file mode 100644
index 0000000000000..ff57367b4ef4e
---
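The eip4844 test above shows the intended call pattern: jam the pool before the batcher starts, wait on the jam transaction from a goroutine, and assert the delivered error in a deferred check so the result is validated on every exit path. A stripped-down sketch of that channel-and-defer shape, with hypothetical names and none of the op-e2e machinery:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitOnJammingTx is a stand-in for TestTxManager.WaitOnJammingTx.
func waitOnJammingTx(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond): // pretend the jam tx cleared
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	jamChan := make(chan error, 1) // buffered: the goroutine can never leak

	go func() { jamChan <- waitOnJammingTx(ctx) }()

	// Deferred check, mirroring the test: cancel the wait context, then
	// verify that the jam transaction cleared without error.
	defer func() {
		cancel()
		if err := <-jamChan; err != nil {
			fmt.Println("jam tx error:", err)
		}
	}()

	time.Sleep(20 * time.Millisecond) // stand-in for the body of the test
}
```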
/dev/null
+++ b/op-service/txmgr/test_txmgr.go
@@ -0,0 +1,94 @@
+package txmgr
+
+import (
+	"context"
+	"errors"
+	"math/big"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+)
+
+type TestTxManager struct {
+	*SimpleTxManager
+	ss *SendState
+	tx *types.Transaction
+}
+
+// JamTxPool sends a transaction intended to get stuck in the txpool, and should be used ONLY for testing.
+// It is non-blocking. See WaitOnJammingTx if you wish to wait on the transaction to clear.
+func (m *TestTxManager) JamTxPool(ctx context.Context, candidate TxCandidate) error {
+	var err error
+	m.tx, err = m.makeStuckTx(ctx, candidate)
+	if err != nil {
+		return err
+	}
+	m.ss = NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
+	if err := m.backend.SendTransaction(ctx, m.tx); err != nil {
+		return err
+	}
+	return nil
+}
+
+// WaitOnJammingTx can be called after JamTxPool in order to wait on the jam transaction clearing.
+func (m *TestTxManager) WaitOnJammingTx(ctx context.Context) error {
+	if m.ss == nil {
+		return errors.New("WaitOnJammingTx called without first calling JamTxPool")
+	}
+	_, err := m.waitMined(ctx, m.tx, m.ss)
+	return err
+}
+
+func (m *TestTxManager) makeStuckTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
+	gasTipCap, _, blobBaseFee, err := m.suggestGasPriceCaps(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// override with minimal fees to make sure tx gets stuck in the pool
+	gasFeeCap := big.NewInt(2)
+	gasTipCap.SetUint64(1)
+
+	var sidecar *types.BlobTxSidecar
+	var blobHashes []common.Hash
+	if len(candidate.Blobs) > 0 {
+		if sidecar, blobHashes, err = MakeSidecar(candidate.Blobs); err != nil {
+			return nil, err
+		}
+	}
+
+	nonce, err := m.backend.NonceAt(ctx, m.cfg.From, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	var txMessage types.TxData
+	if sidecar != nil {
+		blobFeeCap := calcBlobFeeCap(blobBaseFee)
+		message := &types.BlobTx{
+			To:         *candidate.To,
+			Data:       candidate.TxData,
+			Gas:        candidate.GasLimit,
+			BlobHashes: blobHashes,
+			Sidecar:    sidecar,
+			Nonce:      nonce,
+		}
+		if err := finishBlobTx(message, m.chainID, gasTipCap, gasFeeCap, blobFeeCap, candidate.Value); err != nil {
+			return nil, err
+		}
+		txMessage = message
+	} else {
+		txMessage = &types.DynamicFeeTx{
+			ChainID:   m.chainID,
+			To:        candidate.To,
+			GasTipCap: gasTipCap,
+			GasFeeCap: gasFeeCap,
+			Value:     candidate.Value,
+			Data:      candidate.TxData,
+			Gas:       candidate.GasLimit,
+			Nonce:     nonce,
+		}
+	}
+
+	return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(txMessage))
+}
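`makeStuckTx` above manufactures the jam by signing an otherwise valid transaction with fee caps far below any plausible basefee (tip cap 1 wei, fee cap 2 wei): a pool will typically admit it, since the transaction is well-formed, but no block will include it until it is replaced. A sketch of building such a transaction with go-ethereum's types, left unsigned for brevity; the chain ID and recipient here are made up:

```go
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

func main() {
	// Hypothetical inbox address; any recipient works for the illustration.
	to := common.HexToAddress("0xff00000000000000000000000000000000000000")

	// Deliberately tiny fee caps, mirroring makeStuckTx: a 2-wei fee cap is
	// below any realistic basefee, so the tx lingers in the pool unmined.
	stuck := types.NewTx(&types.DynamicFeeTx{
		ChainID:   big.NewInt(1),
		Nonce:     0,
		GasTipCap: big.NewInt(1),
		GasFeeCap: big.NewInt(2),
		Gas:       21_000,
		To:        &to,
	})

	fmt.Println("fee cap:", stuck.GasFeeCap(), "tip cap:", stuck.GasTipCap())
}
```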
diff --git a/op-service/txmgr/txmgr.go b/op-service/txmgr/txmgr.go
index 5a0d4e69cce1e..4587680b7aa6b 100644
--- a/op-service/txmgr/txmgr.go
+++ b/op-service/txmgr/txmgr.go
@@ -905,87 +905,3 @@ func finishBlobTx(message *types.BlobTx, chainID, tip, fee, blobFee, value *big.
 	}
 	return nil
 }
-
-type TestTxManager struct {
-	SimpleTxManager
-	ss *SendState
-	tx *types.Transaction
-}
-
-// JamTxPool sends a transaction intended to get stuck in the txpool, and should be used ONLY for testing.
-// It is non-blocking. See WaitOnJammingTx if you wish to wait on the transaction to clear.
-func (m *TestTxManager) JamTxPool(ctx context.Context, candidate TxCandidate) error {
-	var err error
-	m.tx, err = m.makeStuckTx(ctx, candidate)
-	if err != nil {
-		return err
-	}
-	m.ss = NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
-	if err := m.backend.SendTransaction(ctx, m.tx); err != nil {
-		return err
-	}
-	return nil
-}
-
-// WaitOnJammingTx can be called after JamTxPool in order to wait on the jam transaction clearing.
-func (m *TestTxManager) WaitOnJammingTx(ctx context.Context) error {
-	if m.ss == nil {
-		return errors.New("WaitOnJammingTx called without first calling JamTxPool")
-	}
-	_, err := m.waitMined(ctx, m.tx, m.ss)
-	return err
-}
-
-func (m *TestTxManager) makeStuckTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
-	gasTipCap, _, blobBaseFee, err := m.suggestGasPriceCaps(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	// override with minimal fees to make sure tx gets stuck in the pool
-	gasFeeCap := big.NewInt(2)
-	gasTipCap.SetUint64(1)
-
-	var sidecar *types.BlobTxSidecar
-	var blobHashes []common.Hash
-	if len(candidate.Blobs) > 0 {
-		if sidecar, blobHashes, err = MakeSidecar(candidate.Blobs); err != nil {
-			return nil, err
-		}
-	}
-
-	nonce, err := m.backend.NonceAt(ctx, m.cfg.From, nil)
-	if err != nil {
-		return nil, err
-	}
-
-	var txMessage types.TxData
-	if sidecar != nil {
-		blobFeeCap := calcBlobFeeCap(blobBaseFee)
-		message := &types.BlobTx{
-			To:         *candidate.To,
-			Data:       candidate.TxData,
-			Gas:        candidate.GasLimit,
-			BlobHashes: blobHashes,
-			Sidecar:    sidecar,
-			Nonce:      nonce,
-		}
-		if err := finishBlobTx(message, m.chainID, gasTipCap, gasFeeCap, blobFeeCap, candidate.Value); err != nil {
-			return nil, err
-		}
-		txMessage = message
-	} else {
-		txMessage = &types.DynamicFeeTx{
-			ChainID:   m.chainID,
-			To:        candidate.To,
-			GasTipCap: gasTipCap,
-			GasFeeCap: gasFeeCap,
-			Value:     candidate.Value,
-			Data:      candidate.TxData,
-			Gas:       candidate.GasLimit,
-			Nonce:     nonce,
-		}
-	}
-
-	return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(txMessage))
-}