Skip to content

Commit

Permalink
Update core to be compatible with changes made in chainlink-common as…
Browse files Browse the repository at this point in the history
… part of KS-120
  • Loading branch information
ettec committed Apr 19, 2024
1 parent 63abd08 commit 4530c71
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-lizards-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Updates required to work with chainlink-common changes to support grpc streams for capabilities
6 changes: 3 additions & 3 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type mockCapability struct {
capabilities.CapabilityInfo
}

func (m *mockCapability) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
return nil
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
return nil, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
{
name: "trigger",
newCapability: func(ctx context.Context, reg *coreCapabilities.Registry) (string, error) {
odt := triggers.NewOnDemand()
odt := triggers.NewOnDemand(logger.TestLogger(t))
info, err := odt.Info(ctx)
require.NoError(t, err)
return info.ID, reg.Add(ctx, odt)
Expand Down
8 changes: 5 additions & 3 deletions core/capabilities/remote/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request
return errors.New("not implemented")
}

func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (c *remoteTargetCaller) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members))
for _, peerID := range c.donInfo.Members {
m := &types.MessageBody{
Expand All @@ -60,10 +60,12 @@ func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- common
}
err := c.dispatcher.Send(peerID, m)
if err != nil {
return err
return nil, err
}
}
return nil

// TODO: return a channel that will be closed when all responses are received
return nil, nil
}

func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
Expand Down
6 changes: 4 additions & 2 deletions core/capabilities/remote/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package remote_test
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand All @@ -24,5 +24,7 @@ func TestTarget_Placeholder(t *testing.T) {
dispatcher := remoteMocks.NewDispatcher(t)
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil)
target := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{}, donInfo, dispatcher, lggr)
require.NoError(t, target.Execute(ctx, nil, commoncap.CapabilityRequest{}))

_, err := target.Execute(ctx, commoncap.CapabilityRequest{})
assert.NoError(t, err)
}
8 changes: 3 additions & 5 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type registrationKey struct {
}

type pubRegState struct {
callback chan<- commoncap.CapabilityResponse
callback <-chan commoncap.CapabilityResponse
request commoncap.CapabilityRequest
}

Expand Down Expand Up @@ -112,9 +112,8 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
}
callbackCh := make(chan commoncap.CapabilityResponse)
ctx, cancel := p.stopCh.NewCtx()
err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled)
callbackCh, err := p.underlying.RegisterTrigger(ctx, unmarshaled)

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

not enough arguments in call to p.underlying.RegisterTrigger

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

not enough arguments in call to p.underlying.RegisterTrigger

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

not enough arguments in call to p.underlying.RegisterTrigger

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

not enough arguments in call to p.underlying.RegisterTrigger

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / lint

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / lint

not enough arguments in call to p.underlying.RegisterTrigger

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

not enough arguments in call to p.underlying.RegisterTrigger

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Analyze go

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Analyze go

not enough arguments in call to p.underlying.RegisterTrigger

Check failure on line 116 in core/capabilities/remote/trigger_publisher.go

View workflow job for this annotation

GitHub Actions / Analyze go

assignment mismatch: 2 variables but p.underlying.RegisterTrigger returns 1 value
cancel()
if err == nil {
p.registrations[key] = &pubRegState{
Expand Down Expand Up @@ -153,7 +152,6 @@ func (p *triggerPublisher) registrationCleanupLoop() {
cancel()
p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId, "err", err)
// after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel
close(req.callback)
delete(p.registrations, key)
p.messageCache.Delete(key)
}
Expand All @@ -163,7 +161,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
}
}

func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.CapabilityResponse, key registrationKey) {
func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.CapabilityResponse, key registrationKey) {
defer p.wg.Done()
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (t *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error)
return t.info, nil
}

func (t *testTrigger) RegisterTrigger(_ context.Context, _ chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (t *testTrigger) RegisterTrigger(_ context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
t.registrationsCh <- request
return nil
return nil, nil
}

func (t *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.CapabilityRequest) error {
Expand Down
19 changes: 14 additions & 5 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ var _ commoncap.TriggerCapability = &triggerSubscriber{}
var _ types.Receiver = &triggerSubscriber{}
var _ services.Service = &triggerSubscriber{}

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON,
dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
Expand Down Expand Up @@ -88,22 +92,25 @@ func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo,
return s.capInfo, nil
}

func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
rawRequest, err := pb.MarshalCapabilityRequest(request)
if err != nil {
return err
return nil, err
}
if request.Metadata.WorkflowID == "" {
return errors.New("empty workflowID")
return nil, errors.New("empty workflowID")
}
s.mu.Lock()
defer s.mu.Unlock()

callback := make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize)
s.registeredWorkflows[request.Metadata.WorkflowID] = &subRegState{
callback: callback,
rawRequest: rawRequest,
}

s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return nil
return callback, nil
}

func (s *triggerSubscriber) registrationLoop() {
Expand Down Expand Up @@ -141,6 +148,8 @@ func (s *triggerSubscriber) registrationLoop() {
func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) error {
s.mu.Lock()
defer s.mu.Unlock()

close(s.registeredWorkflows[request.Metadata.WorkflowID].callback)
delete(s.registeredWorkflows, request.Metadata.WorkflowID)
// Registrations will quickly expire on all remote nodes.
// Alternatively, we could send UnregisterTrigger messages right away.
Expand Down
7 changes: 4 additions & 3 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
}
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))
triggerEventCallbackCh := make(chan commoncap.CapabilityResponse, 2)
require.NoError(t, subscriber.RegisterTrigger(ctx, triggerEventCallbackCh, commoncap.CapabilityRequest{

triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
}))
})
require.NoError(t, err)
<-awaitRegistrationMessageCh

// receive trigger event
Expand Down
26 changes: 15 additions & 11 deletions core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (

var forwardABI = evmtypes.MustGetABI(forwarder.KeystoneForwarderMetaData.ABI)

func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, lggr logger.Logger) error {
func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer,
lggr logger.Logger) error {
for _, chain := range legacyEVMChains.Slice() {
capability := NewEvmWrite(chain, lggr)
if err := registry.Add(context.TODO(), capability); err != nil {
Expand Down Expand Up @@ -157,7 +158,7 @@ func encodePayload(args []any, rawSelector string) ([]byte, error) {
// return append(method.ID, arguments...), nil
}

func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, request capabilities.CapabilityRequest) error {
func (cap *EvmWrite) Execute(ctx context.Context, request capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
cap.lggr.Debugw("Execute", "request", request)
// TODO: idempotency

Expand All @@ -168,22 +169,23 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C

reqConfig, err := parseConfig(request.Config)
if err != nil {
return err
return nil, err
}

inputsAny, err := request.Inputs.Unwrap()
if err != nil {
return err
return nil, err
}
inputs := inputsAny.(map[string]any)
rep, ok := inputs["report"]
if !ok {
return errors.New("malformed data: inputs doesn't contain a report key")
return nil, errors.New("malformed data: inputs doesn't contain a report key")
}

if rep == nil {
// We received any empty report -- this means we should skip transmission.
cap.lggr.Debugw("Skipping empty report", "request", request)
callback := make(chan capabilities.CapabilityResponse)
go func() {
// TODO: cast tx.Error to Err (or Value to Value?)
callback <- capabilities.CapabilityResponse{
Expand All @@ -192,18 +194,18 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
close(callback)
}()
return nil
return callback, nil
}

// evaluate any variables in reqConfig.Params
args, err := evaluateParams(reqConfig.Params, inputs)
if err != nil {
return err
return nil, err
}

data, err := encodePayload(args, reqConfig.ABI)
if err != nil {
return err
return nil, err
}

// TODO: validate encoded report is prefixed with workflowID and executionID that match the request meta
Expand All @@ -214,7 +216,7 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
// construct forwarding payload
calldata, err := forwardABI.Pack("report", common.HexToAddress(reqConfig.Address), data, signatures)
if err != nil {
return err
return nil, err
}

txMeta := &txmgr.TxMeta{
Expand All @@ -238,9 +240,11 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
tx, err := txm.CreateTransaction(ctx, req)
if err != nil {
return err
return nil, err
}
cap.lggr.Debugw("Transaction submitted", "request", request, "transaction", tx)

callback := make(chan capabilities.CapabilityResponse)
go func() {
// TODO: cast tx.Error to Err (or Value to Value?)
callback <- capabilities.CapabilityResponse{
Expand All @@ -249,7 +253,7 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
close(callback)
}()
return nil
return callback, nil
}

func (cap *EvmWrite) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down
8 changes: 2 additions & 6 deletions core/capabilities/targets/write_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func TestEvmWrite(t *testing.T) {

})

ch := make(chan capabilities.CapabilityResponse)

err = capability.Execute(ctx, ch, req)
ch, err := capability.Execute(ctx, req)
require.NoError(t, err)

response := <-ch
Expand Down Expand Up @@ -134,9 +132,7 @@ func TestEvmWrite_EmptyReport(t *testing.T) {
Inputs: inputs,
}

ch := make(chan capabilities.CapabilityResponse)

err = capability.Execute(ctx, ch, req)
ch, err := capability.Execute(ctx, req)
require.NoError(t, err)

response := <-ch
Expand Down
9 changes: 8 additions & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,17 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro
Config: tc,
Inputs: triggerInputs,
}
err = t.trigger.RegisterTrigger(ctx, e.triggerEvents, triggerRegRequest)
eventsCh, err := t.trigger.RegisterTrigger(ctx, triggerRegRequest)
if err != nil {
return fmt.Errorf("failed to instantiate trigger %s, %s", t.Type, err)
}

go func() {
for event := range eventsCh {
e.triggerEvents <- event
}
}()

return nil
}

Expand Down
18 changes: 11 additions & 7 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,18 @@ func newMockCapability(info capabilities.CapabilityInfo, transform func(capabili
}
}

func (m *mockCapability) Execute(ctx context.Context, ch chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
cr, err := m.transform(req)
if err != nil {
return err
return nil, err
}

ch := make(chan capabilities.CapabilityResponse, 10)

m.response <- cr
ch <- cr
close(ch)
m.response <- cr
return nil
return ch, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand All @@ -102,13 +104,14 @@ func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request cap
type mockTriggerCapability struct {
capabilities.CapabilityInfo
triggerEvent capabilities.CapabilityResponse
ch chan capabilities.CapabilityResponse
}

var _ capabilities.TriggerCapability = (*mockTriggerCapability)(nil)

func (m *mockTriggerCapability) RegisterTrigger(ctx context.Context, ch chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
ch <- m.triggerEvent
return nil
func (m *mockTriggerCapability) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
m.ch <- m.triggerEvent
return m.ch, nil
}

func (m *mockTriggerCapability) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error {
Expand Down Expand Up @@ -217,6 +220,7 @@ func mockTrigger(t *testing.T) (capabilities.TriggerCapability, capabilities.Cap
"issues a trigger when a mercury report is received.",
"v1.0.0",
),
ch: make(chan capabilities.CapabilityResponse, 10),
}
resp, err := values.NewMap(map[string]any{
"123": decimal.NewFromFloat(1.00),
Expand Down

0 comments on commit 4530c71

Please sign in to comment.