Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow flat interpolation #13601

Merged
merged 9 commits into from
Jun 27, 2024
5 changes: 5 additions & 0 deletions .changeset/tall-wombats-deliver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Allow outputs to be passed directly to the inputs
8 changes: 4 additions & 4 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func Test_Client_DonTopologies(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it. Where do we create this nested "response" key in the map? Is it now a requirement for every capability?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tagged you where we add the response key.

The only requirement this PR introduces is for capabilities to return a map, not just any value. I think this is preferable because a) all capabilities so far return a map anyway, and b) it's symmetric with the inputs we expect

}

capability := &TestCapability{}
Expand Down Expand Up @@ -64,9 +64,9 @@ func Test_Client_TransmissionSchedules(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

capability := &TestCapability{}
Expand Down
14 changes: 9 additions & 5 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

transmissionSchedule, err := values.NewMap(map[string]any{
Expand Down Expand Up @@ -106,9 +106,9 @@ func Test_RemoteTargetCapability_DonTopologies(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

transmissionSchedule, err := values.NewMap(map[string]any{
Expand Down Expand Up @@ -409,8 +409,12 @@ func (t TestCapability) Execute(ctx context.Context, request commoncap.Capabilit

value := request.Inputs.Underlying["executeValue1"]

response, err := values.NewMap(map[string]any{"response": value})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bolekk This is where we create the response value

if err != nil {
return nil, err
}
ch <- commoncap.CapabilityResponse{
Value: value,
Value: response,
}

return ch, nil
Expand Down
11 changes: 8 additions & 3 deletions core/capabilities/remote/target/request/client_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
Config: transmissionSchedule,
}

m, err := values.NewMap(map[string]any{"response": "response1"})
require.NoError(t, err)
capabilityResponse := commoncap.CapabilityResponse{
Value: values.NewString("response1"),
Value: m,
Err: nil,
}

Expand Down Expand Up @@ -106,8 +108,10 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

require.NoError(t, err)

nm, err := values.NewMap(map[string]any{"response": "response2"})
require.NoError(t, err)
capabilityResponse2 := commoncap.CapabilityResponse{
Value: values.NewString("response2"),
Value: nm,
Err: nil,
}

Expand Down Expand Up @@ -297,8 +301,9 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
require.NoError(t, err)

response := <-request.ResponseChan()
resp := response.Value.Underlying["response"]

assert.Equal(t, response.Value, values.NewString("response1"))
assert.Equal(t, resp, values.NewString("response1"))
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,12 @@ func (t TestCapability) Execute(ctx context.Context, request commoncap.Capabilit

value := request.Inputs.Underlying["executeValue1"]

response, err := values.NewMap(map[string]any{"response": value})
if err != nil {
return nil, err
}
ch <- commoncap.CapabilityResponse{
Value: value,
Value: response,
}

return ch, nil
Expand Down
15 changes: 9 additions & 6 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
)

const (
peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC"
peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8"
workflowID1 = "workflowID1"
triggerEvent1 = "triggerEvent1"
triggerEvent2 = "triggerEvent2"
peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC"
peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8"
workflowID1 = "workflowID1"
)

var (
triggerEvent1 = map[string]any{"event": "triggerEvent1"}
triggerEvent2 = map[string]any{"event": "triggerEvent2"}
)

func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
Expand Down Expand Up @@ -77,7 +80,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
<-awaitRegistrationMessageCh

// receive trigger event
triggerEventValue, err := values.Wrap(triggerEvent1)
triggerEventValue, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse := commoncap.CapabilityResponse{
Value: triggerEventValue,
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestToPeerID(t *testing.T) {
}

func TestDefaultModeAggregator_Aggregate(t *testing.T) {
val, err := values.Wrap(triggerEvent1)
val, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse1 := commoncap.CapabilityResponse{
Value: val,
Expand All @@ -98,7 +98,7 @@ func TestDefaultModeAggregator_Aggregate(t *testing.T) {
marshaled1, err := pb.MarshalCapabilityResponse(capResponse1)
require.NoError(t, err)

val2, err := values.Wrap(triggerEvent2)
val2, err := values.NewMap(triggerEvent2)
require.NoError(t, err)
capResponse2 := commoncap.CapabilityResponse{
Value: val2,
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240618210005-a88f179ffc16
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240619131342-fcdcef990c3f
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1212,8 +1212,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8=
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240618210005-a88f179ffc16 h1:+0ElI3A3mKxYbw//a34d1GmLXyfWsw3IuDHv07dImWQ=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240618210005-a88f179ffc16/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240619131342-fcdcef990c3f h1:oMWyaKD36Q8YEuf6U6N9Y/+dhoVrDA2Kc+p1v62nb3U=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240619131342-fcdcef990c3f/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
17 changes: 12 additions & 5 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,18 +624,25 @@ func (e *Engine) executeStep(ctx context.Context, l logger.Logger, msg stepReque
return nil, nil, err
}

i, err := findAndInterpolateAllKeys(step.Inputs, msg.state)
var inputs any
if step.Inputs.OutputRef != "" {
inputs = step.Inputs.OutputRef
} else {
inputs = step.Inputs.Mapping
}

i, err := findAndInterpolateAllKeys(inputs, msg.state)
if err != nil {
return nil, nil, err
}

inputs, err := values.NewMap(i.(map[string]any))
inputsMap, err := values.NewMap(i.(map[string]any))
if err != nil {
return nil, nil, err
}

tr := capabilities.CapabilityRequest{
Inputs: inputs,
Inputs: inputsMap,
Config: step.config,
Metadata: capabilities.RequestMetadata{
WorkflowID: msg.state.WorkflowID,
Expand All @@ -648,10 +655,10 @@ func (e *Engine) executeStep(ctx context.Context, l logger.Logger, msg stepReque

output, err := executeSyncAndUnwrapSingleValue(ctx, step.capability, tr)
if err != nil {
return inputs, nil, err
return inputsMap, nil, err
}

return inputs, output, err
return inputsMap, output, err
}

func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, triggerIdx int) error {
Expand Down
99 changes: 94 additions & 5 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,9 @@ targets:
`
)

func mockAction() (*mockCapability, values.Value) {
outputs := values.NewString("output")
func mockAction(t *testing.T) (*mockCapability, values.Value) {
outputs, err := values.NewMap(map[string]any{"output": "foo"})
require.NoError(t, err)
return newMockCapability(
capabilities.MustNewCapabilityInfo(
"[email protected]",
Expand All @@ -510,7 +511,7 @@ func TestEngine_MultiStepDependencies(t *testing.T) {
require.NoError(t, reg.Add(ctx, mockConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

action, out := mockAction()
action, out := mockAction(t)
require.NoError(t, reg.Add(ctx, action))

eng, hooks := newTestEngine(t, reg, multiStepWorkflow)
Expand Down Expand Up @@ -557,7 +558,7 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) {
require.NoError(t, reg.Add(ctx, mockConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

action, _ := mockAction()
action, _ := mockAction(t)
require.NoError(t, reg.Add(ctx, action))

dbstore := store.NewDBStore(pgtest.NewSqlxDB(t), clockwork.NewFakeClock())
Expand Down Expand Up @@ -610,7 +611,7 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) {
require.NoError(t, reg.Add(ctx, mockConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

action, _ := mockAction()
action, _ := mockAction(t)
require.NoError(t, reg.Add(ctx, action))

clock := clockwork.NewFakeClock()
Expand Down Expand Up @@ -791,3 +792,91 @@ func TestEngine_GetsNodeInfoDuringInitialization(t *testing.T) {

assert.Equal(t, node, eng.localNode)
}

const passthroughInterpolationWorkflow = `
triggers:
- id: "[email protected]"
config:
feedIds:
- "0x1111111111111111111100000000000000000000000000000000000000000000"
- "0x2222222222222222222200000000000000000000000000000000000000000000"
- "0x3333333333333333333300000000000000000000000000000000000000000000"

consensus:
- id: "[email protected]"
ref: "evm_median"
inputs:
observations:
- "$(trigger.outputs)"
config:
aggregation_method: "data_feeds_2_0"
aggregation_config:
"0x1111111111111111111100000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x2222222222222222222200000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x3333333333333333333300000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
encoder: "EVM"
encoder_config:
abi: "mercury_reports bytes[]"

targets:
- id: "[email protected]"
inputs: "$(evm_median.outputs)"
config:
address: "0x54e220867af6683aE6DcBF535B4f952cB5116510"
params: ["$(report)"]
abi: "receive(report bytes)"
`

func TestEngine_PassthroughInterpolation(t *testing.T) {
ctx := testutils.Context(t)
reg := coreCap.NewRegistry(logger.TestLogger(t))

trigger, _ := mockTrigger(t)

require.NoError(t, reg.Add(ctx, trigger))
require.NoError(t, reg.Add(ctx, mockConsensus()))
writeID := "[email protected]"
target := newMockCapability(
capabilities.MustNewCapabilityInfo(
writeID,
capabilities.CapabilityTypeTarget,
"a write capability targeting ethereum sepolia testnet",
),
func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
return capabilities.CapabilityResponse{
Value: req.Inputs,
}, nil
},
)
require.NoError(t, reg.Add(ctx, target))

eng, testHooks := newTestEngine(
t,
reg,
passthroughInterpolationWorkflow,
)

servicetest.Run(t, eng)

eid := getExecutionId(t, eng, testHooks)

state, err := eng.executionStates.Get(ctx, eid)
require.NoError(t, err)

assert.Equal(t, state.Status, store.StatusCompleted)

// There is passthrough interpolation between the consensus and target steps,
// so the input of one should be the output of the other, exactly.
gotInputs, err := values.Unwrap(state.Steps[writeID].Inputs)
require.NoError(t, err)

gotOutputs, err := values.Unwrap(state.Steps["evm_median"].Outputs.Value)
require.NoError(t, err)
assert.Equal(t, gotInputs, gotOutputs)
}
4 changes: 2 additions & 2 deletions core/services/workflows/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ targets:
},
},
{
name: "non-trigger step with no dependent refs",
name: "invalid refs",
yaml: `
triggers:
- id: "[email protected]"
Expand All @@ -241,7 +241,7 @@ targets:
inputs:
consensus_output: $(a-consensus.outputs)
`,
errMsg: "all non-trigger steps must have a dependent ref",
errMsg: "invalid refs",
},
{
name: "duplicate edge declarations",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chain-selectors v1.0.10
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240618210005-a88f179ffc16
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240619131342-fcdcef990c3f
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1171,8 +1171,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8=
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240618210005-a88f179ffc16 h1:+0ElI3A3mKxYbw//a34d1GmLXyfWsw3IuDHv07dImWQ=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240618210005-a88f179ffc16/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240619131342-fcdcef990c3f h1:oMWyaKD36Q8YEuf6U6N9Y/+dhoVrDA2Kc+p1v62nb3U=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240619131342-fcdcef990c3f/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
Loading
Loading