Skip to content

Commit

Permalink
support finality callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 7, 2024
1 parent 871214a commit 3b899db
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 39 deletions.
7 changes: 6 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,12 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)
latest := head.BlockNumber()
var finalized int64
if finalizedHead := head.LatestFinalizedHead(); finalizedHead != nil {
finalized = finalizedHead.BlockNumber()
}
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID)

if err != nil {
return err
Expand Down
31 changes: 16 additions & 15 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TxStore[
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
Expand Down
10 changes: 7 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e
}

// Find confirmed txes requiring callback but have not yet been signaled
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
var rs []dbReceiptPlus

var cancel context.CancelFunc
Expand All @@ -1067,8 +1067,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE
AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2
`, blockNum, chainID.String())
AND (
(evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations))
OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2)
)
AND evm.txes.evm_chain_id = $3
`, latest, finalized, chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err)
}
Expand Down
24 changes: 20 additions & 4 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)
attempt1 := etx1.TxAttempts[0]
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash)
etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID)

// Callback to pipeline service completed. Should be ignored
Expand Down Expand Up @@ -684,10 +684,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID())
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Len(t, receiptsPlus, 1)
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}

// Clear min_confirmations
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Empty(t, receiptsPlus)

// Search evm.txes table for tx requiring callback, with block 1 finalized
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID())
require.NoError(t, err)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}
}

func Test_FindTxWithIdempotencyKey(t *testing.T) {
Expand Down
31 changes: 16 additions & 15 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3b899db

Please sign in to comment.