Skip to content

Commit

Permalink
use a txRef type with a isCancel indicator instead of a magic channel…
Browse files Browse the repository at this point in the history
… id indicator
  • Loading branch information
Roberto Bayardo committed Jul 18, 2024
1 parent bd610fb commit 2ab3e00
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 132 deletions.
55 changes: 26 additions & 29 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,20 @@ import (

var (
ErrBatcherNotRunning = errors.New("batcher is not running")

// cancelID is the channel ID used to indicate this transaction is a cancellation transaction only
cancelID = derive.ChannelID([derive.ChannelIDLength]byte{0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD, 0xBE, 0xEF})
cancelTxData = txData{
emptyTxData = txData{
frames: []frameData{
frameData{
data: []byte{},
id: frameID{
chID: cancelID,
frameNumber: 0,
},
},
},
}
)

type txRef struct {
id txID
isCancel bool
}

type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
Expand Down Expand Up @@ -299,8 +297,8 @@ func (l *BatchSubmitter) loop() {
}
}

receiptsCh := make(chan txmgr.TxReceipt[txID])
queue := txmgr.NewQueue[txID](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
Expand All @@ -314,8 +312,7 @@ func (l *BatchSubmitter) loop() {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
l.Log.Info("incompatible tx in txpool")
} else if r.ID[0].chID == cancelID &&
txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
Expand Down Expand Up @@ -425,7 +422,7 @@ func (l *BatchSubmitter) waitNodeSync() error {

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
Expand Down Expand Up @@ -482,7 +479,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
Expand Down Expand Up @@ -535,21 +532,21 @@ func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error)
// cancelBlockingTx creates an empty transaction of appropriate type to cancel out the incompatible
// transaction stuck in the txpool. In the future we might send an actual batch transaction instead
// of an empty one to avoid wasting the tx fee.
func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
var candidate *txmgr.TxCandidate
var err error
if l.Config.UseBlobs {
candidate = l.calldataTxCandidate(cancelTxData.CallData())
} else if candidate, err = l.blobTxCandidate(cancelTxData); err != nil {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
panic(err) // this error should not happen
}
l.Log.Warn("sending a cancellation transaction to unblock txpool")
l.queueTx(cancelTxData, candidate, queue, receiptsCh)
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) error {
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.

Expand Down Expand Up @@ -584,11 +581,11 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
candidate = l.calldataTxCandidate(data)
}

l.queueTx(txdata, candidate, queue, receiptsCh)
l.queueTx(txdata, false, candidate, queue, receiptsCh)
return nil
}

func (l *BatchSubmitter) queueTx(txdata txData, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
if err != nil {
// we log instead of return an error here because txmgr can do its own gas estimation
Expand All @@ -597,7 +594,7 @@ func (l *BatchSubmitter) queueTx(txdata txData, candidate *txmgr.TxCandidate, qu
candidate.GasLimit = intrinsicGas
}

queue.Send(txdata.ID(), *candidate, receiptsCh)
queue.Send(txRef{txdata.ID(), isCancel}, *candidate, receiptsCh)
}

func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
Expand All @@ -624,12 +621,12 @@ func (l *BatchSubmitter) calldataTxCandidate(data []byte) *txmgr.TxCandidate {
}
}

func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txID]) {
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
// Record TX Status
if r.Err != nil {
l.recordFailedTx(r.ID, r.Err)
l.recordFailedTx(r.ID.id, r.Err)
} else {
l.recordConfirmedTx(r.ID, r.Receipt)
l.recordConfirmedTx(r.ID.id, r.Receipt)
}
}

Expand Down Expand Up @@ -681,7 +678,7 @@ func logFields(xs ...any) (fs []any) {
}

type TestBatchSubmitter struct {
BatchSubmitter
*BatchSubmitter
ttm *txmgr.TestTxManager
}

Expand All @@ -697,16 +694,16 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
var candidate *txmgr.TxCandidate
var err error
if l.Config.UseBlobs {
candidate = l.calldataTxCandidate(cancelTxData.CallData())
} else if candidate, err = l.blobTxCandidate(cancelTxData); err != nil {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
return err
}
if candidate.GasLimit, err = core.IntrinsicGas(candidate.TxData, nil, false, true, true, false); err != nil {
return err
}

l.ttm = &txmgr.TestTxManager{
SimpleTxManager: *l.Txmgr, //nolint:all
SimpleTxManager: l.Txmgr,
}
l.Log.Info("sending txpool blocking test tx")
if err := l.ttm.JamTxPool(ctx, *candidate); err != nil {
Expand Down
12 changes: 3 additions & 9 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,16 +426,10 @@ func (bs *BatcherService) Stop(ctx context.Context) error {

var _ cliapp.Lifecycle = (*BatcherService)(nil)

// Driver returns the handler on the batch-submitter driver element,
// to start/stop/restart the batch-submission work, for use in testing.
func (bs *BatcherService) Driver() *BatchSubmitter {
return bs.driver
}

// Driver returns the handler on the batch-submitter driver element,
// to start/stop/restart the batch-submission work, for use in testing.
// TestDriver returns a handler for the batch-submitter driver element, to start/stop/restart the
// batch-submission work, for use only in testing.
func (bs *BatcherService) TestDriver() *TestBatchSubmitter {
return &TestBatchSubmitter{
BatchSubmitter: *bs.driver, //nolint:all
BatchSubmitter: bs.driver,
}
}
10 changes: 5 additions & 5 deletions op-e2e/eip4844_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ func testSystem4844E2E(t *testing.T, multiBlob bool, daType batcherFlags.DataAva
action := SystemConfigOption{
key: "beforeBatcherStart",
action: func(cfg *SystemConfig, s *System) {
ttm := s.BatchSubmitter.TestDriver()
err := ttm.JamTxPool(jamCtx)
require.Nil(t, err)
driver := s.BatchSubmitter.TestDriver()
err := driver.JamTxPool(jamCtx)
require.NoError(t, err)
go func() {
jamChan <- ttm.WaitOnJammingTx(jamCtx)
jamChan <- driver.WaitOnJammingTx(jamCtx)
}()
},
}
defer func() {
jamCancel()
require.Nil(t, <-jamChan)
require.NoError(t, <-jamChan, "jam tx error")
}()

sys, err := cfg.Start(t, action)
Expand Down
5 changes: 3 additions & 2 deletions op-e2e/system_fpp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi
l2OutputRoot := agreedL2Output.OutputRoot

t.Log("=====Stopping batch submitter=====")
err = sys.BatchSubmitter.Driver().StopBatchSubmitting(ctx)
driver := sys.BatchSubmitter.TestDriver()
err = driver.StopBatchSubmitting(ctx)
require.NoError(t, err, "could not stop batch submitter")

// Wait for the sequencer to catch up with the current L1 head so we know all submitted batches are processed
Expand Down Expand Up @@ -162,7 +163,7 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi
l2Claim := l2Output.OutputRoot

t.Log("=====Restarting batch submitter=====")
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
err = driver.StartBatchSubmitting()
require.NoError(t, err, "could not start batch submitter")

t.Log("Add a transaction to the next batch after sequence of empty blocks")
Expand Down
8 changes: 5 additions & 3 deletions op-e2e/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,8 +1456,9 @@ func StopStartBatcher(t *testing.T, deltaTimeOffset *hexutil.Uint64) {
require.NoError(t, err)
require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance")

driver := sys.BatchSubmitter.TestDriver()
// stop the batch submission
err = sys.BatchSubmitter.Driver().StopBatchSubmitting(context.Background())
err = driver.StopBatchSubmitting(context.Background())
require.NoError(t, err)

// wait for any old safe blocks being submitted / derived
Expand All @@ -1477,7 +1478,7 @@ func StopStartBatcher(t *testing.T, deltaTimeOffset *hexutil.Uint64) {
require.Equal(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain advanced while batcher was stopped")

// start the batch submission
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
err = driver.StartBatchSubmitting()
require.NoError(t, err)
time.Sleep(safeBlockInclusionDuration)

Expand Down Expand Up @@ -1519,7 +1520,8 @@ func TestBatcherMultiTx(t *testing.T) {
require.NoError(t, err)

// start batch submission
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
driver := sys.BatchSubmitter.TestDriver()
err = driver.StartBatchSubmitting()
require.NoError(t, err)

totalTxCount := 0
Expand Down
94 changes: 94 additions & 0 deletions op-service/txmgr/test_txmgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package txmgr

import (
"context"
"errors"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

type TestTxManager struct {
*SimpleTxManager
ss *SendState
tx *types.Transaction
}

// JamTxPool sends a transaction intended to get stuck in the txpool, and should be used ONLY for testing.
// It is non-blocking. See WaitOnJammingTx if you wish to wait on the transaction to clear.
func (m *TestTxManager) JamTxPool(ctx context.Context, candidate TxCandidate) error {
var err error
m.tx, err = m.makeStuckTx(ctx, candidate)
if err != nil {
return err
}
m.ss = NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
if err := m.backend.SendTransaction(ctx, m.tx); err != nil {
return err
}
return nil
}

// WaitOnJammingTx can be called after JamTxPool in order to wait on the jam transaction clearing.
func (m *TestTxManager) WaitOnJammingTx(ctx context.Context) error {
if m.ss == nil {
return errors.New("WaitOnJammingTx called without first calling JamTxPool")
}
_, err := m.waitMined(ctx, m.tx, m.ss)
return err
}

func (m *TestTxManager) makeStuckTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
gasTipCap, _, blobBaseFee, err := m.suggestGasPriceCaps(ctx)
if err != nil {
return nil, err
}

// override with minimal fees to make sure tx gets stuck in the pool
gasFeeCap := big.NewInt(2)
gasTipCap.SetUint64(1)

var sidecar *types.BlobTxSidecar
var blobHashes []common.Hash
if len(candidate.Blobs) > 0 {
if sidecar, blobHashes, err = MakeSidecar(candidate.Blobs); err != nil {
return nil, err
}
}

nonce, err := m.backend.NonceAt(ctx, m.cfg.From, nil)
if err != nil {
return nil, err
}

var txMessage types.TxData
if sidecar != nil {
blobFeeCap := calcBlobFeeCap(blobBaseFee)
message := &types.BlobTx{
To: *candidate.To,
Data: candidate.TxData,
Gas: candidate.GasLimit,
BlobHashes: blobHashes,
Sidecar: sidecar,
Nonce: nonce,
}
if err := finishBlobTx(message, m.chainID, gasTipCap, gasFeeCap, blobFeeCap, candidate.Value); err != nil {
return nil, err
}
txMessage = message
} else {
txMessage = &types.DynamicFeeTx{
ChainID: m.chainID,
To: candidate.To,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Value: candidate.Value,
Data: candidate.TxData,
Gas: candidate.GasLimit,
Nonce: nonce,
}
}

return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(txMessage))
}
Loading

0 comments on commit 2ab3e00

Please sign in to comment.