Skip to content

Commit

Permalink
wait for callback unless explicit zero
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 7, 2024
1 parent 9c9678a commit 871214a
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions core/services/pipeline/task.eth_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func (t *ETHTxTask) getEvmChainID() string {
return t.EVMChainID
}

func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) {
func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo) {
var chainID StringParam
err := errors.Wrap(ResolveParam(&chainID, From(VarExpr(t.getEvmChainID(), vars), NonemptyString(t.getEvmChainID()), "")), "evmChainID")
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}

chain, err := t.legacyChains.Get(string(chainID))
Expand All @@ -81,7 +81,7 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
txManager := chain.TxManager()
_, err = CheckInputs(inputs, -1, -1, 0)
if err != nil {
return Result{Error: errors.Wrap(err, "task inputs")}, runInfo
return Result{Error: errors.Wrap(err, "task inputs")}, RunInfo{}
}

maximumGasLimit := SelectGasLimit(cfg.GasEstimator(), t.jobType, t.specGasLimit)
Expand All @@ -107,20 +107,20 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
errors.Wrap(ResolveParam(&failOnRevert, From(NonemptyString(t.FailOnRevert), false)), "failOnRevert"),
)
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}
minOutgoingConfirmations, isMinConfirmationSet := maybeMinConfirmations.Uint64()

txMeta, err := decodeMeta(txMetaMap)
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}
txMeta.FailOnRevert = null.BoolFrom(bool(failOnRevert))
setJobIDOnMeta(lggr, vars, txMeta)

transmitChecker, err := decodeTransmitChecker(transmitCheckerMap)
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}

fromAddr, err := t.keyStore.GetRoundRobinAddress(ctx, chain.ID(), fromAddrs...)
Expand Down Expand Up @@ -154,9 +154,12 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
SignalCallback: true,
}

if isMinConfirmationSet && minOutgoingConfirmations > 0 {
if !isMinConfirmationSet {
// Store the task run ID, so we can resume the pipeline when tx is confirmed
txRequest.PipelineTaskRunID = &t.uuid
} else if minOutgoingConfirmations > 0 {
// Store the task run ID, so we can resume the pipeline after minOutgoingConfirmations
txRequest.PipelineTaskRunID = &t.uuid
txRequest.MinConfirmations = clnull.Uint32From(uint32(minOutgoingConfirmations))
}

Expand All @@ -165,11 +168,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
return Result{Error: errors.Wrapf(ErrTaskRunFailed, "while creating transaction: %v", err)}, retryableRunInfo()
}

if isMinConfirmationSet && minOutgoingConfirmations > 0 {
return Result{}, pendingRunInfo()
if txRequest.PipelineTaskRunID != nil {
return Result{}, RunInfo{IsPending: true}
}

return Result{Value: nil}, runInfo
return Result{}, RunInfo{}
}

func decodeMeta(metaMap MapParam) (*txmgr.TxMeta, error) {
Expand Down

0 comments on commit 871214a

Please sign in to comment.