Skip to content

Commit

Permalink
Merge branch 'release/2.19.0' into BCF-3430-nodes-noloaded-2.19.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Bwest981 authored Nov 22, 2024
2 parents 3da5c1d + 24073b2 commit a4a6156
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 139 deletions.
30 changes: 0 additions & 30 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,40 +140,10 @@ func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
}

func (c *client) RegisterToWorkflow(ctx context.Context, registerRequest commoncap.RegisterToWorkflowRequest) error {
req, err := request.NewClientRegisterToWorkflowRequest(ctx, c.lggr, registerRequest, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)

if err != nil {
return fmt.Errorf("failed to create client request: %w", err)
}

if err = c.sendRequest(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}

resp := <-req.ResponseChan()
if resp.Err != nil {
return fmt.Errorf("error executing request: %w", resp.Err)
}
return nil
}

func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest commoncap.UnregisterFromWorkflowRequest) error {
req, err := request.NewClientUnregisterFromWorkflowRequest(ctx, c.lggr, unregisterRequest, c.remoteCapabilityInfo,
c.localDONInfo, c.dispatcher, c.requestTimeout)

if err != nil {
return fmt.Errorf("failed to create client request: %w", err)
}

if err = c.sendRequest(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}

resp := <-req.ResponseChan()
if resp.Err != nil {
return fmt.Errorf("error executing request: %w", resp.Err)
}
return nil
}

Expand Down
102 changes: 0 additions & 102 deletions core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,84 +26,6 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

func Test_RemoteExecutableCapability_InsufficientCapabilityResponses(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.NotNil(t, responseError)
}

capability := &TestCapability{}

transmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_AllAtOnce,
"deltaStage": "10ms",
})
require.NoError(t, err)

var methods []func(ctx context.Context, caller commoncap.ExecutableCapability)

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, responseTest)
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.Error(t, responseError)
})
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.Error(t, responseError)
})
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 10, 10*time.Minute, method)
}
}

func Test_RemoteExecutableCapability_InsufficientWorkflowRequests(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.NotNil(t, responseError)
}

timeOut := 10 * time.Minute

capability := &TestCapability{}

transmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_AllAtOnce,
"deltaStage": "10ms",
})
require.NoError(t, err)

var methods []func(ctx context.Context, caller commoncap.ExecutableCapability)

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, responseTest)
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.Error(t, responseError)
})
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.Error(t, responseError)
})
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 10, 10*time.Millisecond, 10, 9, timeOut, method)
}
}

func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) {
ctx := testutils.Context(t)

Expand Down Expand Up @@ -214,18 +136,6 @@ func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) {
})
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
assert.Equal(t, "error executing request: failed to register to workflow: an error", responseError.Error())
})
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
assert.Equal(t, "error executing request: failed to unregister from workflow: an error", responseError.Error())
})
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, method)
}
Expand All @@ -250,18 +160,6 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) {
})
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
assert.Equal(t, "error executing request: request expired", responseError.Error())
})
})

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) {
assert.Equal(t, "error executing request: request expired", responseError.Error())
})
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 9, 10*time.Minute,
method)
Expand Down
32 changes: 25 additions & 7 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

const fifteenMinutesMs = 15 * 60 * 1000
const (
fifteenMinutesMs = 15 * 60 * 1000
reservedFieldNameStepTimeout = "cre_step_timeout"
maxStepTimeoutOverrideSec = 10 * 60 // 10 minutes
)

type stepRequest struct {
stepRef string
Expand Down Expand Up @@ -769,10 +773,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// TODO ks-462 inputs
logCustMsg(ctx, cma, "executing step", l)

stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration)
defer cancel()

inputs, outputs, err := e.executeStep(stepCtx, l, msg)
inputs, outputs, err := e.executeStep(ctx, l, msg)
var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
Expand Down Expand Up @@ -919,6 +920,20 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
if err != nil {
return nil, nil, err
}
stepTimeoutDuration := e.stepTimeoutDuration
if timeoutOverride, ok := config.Underlying[reservedFieldNameStepTimeout]; ok {
var desiredTimeout int64
err2 := timeoutOverride.UnwrapTo(&desiredTimeout)
if err2 != nil {
e.logger.Warnw("couldn't decode step timeout override, using default", "error", err2, "default", stepTimeoutDuration)
} else {
if desiredTimeout > maxStepTimeoutOverrideSec {
e.logger.Warnw("desired step timeout is too large, limiting to max value", "maxValue", maxStepTimeoutOverrideSec)
desiredTimeout = maxStepTimeoutOverrideSec
}
stepTimeoutDuration = time.Duration(desiredTimeout) * time.Second
}
}

tr := capabilities.CapabilityRequest{
Inputs: inputsMap,
Expand All @@ -934,8 +949,11 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
},
}

e.metrics.incrementCapabilityInvocationCounter(ctx)
output, err := step.capability.Execute(ctx, tr)
stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration)
defer cancel()

e.metrics.incrementCapabilityInvocationCounter(stepCtx)
output, err := step.capability.Execute(stepCtx, tr)
if err != nil {
return inputsMap, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ targets:
address: "0x54e220867af6683aE6DcBF535B4f952cB5116510"
params: ["$(report)"]
abi: "receive(report bytes)"
cre_step_timeout: 610
`

type testHooks struct {
Expand Down

0 comments on commit a4a6156

Please sign in to comment.