Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement HTTP target capability and connector handler #14491

Merged
merged 12 commits into from
Sep 26, 2024
5 changes: 5 additions & 0 deletions .changeset/six-frogs-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added HTTP target capability and gateway connector handler
123 changes: 123 additions & 0 deletions core/capabilities/webapi/target/connector_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package target

import (
"context"
"encoding/json"
"sort"
"sync"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities"
)

var _ connector.GatewayConnectorHandler = &ConnectorHandler{}

type ConnectorHandler struct {
gc connector.GatewayConnector
lggr logger.Logger
responseChs map[string]chan *api.Message
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
responseChsMu sync.Mutex
rateLimiter *common.RateLimiter
}

func NewConnectorHandler(gc connector.GatewayConnector, config Config, lgger logger.Logger) (*ConnectorHandler, error) {
rateLimiter, err := common.NewRateLimiter(config.RateLimiter)
if err != nil {
return nil, err
}
responseChs := make(map[string]chan *api.Message)
return &ConnectorHandler{
gc: gc,
responseChs: responseChs,
responseChsMu: sync.Mutex{},
rateLimiter: rateLimiter,
lggr: lgger,
}, nil
}

// HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received
// TODO: handle retries and timeouts
func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, payload []byte) (*api.Message, error) {
ch := make(chan *api.Message, 1)
c.responseChsMu.Lock()
c.responseChs[messageID] = ch
c.responseChsMu.Unlock()
l := logger.With(c.lggr, "messageID", messageID)
l.Debugw("sending request to gateway")

body := &api.MessageBody{
MessageId: messageID,
DonId: c.gc.DonID(),
Method: webapicapabilities.MethodWebAPITarget,
Payload: payload,
}

// simply, send request to first available gateway node from sorted list
// this allows for deterministic selection of gateway node receiver for easier debugging
gatewayIDs := c.gc.GatewayIDs()
if len(gatewayIDs) == 0 {
return nil, errors.New("no gateway nodes available")
}
sort.Strings(gatewayIDs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinhoonbang 🤔 Doesn't this also mean we'll always go to the same gateway assuming the same list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I had a discussion with @bolekk on this and decided to keep it simple here. Random gateway nodes will be harder to debug so reusing the same gateway node is preferred


err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}

select {
case resp := <-ch:
MStreet3 marked this conversation as resolved.
Show resolved Hide resolved
return resp, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (c *ConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
l := logger.With(c.lggr, "gatewayID", gatewayID, "method", body.Method, "messageID", msg.Body.MessageId)
if !c.rateLimiter.Allow(body.Sender) {
// error is logged here instead of warning because if a message from gateway is rate-limited,
// the workflow will eventually fail with timeout as there are no retries in place yet
c.lggr.Errorw("request rate-limited")
return
}
l.Debugw("handling gateway request")
switch body.Method {
case webapicapabilities.MethodWebAPITarget:
var payload webapicapabilities.TargetResponsePayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
l.Errorw("failed to unmarshal payload", "err", err)
return
}
c.responseChsMu.Lock()
defer c.responseChsMu.Unlock()
ch, ok := c.responseChs[body.MessageId]
if !ok {
l.Errorw("no response channel found")
return
}
select {
case ch <- msg:
delete(c.responseChs, body.MessageId)
case <-ctx.Done():
return
}
default:
l.Errorw("unsupported method")
}
}

func (c *ConnectorHandler) Start(ctx context.Context) error {
return c.gc.AddHandler([]string{webapicapabilities.MethodWebAPITarget}, c)
}

func (c *ConnectorHandler) Close() error {
return nil
}
152 changes: 152 additions & 0 deletions core/capabilities/webapi/target/target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package target

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities"
)

const ID = "[email protected]"

var _ capabilities.TargetCapability = &Capability{}

var capabilityInfo = capabilities.MustNewCapabilityInfo(
ID,
capabilities.CapabilityTypeTarget,
"A target that sends HTTP requests to external clients via the Chainlink Gateway.",
)

// Capability is a target capability that sends HTTP requests to external clients via the Chainlink Gateway.
type Capability struct {
capabilityInfo capabilities.CapabilityInfo
connectorHandler *ConnectorHandler
lggr logger.Logger
registry core.CapabilitiesRegistry
config Config
}

func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorHandler *ConnectorHandler, lggr logger.Logger) (*Capability, error) {
return &Capability{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: only fix if doing other fixes, but how about validating some of the fields? Says connectorHandler is nil?

capabilityInfo: capabilityInfo,
config: config,
registry: registry,
connectorHandler: connectorHandler,
lggr: lggr,
}, nil
}

func (c *Capability) Start(ctx context.Context) error {
return c.registry.Add(ctx, c)
}

func (c *Capability) Close() error {
return nil
}

func (c *Capability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilityInfo, nil
}

func getMessageID(req capabilities.CapabilityRequest) (string, error) {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil {
return "", fmt.Errorf("workflow ID is invalid: %w", err)
}
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID); err != nil {
return "", fmt.Errorf("workflow execution ID is invalid: %w", err)
}
messageID := []string{
req.Metadata.WorkflowID,
req.Metadata.WorkflowExecutionID,
webapicapabilities.MethodWebAPITarget,
}
return strings.Join(messageID, "/"), nil
}

func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
c.lggr.Debugw("executing http target", "capabilityRequest", req)

var input Input
err := req.Inputs.UnwrapTo(&input)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

var workflowCfg WorkflowConfig
err = req.Config.UnwrapTo(&workflowCfg)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

messageID, err := getMessageID(req)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

payload := webapicapabilities.TargetRequestPayload{
URL: input.URL,
Method: input.Method,
Headers: input.Headers,
Body: input.Body,
TimeoutMs: workflowCfg.TimeoutMs,
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

// Default to SingleNode delivery mode
deliveryMode := SingleNode
if workflowCfg.DeliveryMode != "" {
deliveryMode = workflowCfg.DeliveryMode
}

switch deliveryMode {
case SingleNode:
// blocking call to handle single node request. waits for response from gateway
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
c.lggr.Debugw("received gateway response", "resp", resp)
var payload webapicapabilities.TargetResponsePayload
err = json.Unmarshal(resp.Body.Payload, &payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

// TODO: check target response format and fields CM-473
values, err := values.NewMap(map[string]any{
"statusCode": payload.StatusCode,
"headers": payload.Headers,
"body": payload.Body,
})
if err != nil {
return capabilities.CapabilityResponse{}, err
}
return capabilities.CapabilityResponse{
Value: values,
}, nil
default:
return capabilities.CapabilityResponse{}, fmt.Errorf("unsupported delivery mode: %v", workflowCfg.DeliveryMode)
}
}

func (c *Capability) RegisterToWorkflow(ctx context.Context, req capabilities.RegisterToWorkflowRequest) error {
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
// Workflow engine guarantees registration requests are valid
// TODO: handle retry configuration CM-472
return nil
}

func (c *Capability) UnregisterFromWorkflow(ctx context.Context, req capabilities.UnregisterFromWorkflowRequest) error {
// Workflow engine guarantees deregistration requests are valid
return nil
}
Loading
Loading