diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index bbf1d3b27b7..f2442610921 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -1259,7 +1259,12 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen // ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error { - receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID) + latest := head.BlockNumber() + var finalized int64 + if finalizedHead := head.LatestFinalizedHead(); finalizedHead != nil { + finalized = finalizedHead.BlockNumber() + } + receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID) if err != nil { return err diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 4467729e167..279a7252f1a 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -1096,9 +1096,9 @@ func (_c *TxStore_FindTxesByMetaFieldAndStates_Call[ADDR, CHAIN_ID, TX_HASH, BLO return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, latest, finalized, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, latest int64, finalized int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { + ret := _m.Called(ctx, latest, finalized, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1106,19 +1106,19 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPen var r0 []txmgrtypes.ReceiptPlus[R] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { + return rf(ctx, latest, finalized, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { + r0 = rf(ctx, latest, finalized, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]txmgrtypes.ReceiptPlus[R]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, CHAIN_ID) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, CHAIN_ID) error); ok { + r1 = rf(ctx, latest, finalized, chainID) } else { r1 = ret.Error(1) } @@ -1133,15 +1133,16 @@ type TxStore_FindTxesPendingCallback_Call[ADDR types.Hashable, CHAIN_ID types.ID // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 +// - latest int64 +// - finalized int64 // - chainID CHAIN_ID -func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, latest interface{}, finalized interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, latest, finalized, chainID)} } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, blockNum int64, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, latest int64, finalized int64, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(CHAIN_ID)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(CHAIN_ID)) }) return _c } @@ -1151,7 +1152,7 @@ func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HA return _c } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Return(run) return _c } diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 3d874cc4366..668b8db2049 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -34,7 +34,7 @@ type TxStore[ TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] // Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled - FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) + FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) // Update tx to mark that its callback has been signaled UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index fa2251168d9..b46849ef28f 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1056,7 +1056,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e } // Find confirmed txes requiring callback but have not yet been signaled -func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { +func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { var rs []dbReceiptPlus var cancel context.CancelFunc @@ -1067,8 +1067,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64 INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE - AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2 - `, blockNum, chainID.String()) + AND ( + (evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations)) + OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2) + ) + AND evm.txes.evm_chain_id = $3 + `, latest, finalized, chainID.String()) if err != nil { return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err) } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index c711c2788e8..25d77db27be 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -651,7 +651,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) attempt1 := etx1.TxAttempts[0] - mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash) + etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID) // Callback to pipeline service completed. Should be ignored @@ -684,10 +684,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID) // Search evm.txes table for tx requiring callback - receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID()) + receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID()) require.NoError(t, err) - assert.Len(t, receiptsPlus, 1) - assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + if assert.Len(t, receiptsPlus, 1) { + assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + } + + // Clear min_confirmations + pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID) + + // Search evm.txes table for tx requiring callback + receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID()) + require.NoError(t, err) + assert.Empty(t, receiptsPlus) + + // Search evm.txes table for tx requiring callback, with block 1 finalized + receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID()) + require.NoError(t, err) + if assert.Len(t, receiptsPlus, 1) { + assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + } } func Test_FindTxWithIdempotencyKey(t *testing.T) { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index a9a175e3d94..7800b26e47a 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -1395,9 +1395,9 @@ func (_c *EvmTxStore_FindTxesByMetaFieldAndStates_Call) RunAndReturn(run func(co return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, latest, finalized, chainID +func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, latest int64, finalized int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { + ret := _m.Called(ctx, latest, finalized, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1405,19 +1405,19 @@ func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int6 var r0 []types.ReceiptPlus[*evmtypes.Receipt] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { + return rf(ctx, latest, finalized, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { + r0 = rf(ctx, latest, finalized, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]types.ReceiptPlus[*evmtypes.Receipt]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, *big.Int) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, *big.Int) error); ok { + r1 = rf(ctx, latest, finalized, chainID) } else { r1 = ret.Error(1) } @@ -1432,15 +1432,16 @@ type EvmTxStore_FindTxesPendingCallback_Call struct { // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 +// - latest int64 +// - finalized int64 // - chainID *big.Int -func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { - return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, latest interface{}, finalized interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { + return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, latest, finalized, chainID)} } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, blockNum int64, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, latest int64, finalized int64, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(*big.Int)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(*big.Int)) }) return _c } @@ -1450,7 +1451,7 @@ func (_c *EvmTxStore_FindTxesPendingCallback_Call) Return(receiptsPlus []types.R return _c } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, int64, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Return(run) return _c }