Skip to content

Commit

Permalink
Log Event Trigger Capability (#14308)
Browse files Browse the repository at this point in the history
* Log Event Trigger Capability

* Minor refactoring

* Moved main script to plugins/cmd

* Added initial implementation for UnregisterTrigger

* Create NewContractReader in RegisterTrigger flow of the trigger capability

* Refactoring to integrate with ChainReader QueryKey API

* Integrate with ChainReader QueryKey interface

* Minor changes

* Send cursor in QueryKey in subsequent calls

* Test utils for LOOP capability

* Happy path test for log event trigger capability

* Float64 fix in values

* Happy path integration test for Log Event Trigger Capability

* Fix code lint annotations

* Addressed PR comments

* Added changeset

* Addressed Lint errors

* Addressed PR comments

* Addressed more lint issues

* Simplified trigger ctx creation and cancel flows

* Added comment

* Addressed PR comments

* Implemented Start/Close pattern in logEventTrigger and used stopChan to track listener

* Addressed more PR comments

* Handled errors from Info and Close methods

* Fixed lint errors and pass ctx to Info

* Handle race conditions in log event trigger service

* Fixed lint errors

* Minor change

* Test fix and lint fixes

* Move EVM specific tests out of chain-agnostic capability

* Set block time

* Check existence of trigger in slow path

* Complete usage of services.Service with StartOnce and StopOnce with tests updated

* Wait for all goroutines to exit in test

* Cleanup logpoller and headtracker after test
  • Loading branch information
kidambisrinivas authored Sep 30, 2024
1 parent 02472a6 commit 3e9e058
Show file tree
Hide file tree
Showing 9 changed files with 950 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/chilly-crews-retire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added log-event-trigger LOOPP capability, using ChainReader
157 changes: 157 additions & 0 deletions core/capabilities/triggers/logevent/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package logevent

import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

const ID = "log-event-trigger-%s-%[email protected]"

const defaultSendChannelBufferSize = 1000

// Log Event Trigger Capability Input
type Input struct {
}

// Log Event Trigger Capabilities Manager
// Manages different log event triggers using an underlying triggerStore
type TriggerService struct {
services.StateMachine
capabilities.CapabilityInfo
capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse]
lggr logger.Logger
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
relayer core.Relayer
logEventConfig Config
stopCh services.StopChan
}

// Common capability level config across all workflows
type Config struct {
ChainID string `json:"chainId"`
Network string `json:"network"`
LookbackBlocks uint64 `json:"lookbakBlocks"`
PollPeriod uint32 `json:"pollPeriod"`
}

func (config Config) Version(capabilityVersion string) string {
return fmt.Sprintf(capabilityVersion, config.Network, config.ChainID)
}

var _ capabilities.TriggerCapability = (*TriggerService)(nil)
var _ services.Service = &TriggerService{}

// Creates a new Cron Trigger Service.
// Scheduling will commence on calling .Start()
func NewTriggerService(ctx context.Context,
lggr logger.Logger,
relayer core.Relayer,
logEventConfig Config) (*TriggerService, error) {
l := logger.Named(lggr, "LogEventTriggerCapabilityService")

logEventStore := NewCapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]()

s := &TriggerService{
lggr: l,
triggers: logEventStore,
relayer: relayer,
logEventConfig: logEventConfig,
stopCh: make(services.StopChan),
}
var err error
s.CapabilityInfo, err = s.Info(ctx)
if err != nil {
return s, err
}
s.Validator = capabilities.NewValidator[RequestConfig, Input, capabilities.TriggerResponse](capabilities.ValidatorArgs{Info: s.CapabilityInfo})
return s, nil
}

func (s *TriggerService) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilities.NewCapabilityInfo(
s.logEventConfig.Version(ID),
capabilities.CapabilityTypeTrigger,
"A trigger that listens for specific contract log events and starts a workflow run.",
)
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (s *TriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
if req.Config == nil {
return nil, errors.New("config is required to register a log event trigger")
}
reqConfig, err := s.ValidateConfig(req.Config)
if err != nil {
return nil, err
}
// Add log event trigger with Contract details to CapabilitiesStore
var respCh chan capabilities.TriggerResponse
ok := s.IfNotStopped(func() {
respCh, err = s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) {
l, ch, tErr := newLogEventTrigger(ctx, s.lggr, req.Metadata.WorkflowID, reqConfig, s.logEventConfig, s.relayer)
if tErr != nil {
return l, ch, tErr
}
tErr = l.Start(ctx)
return l, ch, tErr
})
})
if !ok {
return nil, fmt.Errorf("cannot create new trigger since LogEventTriggerService has been stopped")
}
if err != nil {
return nil, fmt.Errorf("create new trigger failed %w", err)
}
s.lggr.Infow("RegisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID)
return respCh, nil
}

func (s *TriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
trigger, ok := s.triggers.Read(req.TriggerID)
if !ok {
return fmt.Errorf("triggerId %s not found", req.TriggerID)
}
// Close callback channel and stop log event trigger listener
err := trigger.Close()
if err != nil {
return fmt.Errorf("error closing trigger %s (chainID %s): %w", req.TriggerID, s.logEventConfig.ChainID, err)
}
// Remove from triggers context
s.triggers.Delete(req.TriggerID)
s.lggr.Infow("UnregisterTrigger", "triggerId", req.TriggerID, "WorkflowID", req.Metadata.WorkflowID)
return nil
}

// Start the service.
func (s *TriggerService) Start(ctx context.Context) error {
return s.StartOnce("LogEventTriggerCapabilityService", func() error {
s.lggr.Info("Starting LogEventTriggerCapabilityService")
return nil
})
}

// Close stops the Service.
// After this call the Service cannot be started again,
// The service will need to be re-built to start scheduling again.
func (s *TriggerService) Close() error {
return s.StopOnce("LogEventTriggerCapabilityService", func() error {
s.lggr.Infow("Stopping LogEventTriggerCapabilityService")
triggers := s.triggers.ReadAll()
return services.MultiCloser(triggers).Close()
})
}

func (s *TriggerService) HealthReport() map[string]error {
return map[string]error{s.Name(): s.Healthy()}
}

func (s *TriggerService) Name() string {
return s.lggr.Name()
}
82 changes: 82 additions & 0 deletions core/capabilities/triggers/logevent/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package logevent

import (
"fmt"
"sync"
)

type RegisterCapabilityFn[T any, Resp any] func() (*T, chan Resp, error)

// Interface of the capabilities store
type CapabilitiesStore[T any, Resp any] interface {
Read(capabilityID string) (value *T, ok bool)
ReadAll() (values []*T)
Write(capabilityID string, value *T)
InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error)
Delete(capabilityID string)
}

// Implementation for the CapabilitiesStore interface
type capabilitiesStore[T any, Resp any] struct {
mu sync.RWMutex
capabilities map[string]*T
}

var _ CapabilitiesStore[string, string] = (CapabilitiesStore[string, string])(nil)

// Constructor for capabilitiesStore struct implementing CapabilitiesStore interface
func NewCapabilitiesStore[T any, Resp any]() CapabilitiesStore[T, Resp] {
return &capabilitiesStore[T, Resp]{
capabilities: map[string]*T{},
}
}

func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value *T, ok bool) {
cs.mu.RLock()
defer cs.mu.RUnlock()
trigger, ok := cs.capabilities[capabilityID]
return trigger, ok
}

func (cs *capabilitiesStore[T, Resp]) ReadAll() (values []*T) {
cs.mu.RLock()
defer cs.mu.RUnlock()
vals := make([]*T, 0)
for _, v := range cs.capabilities {
vals = append(vals, v)
}
return vals
}

func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value *T) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.capabilities[capabilityID] = value
}

func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) {
cs.mu.RLock()
_, ok := cs.capabilities[capabilityID]
cs.mu.RUnlock()
if ok {
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID)
}
cs.mu.Lock()
defer cs.mu.Unlock()
_, ok = cs.capabilities[capabilityID]
if ok {
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID)
}
value, respCh, err := fn()
if err != nil {
return nil, fmt.Errorf("error registering capability: %v", err)
}
cs.capabilities[capabilityID] = value
return respCh, nil
}

func (cs *capabilitiesStore[T, Resp]) Delete(capabilityID string) {
cs.mu.Lock()
defer cs.mu.Unlock()
delete(cs.capabilities, capabilityID)
}
Loading

0 comments on commit 3e9e058

Please sign in to comment.