Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCFR-934/remove-finality-depth-as-default-value-for-minConfirmation-and-fix-inconsistency #14509

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/brave-ads-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"chainlink": patch
---

Remove finality depth as the default value for minConfirmation for tx jobs.
Update the sql query for fetching pending callback transactions:
if minConfirmation is not null, we check difference if the current block - tx block > minConfirmation
else we check if the tx block is <= finalizedBlock
#updated
6 changes: 3 additions & 3 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

if ec.resumeCallback != nil {
mark = time.Now()
if err := ec.ResumePendingTaskRuns(ctx, head); err != nil {
if err := ec.ResumePendingTaskRuns(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil {
return fmt.Errorf("ResumePendingTaskRuns failed: %w", err)
}

Expand Down Expand Up @@ -1258,8 +1258,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen
}

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, latest, finalized int64) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID)

if err != nil {
return err
Expand Down
31 changes: 16 additions & 15 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TxStore[
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3054,7 +3054,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
// It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand All @@ -3072,7 +3072,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand Down Expand Up @@ -3100,7 +3100,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3154,7 +3154,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3191,7 +3191,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
Expand Down
10 changes: 7 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e
}

// Find confirmed txes requiring callback but have not yet been signaled
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
var rs []dbReceiptPlus

var cancel context.CancelFunc
Expand All @@ -1067,8 +1067,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE
AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2
`, blockNum, chainID.String())
AND (
(evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations))
OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2)
)
AND evm.txes.evm_chain_id = $3
`, latest, finalized, chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err)
}
Expand Down
24 changes: 20 additions & 4 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)
attempt1 := etx1.TxAttempts[0]
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash)
etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID)

// Callback to pipeline service completed. Should be ignored
Expand Down Expand Up @@ -684,10 +684,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID())
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Len(t, receiptsPlus, 1)
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}

// Clear min_confirmations
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Empty(t, receiptsPlus)

// Search evm.txes table for tx requiring callback, with block 1 finalized
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID())
require.NoError(t, err)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}
}

func Test_FindTxWithIdempotencyKey(t *testing.T) {
Expand Down
31 changes: 16 additions & 15 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 15 additions & 17 deletions core/services/pipeline/task.eth_tx.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now passing existing tests, but I think we are missing some coverage. Consider if a user does not set a value. In the old path, a value would be obtained that may or may not be zero, which determines whether we wait for a callback or not. In the new path, we never wait for callback. It would seem that we need a means of getting a callback after finalization (regardless of the particular depth or tags being used).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid to set only txRequest.PipelineTaskRunID without txRequest.MinConfirmations, to get a callback based on the configured finality? If that doesn't work already, it seems like a logical way to support this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented. Now the only change in behavior should be the unnoticeable implementation detail of an "immediate" callback in the case of omitting a value for a chain with "instant" finality. So just a bit of latency in the worst case.

Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func (t *ETHTxTask) getEvmChainID() string {
return t.EVMChainID
}

func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) {
func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo) {
var chainID StringParam
err := errors.Wrap(ResolveParam(&chainID, From(VarExpr(t.getEvmChainID(), vars), NonemptyString(t.getEvmChainID()), "")), "evmChainID")
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}

chain, err := t.legacyChains.Get(string(chainID))
Expand All @@ -81,7 +81,7 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
txManager := chain.TxManager()
_, err = CheckInputs(inputs, -1, -1, 0)
if err != nil {
return Result{Error: errors.Wrap(err, "task inputs")}, runInfo
return Result{Error: errors.Wrap(err, "task inputs")}, RunInfo{}
}

maximumGasLimit := SelectGasLimit(cfg.GasEstimator(), t.jobType, t.specGasLimit)
Expand All @@ -107,25 +107,20 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
errors.Wrap(ResolveParam(&failOnRevert, From(NonemptyString(t.FailOnRevert), false)), "failOnRevert"),
)
if err != nil {
return Result{Error: err}, runInfo
}
var minOutgoingConfirmations uint64
if min, isSet := maybeMinConfirmations.Uint64(); isSet {
minOutgoingConfirmations = min
} else {
minOutgoingConfirmations = uint64(cfg.FinalityDepth())
return Result{Error: err}, RunInfo{}
}
minOutgoingConfirmations, isMinConfirmationSet := maybeMinConfirmations.Uint64()

txMeta, err := decodeMeta(txMetaMap)
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}
txMeta.FailOnRevert = null.BoolFrom(bool(failOnRevert))
setJobIDOnMeta(lggr, vars, txMeta)

transmitChecker, err := decodeTransmitChecker(transmitCheckerMap)
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}

fromAddr, err := t.keyStore.GetRoundRobinAddress(ctx, chain.ID(), fromAddrs...)
Expand Down Expand Up @@ -159,8 +154,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
SignalCallback: true,
}

if minOutgoingConfirmations > 0 {
// Store the task run ID, so we can resume the pipeline when tx is confirmed
if !isMinConfirmationSet {
// Store the task run ID, so we can resume the pipeline when tx is finalized
txRequest.PipelineTaskRunID = &t.uuid
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmank88 is PipelineTaskRunID only set during this specific functionality? I'm wondering if there is a case where it's being set for a different reason and the minConfirmations gets executed as a side-effect of that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is PipelineTaskRunID only set during this specific functionality?

Yes

and the minConfirmations gets executed as a side-effect of that.

What do you mean that it "gets executed"? AFAIK this parameter only affects the callback behavior, not confirmation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm referring to a scenario where PipelineTaskRunID is used by another task/process, but it seems it doesn't.

} else if minOutgoingConfirmations > 0 {
// Store the task run ID, so we can resume the pipeline after minOutgoingConfirmations
txRequest.PipelineTaskRunID = &t.uuid
txRequest.MinConfirmations = clnull.Uint32From(uint32(minOutgoingConfirmations))
}
Expand All @@ -170,11 +168,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
return Result{Error: errors.Wrapf(ErrTaskRunFailed, "while creating transaction: %v", err)}, retryableRunInfo()
}

if minOutgoingConfirmations > 0 {
return Result{}, pendingRunInfo()
if txRequest.PipelineTaskRunID != nil {
return Result{}, RunInfo{IsPending: true}
}

return Result{Value: nil}, runInfo
return Result{}, RunInfo{}
}

func decodeMeta(metaMap MapParam) (*txmgr.TxMeta, error) {
Expand Down
Loading