Skip to content

Commit

Permalink
add gateway handler and http client for outgoing messages (#14536)
Browse files Browse the repository at this point in the history
* implement HTTP target capability and connector handler

* self-review

* fix linter

* more linting fixes

* fix build

* regenerate mocks

* Update core/capabilities/webapi/target/connector_handler.go

Co-authored-by: Street <[email protected]>

* address feedback

* add gateway handler for node messages and http client for outgoing messages. pending rate limiting and unit tests.

* implement rate limiter and add unit test http client

* add http client mock and unit tests

* fix failing tests

* add tag to changeset

* fix linting issue

* fix failing race

* fix linting

---------

Co-authored-by: Street <[email protected]>
  • Loading branch information
jinhoonbang and MStreet3 authored Oct 1, 2024
1 parent 41443fa commit 4b97702
Show file tree
Hide file tree
Showing 17 changed files with 679 additions and 80 deletions.
5 changes: 5 additions & 0 deletions .changeset/empty-bees-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#wip implement gateway handler that forwards outgoing request from http target capability. introduce gateway http client
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ packages:
HttpServer:
HTTPRequestHandler:
WebSocketServer:
HTTPClient:
github.com/smartcontractkit/chainlink/v2/core/services/job:
interfaces:
ServiceCtx:
Expand Down
8 changes: 4 additions & 4 deletions core/capabilities/webapi/target/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ func gatewayResponse(t *testing.T, msgID string) *api.Message {
headers := map[string]string{"Content-Type": "application/json"}
body := []byte("response body")
responsePayload, err := json.Marshal(webapicapabilities.TargetResponsePayload{
StatusCode: 200,
Headers: headers,
Body: body,
Success: true,
StatusCode: 200,
Headers: headers,
Body: body,
ExecutionError: false,
})
require.NoError(t, err)
return &api.Message{
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/webapi/target/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type WorkflowConfig struct {
DeliveryMode string `json:"deliveryMode,omitempty"` // DeliveryMode describes how request should be delivered to gateway nodes, defaults to SingleNode.
}

// CapabilityConfigConfig is the configuration for the Target capability and handler
// Config is the configuration for the Target capability and handler
// TODO: handle retry configurations here CM-472
// Note that workflow executions have their own internal timeouts and retries set by the user
// that are separate from this configuration
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/gateway/run_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {

lggr, _ := logger.NewLogger()

handlerFactory := gateway.NewHandlerFactory(nil, nil, lggr)
handlerFactory := gateway.NewHandlerFactory(nil, nil, nil, lggr)
gw, err := gateway.NewGatewayFromConfig(&cfg, handlerFactory, lggr)
if err != nil {
fmt.Println("error creating Gateway object:", err)
Expand Down
4 changes: 3 additions & 1 deletion core/services/gateway/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ type GatewayConfig struct {
UserServerConfig gw_net.HTTPServerConfig
NodeServerConfig gw_net.WebSocketServerConfig
ConnectionManagerConfig ConnectionManagerConfig
Dons []DONConfig
// HTTPClientConfig is configuration for outbound HTTP calls to external endpoints
HTTPClientConfig gw_net.HTTPClientConfig
Dons []DONConfig
}

type ConnectionManagerConfig struct {
Expand Down
7 changes: 6 additions & 1 deletion core/services/gateway/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
)
Expand Down Expand Up @@ -54,7 +55,11 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services
if err2 != nil {
return nil, errors.Wrap(err2, "unmarshal gateway config")
}
handlerFactory := NewHandlerFactory(d.legacyChains, d.ds, d.lggr)
httpClient, err := network.NewHTTPClient(gatewayConfig.HTTPClientConfig, d.lggr)
if err != nil {
return nil, err
}
handlerFactory := NewHandlerFactory(d.legacyChains, d.ds, httpClient, d.lggr)
gateway, err := NewGatewayFromConfig(&gatewayConfig, handlerFactory, d.lggr)
if err != nil {
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions core/services/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Address = "0x0001020304050607080900010203040506070809"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.NoError(t, err)
}

Expand All @@ -75,7 +75,7 @@ HandlerName = "dummy"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

Expand All @@ -89,7 +89,7 @@ HandlerName = "no_such_handler"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

Expand All @@ -103,7 +103,7 @@ SomeOtherField = "abcd"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

Expand All @@ -121,15 +121,15 @@ Address = "0xnot_an_address"
`)

lggr := logger.TestLogger(t)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr)
_, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.Error(t, err)
}

func TestGateway_CleanStartAndClose(t *testing.T) {
t.Parallel()

lggr := logger.TestLogger(t)
gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, lggr), lggr)
gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr)
require.NoError(t, err)
servicetest.Run(t, gateway)
}
Expand Down
9 changes: 6 additions & 3 deletions core/services/gateway/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network"
)

const (
Expand All @@ -23,26 +24,28 @@ type handlerFactory struct {
legacyChains legacyevm.LegacyChainContainer
ds sqlutil.DataSource
lggr logger.Logger
httpClient network.HTTPClient
}

var _ HandlerFactory = (*handlerFactory)(nil)

func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, ds sqlutil.DataSource, lggr logger.Logger) HandlerFactory {
func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, ds sqlutil.DataSource, httpClient network.HTTPClient, lggr logger.Logger) HandlerFactory {
return &handlerFactory{
legacyChains,
ds,
lggr,
httpClient,
}
}

func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON) (handlers.Handler, error) {
switch handlerType {
case FunctionsHandlerType:
return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.ds, hf.lggr)
case WebAPICapabilitiesType:
return webapicapabilities.NewWorkflowHandler(handlerConfig, donConfig, don, hf.lggr)
case DummyHandlerType:
return handlers.NewDummyHandler(donConfig, don, hf.lggr)
case WebAPICapabilitiesType:
return webapicapabilities.NewHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr)
default:
return nil, fmt.Errorf("unsupported handler type %s", handlerType)
}
Expand Down
3 changes: 2 additions & 1 deletion core/services/gateway/handlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type Handler interface {
// 2. waits on callbackCh with a timeout
HandleUserMessage(ctx context.Context, msg *api.Message, callbackCh chan<- UserCallbackPayload) error

// Handlers should not make any assumptions about goroutines calling HandleNodeMessage
// Handlers should not make any assumptions about goroutines calling HandleNodeMessage.
// should be non-blocking
HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error
}

Expand Down
Loading

0 comments on commit 4b97702

Please sign in to comment.