Skip to content

Commit

Permalink
Add transaction status function for external components to use (#13040)
Browse files Browse the repository at this point in the history
* Added transaction status function for external components to use

* Added changeset and fixed linting

* Updated changeset

* Made internal error regex matching explicit

* Updated common client error list

* Defined a generalized tx error interface for chain agnostic compatibility

* Adjusted to align as close as possible with the ChainWriter interface

* Updated to align transaction status method to ChainWriter expectations

* Updated to use the TransactionStatus type from chainlink-common

* Simplified the TxError interface

* Updated FindTxWithIdempotencyKey to return error if no rows found

* Removed sql error checks outside of txstore

* Updated method signature to accept string instead of uuid

* Updated mocks

* Moved tx finality check to Confirmer and renamed fields

* Updated tx finalized check to consider re-orgs

* Fixed linting

* Removed finality from GetTransactionStatus method

* Cleaned out unused client method

* Rephrased comment to match others
  • Loading branch information
amit-momin authored Jun 19, 2024
1 parent 29196d5 commit 0ac790b
Show file tree
Hide file tree
Showing 15 changed files with 362 additions and 61 deletions.
5 changes: 5 additions & 0 deletions .changeset/funny-snails-shake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added API for products to query a transaction's status in the TXM #internal
6 changes: 3 additions & 3 deletions common/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
InsufficientFunds // Tx was rejected due to insufficient funds.
ExceedsMaxFee // Attempt's fee was higher than the node's limit and got rejected.
FeeOutOfValidRange // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range(mostly used by L2 chains). Retry by requesting a new suggested fee price.
OutOfCounters // The error returned when a transaction is too complex to be proven by zk circuits. This error is mainly returned by zk chains.
TerminallyStuck // The error returned when a transaction is or could get terminally stuck in the mempool without any chance of inclusion.
sendTxReturnCodeLen // tracks the number of errors. Must always be last
)

Expand Down Expand Up @@ -50,8 +50,8 @@ func (c SendTxReturnCode) String() string {
return "ExceedsMaxFee"
case FeeOutOfValidRange:
return "FeeOutOfValidRange"
case OutOfCounters:
return "OutOfCounters"
case TerminallyStuck:
return "TerminallyStuck"
default:
return fmt.Sprintf("SendTxReturnCode(%d)", c)
}
Expand Down
2 changes: 1 addition & 1 deletion common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
ExpectedTxResult: "not enough keccak counters to continue the execution",
ExpectedCriticalErr: "",
ResultsByCode: sendTxErrors{
OutOfCounters: {errors.New("not enough keccak counters to continue the execution")},
TerminallyStuck: {errors.New("not enough keccak counters to continue the execution")},
},
},
}
Expand Down
30 changes: 30 additions & 0 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 69 additions & 27 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package txmgr

import (
"context"
"database/sql"
"errors"
"fmt"
"math/big"
Expand All @@ -14,6 +13,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
Expand All @@ -29,6 +29,8 @@ import (
// ResumeCallback is assumed to be idempotent
type ResumeCallback func(ctx context.Context, id uuid.UUID, result interface{}, err error) error

type NewErrorClassifier func(err error) txmgrtypes.ErrorClassifier

// TxManager is the main component of the transaction manager.
// It is also the interface to external callers.
//
Expand Down Expand Up @@ -62,6 +64,7 @@ type TxManager[
FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error)
CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error)
GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error)
}

type reset struct {
Expand Down Expand Up @@ -102,13 +105,14 @@ type Txm[
chSubbed chan struct{}
wg sync.WaitGroup

reaper *Reaper[CHAIN_ID]
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
reaper *Reaper[CHAIN_ID]
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -141,26 +145,28 @@ func NewTxm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
newErrorClassifierFunc NewErrorClassifier,
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
logger: logger.Sugared(lggr),
txStore: txStore,
config: cfg,
txConfig: txCfg,
keyStore: keyStore,
chainID: chainId,
checkerFactory: checkerFactory,
chHeads: make(chan HEAD),
trigger: make(chan ADDR),
chStop: make(chan struct{}),
chSubbed: make(chan struct{}),
reset: make(chan reset),
fwdMgr: fwdMgr,
txAttemptBuilder: txAttemptBuilder,
broadcaster: broadcaster,
confirmer: confirmer,
resender: resender,
tracker: tracker,
logger: logger.Sugared(lggr),
txStore: txStore,
config: cfg,
txConfig: txCfg,
keyStore: keyStore,
chainID: chainId,
checkerFactory: checkerFactory,
chHeads: make(chan HEAD),
trigger: make(chan ADDR),
chStop: make(chan struct{}),
chSubbed: make(chan struct{}),
reset: make(chan reset),
fwdMgr: fwdMgr,
txAttemptBuilder: txAttemptBuilder,
broadcaster: broadcaster,
confirmer: confirmer,
resender: resender,
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
}

if txCfg.ResendAfterThreshold() <= 0 {
Expand Down Expand Up @@ -498,7 +504,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
if err != nil {
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
Expand Down Expand Up @@ -625,6 +631,38 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTrans
return b.txStore.CountTransactionsByState(ctx, state, b.chainID)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID string) (status commontypes.TransactionStatus, err error) {
// Loads attempts and receipts in the transaction
tx, err := b.txStore.FindTxWithIdempotencyKey(ctx, transactionID, b.chainID)
if err != nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s: %w", transactionID, err)
}
// This check is required since a no-rows error returns nil err
if tx == nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s", transactionID)
}
switch tx.State {
case TxUnconfirmed, TxConfirmedMissingReceipt:
// Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized
return commontypes.Unconfirmed, nil
case TxConfirmed:
// TODO: Check for finality and return finalized status
// Return unconfirmed if tx receipt's block is newer than the latest finalized block
return commontypes.Unconfirmed, nil
case TxFatalError:
// Use an ErrorClassifier to determine if the transaction is considered Fatal
txErr := b.newErrorClassifier(tx.GetError())
if txErr != nil && txErr.IsFatal() {
return commontypes.Fatal, tx.GetError()
}
// Return failed for all other tx's marked as FatalError
return commontypes.Failed, tx.GetError()
default:
// Unstarted and InProgress are classified as unknown since they are not supported by the ChainWriter interface
return commontypes.Unknown, nil
}
}

type NullTxManager[
CHAIN_ID types.ID,
HEAD types.Head[BLOCK_HASH],
Expand Down Expand Up @@ -708,6 +746,10 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cou
return count, errors.New(n.ErrMsg)
}

func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID string) (status commontypes.TransactionStatus, err error) {
return
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueueAndCreateTxn(
ctx context.Context,
txRequest txmgrtypes.TxRequest[ADDR, TX_HASH],
Expand Down
7 changes: 7 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,10 @@ func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetChecker() (Transm

return t, nil
}

// Provides error classification to external components in a chain agnostic way
// Only exposes the error types that could be set in the transaction error field
type ErrorClassifier interface {
error
IsFatal() bool
}
1 change: 1 addition & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type TxStore[
FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) (tx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Find transactions loaded with transaction attempts and receipts by transaction IDs and states
FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []int64, states []TxState, chainID *big.Int) (tx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
}

// TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts
Expand Down
31 changes: 25 additions & 6 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"

commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
commontypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ const (
TransactionAlreadyMined
Fatal
ServiceUnavailable
OutOfCounters
TerminallyStuck
)

type ClientErrors map[int]*regexp.Regexp
Expand Down Expand Up @@ -246,10 +247,17 @@ var zkSync = ClientErrors{
}

var zkEvm = ClientErrors{
OutOfCounters: regexp.MustCompile(`(?:: |^)not enough .* counters to continue the execution$`),
TerminallyStuck: regexp.MustCompile(`(?:: |^)not enough .* counters to continue the execution$`),
}

var clients = []ClientErrors{parity, geth, arbitrum, metis, substrate, avalanche, nethermind, harmony, besu, erigon, klaytn, celo, zkSync, zkEvm}
const TerminallyStuckMsg = "transaction terminally stuck"

// Tx.Error messages that are set internally so they are not chain or client specific
var internal = ClientErrors{
TerminallyStuck: regexp.MustCompile(TerminallyStuckMsg),
}

var clients = []ClientErrors{parity, geth, arbitrum, metis, substrate, avalanche, nethermind, harmony, besu, erigon, klaytn, celo, zkSync, zkEvm, internal}

// ClientErrorRegexes returns a map of compiled regexes for each error type
func ClientErrorRegexes(errsRegex config.ClientErrors) *ClientErrors {
Expand Down Expand Up @@ -353,9 +361,16 @@ func (s *SendError) IsServiceUnavailable(configErrors *ClientErrors) bool {
return s.is(ServiceUnavailable, configErrors)
}

// IsOutOfCounters is a zk chain specific error returned if the transaction is too complex to prove on zk circuits
func (s *SendError) IsOutOfCounters(configErrors *ClientErrors) bool {
return s.is(OutOfCounters, configErrors)
// IsTerminallyStuck indicates if a transaction was stuck without any chance of inclusion
func (s *SendError) IsTerminallyStuckConfigError(configErrors *ClientErrors) bool {
return s.is(TerminallyStuck, configErrors)
}

// IsFatal indicates if a transaction error is considered fatal for external callers
// The naming discrepancy is due to the generic transaction statuses introduced by ChainWriter
func (s *SendError) IsFatal() bool {
// An error classified as terminally stuck is considered fatal since the transaction payload should NOT be retried by external callers
return s.IsTerminallyStuckConfigError(nil)
}

// IsTimeout indicates if the error was caused by an exceeded context deadline
Expand Down Expand Up @@ -399,6 +414,10 @@ func NewSendError(e error) *SendError {
return &SendError{err: pkgerrors.WithStack(e), fatal: fatal}
}

func NewTxError(e error) commontypes.ErrorClassifier {
return NewSendError(e)
}

// Geth/parity returns these errors if the transaction failed in such a way that:
// 1. It will never be included into a block as a result of this send
// 2. Resending the transaction at a different gas price will never change the outcome
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/tx_simulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestSimulateTx_Default(t *testing.T) {
Data: []byte("0x00"),
}
sendErr := client.SimulateTransaction(ctx, ethClient, logger.TestSugared(t), "", msg)
require.Equal(t, true, sendErr.IsOutOfCounters(nil))
require.Equal(t, true, sendErr.IsTerminallyStuckConfigError(nil))
})

t.Run("returns without error if simulation returns non-OOC error", func(t *testing.T) {
Expand Down Expand Up @@ -108,6 +108,6 @@ func TestSimulateTx_Default(t *testing.T) {
Data: []byte("0x00"),
}
sendErr := client.SimulateTransaction(ctx, ethClient, logger.TestSugared(t), "", msg)
require.Equal(t, false, sendErr.IsOutOfCounters(nil))
require.Equal(t, false, sendErr.IsTerminallyStuckConfigError(nil))
})
}
6 changes: 3 additions & 3 deletions core/chains/evm/txmgr/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
Expand All @@ -26,7 +26,7 @@ func NewTxm(
clientErrors config.ClientErrors,
dbConfig DatabaseConfig,
listenerConfig ListenerConfig,
client evmclient.Client,
client client.Client,
lggr logger.Logger,
logPoller logpoller.LogPoller,
keyStore keystore.Eth,
Expand Down Expand Up @@ -77,7 +77,7 @@ func NewEvmTxm(
resender *Resender,
tracker *Tracker,
) *Txm {
return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker)
return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, client.NewTxError)
}

// NewEvmResender creates a new concrete EvmResender
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3223,7 +3223,7 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, dbTx)
require.Equal(t, txmgrcommon.TxFatalError, dbTx.State)
require.Equal(t, "transaction terminally stuck", dbTx.Error.String)
require.Equal(t, client.TerminallyStuckMsg, dbTx.Error.String)
})
}

Expand Down
Loading

0 comments on commit 0ac790b

Please sign in to comment.