Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jinhoonbang committed Sep 25, 2024
1 parent 06f468d commit 741327c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 45 deletions.
63 changes: 23 additions & 40 deletions core/capabilities/webapi/target/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -27,24 +26,20 @@ var capabilityInfo = capabilities.MustNewCapabilityInfo(

// Capability is a target capability that sends HTTP requests to external clients via the Chainlink Gateway.
type Capability struct {
capabilityInfo capabilities.CapabilityInfo
connectorHandler *ConnectorHandler
lggr logger.Logger
registry core.CapabilitiesRegistry
config Config
registeredWorkflows map[string]struct{}
registeredWorkflowsMu sync.RWMutex
capabilityInfo capabilities.CapabilityInfo
connectorHandler *ConnectorHandler
lggr logger.Logger
registry core.CapabilitiesRegistry
config Config
}

func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorHandler *ConnectorHandler, lggr logger.Logger) (*Capability, error) {
return &Capability{
capabilityInfo: capabilityInfo,
config: config,
registry: registry,
connectorHandler: connectorHandler,
registeredWorkflows: make(map[string]struct{}),
registeredWorkflowsMu: sync.RWMutex{},
lggr: lggr,
capabilityInfo: capabilityInfo,
config: config,
registry: registry,
connectorHandler: connectorHandler,
lggr: lggr,
}, nil
}

Expand Down Expand Up @@ -95,18 +90,11 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq
return capabilities.CapabilityResponse{}, err
}

c.registeredWorkflowsMu.RLock()
if _, ok := c.registeredWorkflows[req.Metadata.WorkflowID]; !ok {
c.registeredWorkflowsMu.RUnlock()
return capabilities.CapabilityResponse{}, fmt.Errorf("workflow is not registered: %v", req.Metadata.WorkflowID)
}
c.registeredWorkflowsMu.RUnlock()

payload := webapicapabilities.TargetRequestPayload{
URL: input.URL,
Method: input.Method,
Headers: input.Headers,
Body: []byte(input.Body),
Body: input.Body,
TimeoutMs: workflowCfg.TimeoutMs,
}

Expand All @@ -115,7 +103,13 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq
return capabilities.CapabilityResponse{}, err
}

switch workflowCfg.Schedule {
// Default to SingleNode delivery mode
deliveryMode := SingleNode
if workflowCfg.DeliveryMode != "" {
deliveryMode = workflowCfg.DeliveryMode
}

switch deliveryMode {
case SingleNode:
// blocking call to handle single node request. waits for response from gateway
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
Expand All @@ -133,7 +127,7 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq
values, err := values.NewMap(map[string]any{
"statusCode": payload.StatusCode,
"headers": payload.Headers,
"body": string(payload.Body),
"body": payload.Body,
})
if err != nil {
return capabilities.CapabilityResponse{}, err
Expand All @@ -142,28 +136,17 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq
Value: values,
}, nil
default:
return capabilities.CapabilityResponse{}, fmt.Errorf("unsupported schedule: %v", workflowCfg.Schedule)
return capabilities.CapabilityResponse{}, fmt.Errorf("unsupported schedule: %v", workflowCfg.DeliveryMode)
}
}

func (c *Capability) RegisterToWorkflow(ctx context.Context, req capabilities.RegisterToWorkflowRequest) error {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil {
return fmt.Errorf("workflow ID is invalid: %w", err)
}
c.registeredWorkflowsMu.Lock()
defer c.registeredWorkflowsMu.Unlock()
c.registeredWorkflows[req.Metadata.WorkflowID] = struct{}{}
// Workflow engine guarantees registration requests are valid
// TODO: handle retry configuration
return nil
}

func (c *Capability) UnregisterFromWorkflow(ctx context.Context, req capabilities.UnregisterFromWorkflowRequest) error {
// if workflow is not found for some reason, just log a warning
c.registeredWorkflowsMu.Lock()
defer c.registeredWorkflowsMu.Unlock()
if _, ok := c.registeredWorkflows[req.Metadata.WorkflowID]; !ok {
c.lggr.Warnw("workflow not found", "workflowID", req.Metadata.WorkflowID)
} else {
delete(c.registeredWorkflows, req.Metadata.WorkflowID)
}
// Workflow engine guarantees deregistration requests are valid
return nil
}
11 changes: 6 additions & 5 deletions core/capabilities/webapi/target/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@ package target
import "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"

const (
AllAtOnce string = "AllAtOnce"
SingleNode string = "SingleNode"
// TODO: AllAtOnce is not yet implemented
AllAtOnce string = "AllAtOnce"
)

type Input struct {
URL string `json:"url"` // URL to query, only http and https protocols are supported.
Method string `json:"method,omitempty"` // HTTP verb, defaults to GET.
Headers map[string]string `json:"headers,omitempty"` // HTTP headers, defaults to empty.
Body string `json:"body,omitempty"` // Base64-encoded binary body, defaults to empty.
Body []byte `json:"body,omitempty"` // HTTP body, defaults to empty.
}

// WorkflowConfig is the configuration of the workflow that is passed in the workflow execute request
type WorkflowConfig struct {
TimeoutMs uint32 `json:"timeoutMs,omitempty"` // Timeout in milliseconds
RetryCount uint8 `json:"retryCount,omitempty"` // Number of retries, defaults to 0.
Schedule string `json:"schedule,omitempty"` // schedule, defaults to empty.
TimeoutMs uint32 `json:"timeoutMs,omitempty"` // Timeout in milliseconds
RetryCount uint8 `json:"retryCount,omitempty"` // Number of retries, defaults to 0.
DeliveryMode string `json:"deliveryMode,omitempty"` // DeliveryMode describes how request should be delivered to gateway nodes, defaults to SingleNode.
}

// CapabilityConfigConfig is the configuration for the Target capability and handler
Expand Down

0 comments on commit 741327c

Please sign in to comment.