Skip to content

Commit

Permalink
Log Event Trigger Capability Development: Part 3 (#14637)
Browse files Browse the repository at this point in the history
* Use default mode aggregator for all non-streams triggers

* F+1 idential responses

* Test for remote trigger subscriber shim

* Updated comments

* Interface needed for the test
  • Loading branch information
kidambisrinivas authored Oct 9, 2024
1 parent c654322 commit 3738ee4
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 21 deletions.
42 changes: 21 additions & 21 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"

"github.com/smartcontractkit/libocr/ragep2p"
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand Down Expand Up @@ -253,24 +252,25 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
switch capability.CapabilityType {
case capabilities.CapabilityTypeTrigger:
newTriggerFn := func(info capabilities.CapabilityInfo) (capabilityService, error) {
if !strings.HasPrefix(info.ID, "streams-trigger") {
return nil, errors.New("not supported: trigger capability does not have id = streams-trigger")
}

codec := streams.NewCodec(w.lggr)

signers, err := signersFor(remoteDON, state)
if err != nil {
return nil, err
var aggregator remotetypes.Aggregator
if strings.HasPrefix(info.ID, "streams-trigger") {
codec := streams.NewCodec(w.lggr)

signers, err := signersFor(remoteDON, state)
if err != nil {
return nil, err
}

aggregator = triggers.NewMercuryRemoteAggregator(
codec,
signers,
int(remoteDON.F+1),
w.lggr,
)
} else {
aggregator = remote.NewDefaultModeAggregator(uint32(remoteDON.F) + 1)
}

aggregator := triggers.NewMercuryRemoteAggregator(
codec,
signers,
int(remoteDON.F+1),
w.lggr,
)

// TODO: We need to implement a custom, Mercury-specific
// aggregator here, because there is no guarantee that
// all trigger events in the workflow will have the same
Expand Down
232 changes: 232 additions & 0 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ import (
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks"
kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
Expand Down Expand Up @@ -184,6 +188,234 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
defer launcher.Close()
}

func newTriggerEventMsg(t *testing.T,
senderPeerID types.PeerID,
workflowID string,
triggerEvent map[string]any,
triggerEventID string) (*remotetypes.MessageBody, *values.Map) {
triggerEventValue, err := values.NewMap(triggerEvent)
require.NoError(t, err)
capResponse := capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
Outputs: triggerEventValue,
ID: triggerEventID,
},
Err: nil,
}
marshaled, err := pb.MarshalTriggerResponse(capResponse)
require.NoError(t, err)
return &remotetypes.MessageBody{
Sender: senderPeerID[:],
Method: remotetypes.MethodTriggerEvent,
Metadata: &remotetypes.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &remotetypes.TriggerEventMetadata{
WorkflowIds: []string{workflowID},
},
},
Payload: marshaled,
}, triggerEventValue
}

func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.TestLogger(t)
registry := NewRegistry(lggr)
dispatcher := remoteMocks.NewDispatcher(t)

var pid ragetypes.PeerID
err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N"))
require.NoError(t, err)
peer := mocks.NewPeer(t)
peer.On("UpdateConnections", mock.Anything).Return(nil)
peer.On("ID").Return(pid)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)

workflowDonNodes := []ragetypes.PeerID{
pid,
randomWord(),
randomWord(),
randomWord(),
}

capabilityDonNodes := []ragetypes.PeerID{
randomWord(),
randomWord(),
randomWord(),
randomWord(),
}

fullTriggerCapID := "[email protected]"
fullTargetID := "[email protected]"
triggerCapID := randomWord()
targetCapID := randomWord()
dID := uint32(1)
capDonID := uint32(2)
// The below state describes a Workflow DON (AcceptsWorkflows = true),
// which exposes the log-event-trigger and write_chain capabilities.
// We expect receivers to be wired up and both capabilities to be added to the registry.
rtc := &capabilities.RemoteTriggerConfig{}
rtc.ApplyDefaults()

cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{
RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{
RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{
RegistrationRefresh: durationpb.New(1 * time.Second),
MinResponsesToAggregate: 3,
},
},
})
require.NoError(t, err)

state := &registrysyncer.LocalRegistry{
IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{
registrysyncer.DonID(dID): {
DON: capabilities.DON{
ID: dID,
ConfigVersion: uint32(0),
F: uint8(1),
IsPublic: true,
AcceptsWorkflows: true,
Members: workflowDonNodes,
},
},
registrysyncer.DonID(capDonID): {
DON: capabilities.DON{
ID: capDonID,
ConfigVersion: uint32(0),
F: uint8(1),
IsPublic: true,
AcceptsWorkflows: false,
Members: capabilityDonNodes,
},
CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{
fullTriggerCapID: {
Config: cfg,
},
fullTargetID: {
Config: cfg,
},
},
},
},
IDsToCapabilities: map[string]registrysyncer.Capability{
fullTriggerCapID: {
ID: fullTriggerCapID,
CapabilityType: capabilities.CapabilityTypeTrigger,
},
fullTargetID: {
ID: fullTargetID,
CapabilityType: capabilities.CapabilityTypeTarget,
},
},
IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{
capabilityDonNodes[0]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: capabilityDonNodes[0],
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
},
capabilityDonNodes[1]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: capabilityDonNodes[1],
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
},
capabilityDonNodes[2]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: capabilityDonNodes[2],
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
},
capabilityDonNodes[3]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: capabilityDonNodes[3],
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
},
workflowDonNodes[0]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: workflowDonNodes[0],
},
workflowDonNodes[1]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: workflowDonNodes[1],
},
workflowDonNodes[2]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: workflowDonNodes[2],
},
workflowDonNodes[3]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: workflowDonNodes[3],
},
},
}

launcher := NewLauncher(
lggr,
wrapper,
dispatcher,
registry,
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil)
awaitRegistrationMessageCh := make(chan struct{})
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
select {
case awaitRegistrationMessageCh <- struct{}{}:
default:
}
})

err = launcher.Launch(ctx, state)
require.NoError(t, err)
defer launcher.Close()

baseCapability, err := registry.Get(ctx, fullTriggerCapID)
require.NoError(t, err)

remoteTriggerSubscriber, ok := baseCapability.(remote.TriggerSubscriber)
require.True(t, ok, "remote trigger capability")

// Register trigger
workflowID1 := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID1 := "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
req := capabilities.TriggerRegistrationRequest{
TriggerID: "logeventtrigger_log1",
Metadata: capabilities.RequestMetadata{
ReferenceID: "logeventtrigger",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
}
triggerEventCallbackCh, err := remoteTriggerSubscriber.RegisterTrigger(ctx, req)
require.NoError(t, err)
<-awaitRegistrationMessageCh

// Receive trigger event
triggerEvent1 := map[string]any{"event": "triggerEvent1"}
triggerEvent2 := map[string]any{"event": "triggerEvent2"}
triggerEventMsg1, triggerEventValue := newTriggerEventMsg(t, capabilityDonNodes[0], workflowID1, triggerEvent1, "TriggerEventID1")
triggerEventMsg2, _ := newTriggerEventMsg(t, capabilityDonNodes[1], workflowID1, triggerEvent1, "TriggerEventID1")
// One Faulty Node (F = 1) sending bad event data for the same TriggerEventID1
triggerEventMsg3, _ := newTriggerEventMsg(t, capabilityDonNodes[2], workflowID1, triggerEvent2, "TriggerEventID1")
remoteTriggerSubscriber.Receive(ctx, triggerEventMsg1)
remoteTriggerSubscriber.Receive(ctx, triggerEventMsg2)
remoteTriggerSubscriber.Receive(ctx, triggerEventMsg3)

// After MinResponsesToAggregate, we should get a response
response := <-triggerEventCallbackCh

// Checks if response is same as minIdenticalResponses = F + 1, F = 1
require.Equal(t, response.Event.Outputs, triggerEventValue)
}

func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.TestLogger(t)
Expand Down
5 changes: 5 additions & 0 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ type subRegState struct {
rawRequest []byte
}

type TriggerSubscriber interface {
commoncap.TriggerCapability
Receive(ctx context.Context, msg *types.MessageBody)
}

var _ commoncap.TriggerCapability = &triggerSubscriber{}
var _ types.Receiver = &triggerSubscriber{}
var _ services.Service = &triggerSubscriber{}
Expand Down

0 comments on commit 3738ee4

Please sign in to comment.