Skip to content

Commit

Permalink
Allow flat interpolation (#13601)
Browse files Browse the repository at this point in the history
* Allow flat interpolation

* Update common

* Fix tests after rebase
  • Loading branch information
cedric-cordenier authored Jun 27, 2024
1 parent 884fca7 commit c3f6b70
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 46 deletions.
5 changes: 5 additions & 0 deletions .changeset/tall-wombats-deliver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Allow outputs to be passed directly to the inputs
8 changes: 4 additions & 4 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func Test_Client_DonTopologies(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

capability := &TestCapability{}
Expand Down Expand Up @@ -64,9 +64,9 @@ func Test_Client_TransmissionSchedules(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

capability := &TestCapability{}
Expand Down
14 changes: 9 additions & 5 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

transmissionSchedule, err := values.NewMap(map[string]any{
Expand Down Expand Up @@ -106,9 +106,9 @@ func Test_RemoteTargetCapability_DonTopologies(t *testing.T) {
responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
}

transmissionSchedule, err := values.NewMap(map[string]any{
Expand Down Expand Up @@ -409,8 +409,12 @@ func (t TestCapability) Execute(ctx context.Context, request commoncap.Capabilit

value := request.Inputs.Underlying["executeValue1"]

response, err := values.NewMap(map[string]any{"response": value})
if err != nil {
return nil, err
}
ch <- commoncap.CapabilityResponse{
Value: value,
Value: response,
}

return ch, nil
Expand Down
11 changes: 8 additions & 3 deletions core/capabilities/remote/target/request/client_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
Config: transmissionSchedule,
}

m, err := values.NewMap(map[string]any{"response": "response1"})
require.NoError(t, err)
capabilityResponse := commoncap.CapabilityResponse{
Value: values.NewString("response1"),
Value: m,
Err: nil,
}

Expand Down Expand Up @@ -106,8 +108,10 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

require.NoError(t, err)

nm, err := values.NewMap(map[string]any{"response": "response2"})
require.NoError(t, err)
capabilityResponse2 := commoncap.CapabilityResponse{
Value: values.NewString("response2"),
Value: nm,
Err: nil,
}

Expand Down Expand Up @@ -297,8 +301,9 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
require.NoError(t, err)

response := <-request.ResponseChan()
resp := response.Value.Underlying["response"]

assert.Equal(t, response.Value, values.NewString("response1"))
assert.Equal(t, resp, values.NewString("response1"))
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,12 @@ func (t TestCapability) Execute(ctx context.Context, request commoncap.Capabilit

value := request.Inputs.Underlying["executeValue1"]

response, err := values.NewMap(map[string]any{"response": value})
if err != nil {
return nil, err
}
ch <- commoncap.CapabilityResponse{
Value: value,
Value: response,
}

return ch, nil
Expand Down
15 changes: 9 additions & 6 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
)

const (
peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC"
peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8"
workflowID1 = "workflowID1"
triggerEvent1 = "triggerEvent1"
triggerEvent2 = "triggerEvent2"
peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC"
peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8"
workflowID1 = "workflowID1"
)

var (
triggerEvent1 = map[string]any{"event": "triggerEvent1"}
triggerEvent2 = map[string]any{"event": "triggerEvent2"}
)

func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
Expand Down Expand Up @@ -77,7 +80,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
<-awaitRegistrationMessageCh

// receive trigger event
triggerEventValue, err := values.Wrap(triggerEvent1)
triggerEventValue, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse := commoncap.CapabilityResponse{
Value: triggerEventValue,
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestToPeerID(t *testing.T) {
}

func TestDefaultModeAggregator_Aggregate(t *testing.T) {
val, err := values.Wrap(triggerEvent1)
val, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse1 := commoncap.CapabilityResponse{
Value: val,
Expand All @@ -98,7 +98,7 @@ func TestDefaultModeAggregator_Aggregate(t *testing.T) {
marshaled1, err := pb.MarshalCapabilityResponse(capResponse1)
require.NoError(t, err)

val2, err := values.Wrap(triggerEvent2)
val2, err := values.NewMap(triggerEvent2)
require.NoError(t, err)
capResponse2 := commoncap.CapabilityResponse{
Value: val2,
Expand Down
5 changes: 4 additions & 1 deletion core/capabilities/streams/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ func newTriggerEvent(t *testing.T, reportList []datastreams.FeedReport, triggerE
eventVal, err := values.Wrap(triggerEvent)
require.NoError(t, err)

marshaled, err := pb.MarshalCapabilityResponse(capabilities.CapabilityResponse{Value: eventVal})
marshaled, err := pb.MarshalCapabilityResponse(
capabilities.CapabilityResponse{
Value: eventVal.(*values.Map),
})
require.NoError(t, err)
msg := &remotetypes.MessageBody{
Sender: sender[:],
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625074419-c278d083facf
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625145034-72dab520f468
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1212,8 +1212,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8=
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625074419-c278d083facf h1:d9AS/K8RSVG64USb20N/U7RaPOsYPcmuLGJq7iE+caM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625074419-c278d083facf/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625145034-72dab520f468 h1:rhsUMdSrCerrUzzsOWaHkcb+qlB7knEAXYv/P29YUn8=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625145034-72dab520f468/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 h1:TMOoYaeSDkkI3jkCH7lKHOZaLkeDuxFTNC+XblD6M0M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
17 changes: 12 additions & 5 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,18 +668,25 @@ func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map,
return nil, nil, err
}

i, err := findAndInterpolateAllKeys(step.Inputs, msg.state)
var inputs any
if step.Inputs.OutputRef != "" {
inputs = step.Inputs.OutputRef
} else {
inputs = step.Inputs.Mapping
}

i, err := findAndInterpolateAllKeys(inputs, msg.state)
if err != nil {
return nil, nil, err
}

inputs, err := values.NewMap(i.(map[string]any))
inputsMap, err := values.NewMap(i.(map[string]any))
if err != nil {
return nil, nil, err
}

tr := capabilities.CapabilityRequest{
Inputs: inputs,
Inputs: inputsMap,
Config: step.config,
Metadata: capabilities.RequestMetadata{
WorkflowID: msg.state.WorkflowID,
Expand All @@ -692,10 +699,10 @@ func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map,

output, err := executeSyncAndUnwrapSingleValue(ctx, step.capability, tr)
if err != nil {
return inputs, nil, err
return inputsMap, nil, err
}

return inputs, output, err
return inputsMap, output, err
}

func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, triggerIdx int) error {
Expand Down
99 changes: 94 additions & 5 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,9 @@ targets:
`
)

func mockAction() (*mockCapability, values.Value) {
outputs := values.NewString("output")
func mockAction(t *testing.T) (*mockCapability, values.Value) {
outputs, err := values.NewMap(map[string]any{"output": "foo"})
require.NoError(t, err)
return newMockCapability(
capabilities.MustNewCapabilityInfo(
"[email protected]",
Expand All @@ -533,7 +534,7 @@ func TestEngine_MultiStepDependencies(t *testing.T) {
require.NoError(t, reg.Add(ctx, mockConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

action, out := mockAction()
action, out := mockAction(t)
require.NoError(t, reg.Add(ctx, action))

eng, hooks := newTestEngine(t, reg, multiStepWorkflow)
Expand Down Expand Up @@ -580,7 +581,7 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) {
require.NoError(t, reg.Add(ctx, mockConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

action, _ := mockAction()
action, _ := mockAction(t)
require.NoError(t, reg.Add(ctx, action))
dbstore := newTestDBStore(t, clockwork.NewFakeClock())
ec := &store.WorkflowExecution{
Expand Down Expand Up @@ -632,7 +633,7 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) {
require.NoError(t, reg.Add(ctx, mockConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

action, _ := mockAction()
action, _ := mockAction(t)
require.NoError(t, reg.Add(ctx, action))

clock := clockwork.NewFakeClock()
Expand Down Expand Up @@ -814,6 +815,94 @@ func TestEngine_GetsNodeInfoDuringInitialization(t *testing.T) {
assert.Equal(t, node, eng.localNode)
}

const passthroughInterpolationWorkflow = `
triggers:
- id: "[email protected]"
config:
feedIds:
- "0x1111111111111111111100000000000000000000000000000000000000000000"
- "0x2222222222222222222200000000000000000000000000000000000000000000"
- "0x3333333333333333333300000000000000000000000000000000000000000000"
consensus:
- id: "[email protected]"
ref: "evm_median"
inputs:
observations:
- "$(trigger.outputs)"
config:
aggregation_method: "data_feeds_2_0"
aggregation_config:
"0x1111111111111111111100000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x2222222222222222222200000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x3333333333333333333300000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
encoder: "EVM"
encoder_config:
abi: "mercury_reports bytes[]"
targets:
- id: "[email protected]"
inputs: "$(evm_median.outputs)"
config:
address: "0x54e220867af6683aE6DcBF535B4f952cB5116510"
params: ["$(report)"]
abi: "receive(report bytes)"
`

func TestEngine_PassthroughInterpolation(t *testing.T) {
ctx := testutils.Context(t)
reg := coreCap.NewRegistry(logger.TestLogger(t))

trigger, _ := mockTrigger(t)

require.NoError(t, reg.Add(ctx, trigger))
require.NoError(t, reg.Add(ctx, mockConsensus()))
writeID := "[email protected]"
target := newMockCapability(
capabilities.MustNewCapabilityInfo(
writeID,
capabilities.CapabilityTypeTarget,
"a write capability targeting ethereum sepolia testnet",
),
func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
return capabilities.CapabilityResponse{
Value: req.Inputs,
}, nil
},
)
require.NoError(t, reg.Add(ctx, target))

eng, testHooks := newTestEngine(
t,
reg,
passthroughInterpolationWorkflow,
)

servicetest.Run(t, eng)

eid := getExecutionId(t, eng, testHooks)

state, err := eng.executionStates.Get(ctx, eid)
require.NoError(t, err)

assert.Equal(t, state.Status, store.StatusCompleted)

// There is passthrough interpolation between the consensus and target steps,
// so the input of one should be the output of the other, exactly.
gotInputs, err := values.Unwrap(state.Steps[writeID].Inputs)
require.NoError(t, err)

gotOutputs, err := values.Unwrap(state.Steps["evm_median"].Outputs.Value)
require.NoError(t, err)
assert.Equal(t, gotInputs, gotOutputs)
}

func TestEngine_Error(t *testing.T) {
err := errors.New("some error")
tests := []struct {
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ targets:
},
},
{
name: "non-trigger step with no dependent refs",
name: "invalid refs",
yaml: `
triggers:
- id: "[email protected]"
Expand All @@ -241,7 +241,7 @@ targets:
inputs:
consensus_output: $(a-consensus.outputs)
`,
errMsg: "all non-trigger steps must have a dependent ref",
errMsg: "invalid refs",
},
{
name: "duplicate edge declarations",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chain-selectors v1.0.10
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625074419-c278d083facf
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240625145034-72dab520f468
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917
Expand Down
Loading

0 comments on commit c3f6b70

Please sign in to comment.