Skip to content

Commit

Permalink
feat(relayminer): add proxy.Ping(...) capability to test connectivi…
Browse files Browse the repository at this point in the history
…ty between relay servers and backend URLs (#1)

* relayer: add RelayServers() method to RelayProxy interface; Add Ping(), ServiceIDs(), Forward() method to RelayServer interface; add RelayServers slice with helper method byServiceID

* relayer: add forward config entry

* relayer: implement ServiceIDs, Forward, and Ping method for synchrounous RPC server

* relayer: add RelayServers implementation for RelayProxy

* relayer: add Ping and Forward options

* relayer: integrate ping option

* relayer: add ServePing and ServeForward method to RelayMiner

* test proxy.Ping() in test + remove forward feature

* add serve ping test

* add doc
  • Loading branch information
eddyzags committed Aug 31, 2024
1 parent 1fdde42 commit 3fadb58
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 1 deletion.
16 changes: 16 additions & 0 deletions docusaurus/docs/operate/configs/relayminer_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ You can find a fully featured example configuration at [relayminer_config_full_e
- [`smt_store_path`](#smt_store_path)
- [`metrics`](#metrics)
- [`pprof`](#pprof)
- [`ping`](#ping)
- [Pocket node connectivity](#pocket-node-connectivity)
- [`query_node_rpc_url`](#query_node_rpc_url)
- [`query_node_grpc_url`](#query_node_grpc_url)
Expand Down Expand Up @@ -168,6 +169,21 @@ pprof:

You can learn how to use that endpoint on the [Performance Troubleshooting](../../develop/developer_guide/performance_troubleshooting.md) page.

### `ping`

Configures a `ping` server to test the connectivity of every backend URLs. If
all the backend URLs are reachable, the endpoint returns a 200 HTTP
Code. Otherwise, if one or more backend URLs aren't reachable, the service
returns an 500 HTTP Internal server error.

Example configuration:

```yaml
ping:
enabled: true
addr: localhost:8081
```

## Pocket node connectivity

```yaml
Expand Down
3 changes: 3 additions & 0 deletions localnet/poktrolld/config/relayminer_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ suppliers:
pprof:
enabled: false
addr: localhost:6060
ping:
enabled: false
addr: localhost:8082
6 changes: 6 additions & 0 deletions localnet/poktrolld/config/relayminer_config_full_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ pprof:
enabled: false
addr: localhost:6060

# Ping server configuration to test the connectivity of every
# suppliers.[].service_config.backend_url
ping:
enabled: false
addr: localhost:8081

pocket_node:
# Pocket node URL exposing the CometBFT JSON-RPC API.
# Used by the Cosmos client SDK, event subscriptions, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ suppliers:
pprof:
enabled: false
addr: localhost:6070
ping:
enabled: false
addr: localhost:8081
12 changes: 12 additions & 0 deletions pkg/relayer/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -134,6 +135,17 @@ func runRelayer(cmd *cobra.Command, _ []string) error {
}
}

if relayMinerConfig.Ping.Enabled {
ln, err := net.Listen("tcp", relayMinerConfig.Ping.Addr)
if err != nil {
return fmt.Errorf("failed to listen ping server: %w", err)
}

if err := relayMiner.ServePing(ctx, ln); err != nil {
return fmt.Errorf("failed to start ping server: %w", err)
}
}

// Start the relay miner
logger.Info().Msg("Starting relay miner...")
if err := relayMiner.Start(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/relayer/config/relayminer_configs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func ParseRelayMinerConfigs(configContent []byte) (*RelayMinerConfig, error) {
Addr: yamlRelayMinerConfig.Pprof.Addr,
}

relayMinerConfig.Ping = &RelayMinerPingConfig{
Enabled: yamlRelayMinerConfig.Ping.Enabled,
Addr: yamlRelayMinerConfig.Ping.Addr,
}

// Hydrate the pocket node urls
if err := relayMinerConfig.HydratePocketNodeUrls(&yamlRelayMinerConfig.PocketNode); err != nil {
return nil, err
Expand Down
12 changes: 12 additions & 0 deletions pkg/relayer/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ type YAMLRelayMinerConfig struct {
Pprof YAMLRelayMinerPprofConfig `yaml:"pprof"`
SmtStorePath string `yaml:"smt_store_path"`
Suppliers []YAMLRelayMinerSupplierConfig `yaml:"suppliers"`
Ping YAMLRelayMinerPingConfig `yaml:"ping"`
}

type YAMLRelayMinerPingConfig struct {
Enabled bool `yaml:"enabled"`
Addr string `yaml:"addr"`
}

// YAMLRelayMinerPocketNodeConfig is the structure used to unmarshal the pocket
Expand Down Expand Up @@ -83,6 +89,12 @@ type RelayMinerConfig struct {
Pprof *RelayMinerPprofConfig
Servers map[string]*RelayMinerServerConfig
SmtStorePath string
Ping *RelayMinerPingConfig
}

type RelayMinerPingConfig struct {
Enabled bool
Addr string
}

// RelayMinerPocketNodeConfig is the structure resulting from parsing the pocket
Expand Down
9 changes: 9 additions & 0 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type RelayerProxy interface {
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for signing relay responses.
SignRelayResponse(relayResponse *servicetypes.RelayResponse, supplierOperatorAddr string) error

// Ping tests the connectivity between all the managed relay servers and their respective backend URLs.
Ping(ctx context.Context) []error
}

type RelayerProxyOption func(RelayerProxy)
Expand All @@ -84,8 +87,14 @@ type RelayServer interface {

// Stop terminates the service server and returns an error if it fails.
Stop(ctx context.Context) error

// Ping tests the connection between the relay server and its backend URL.
Ping(ctx context.Context) error
}

// RelayServers aggregates a slice of RelayServer interface.
type RelayServers []RelayServer

// RelayerSessionsManager is responsible for managing the relayer's session lifecycles.
// It handles the creation and retrieval of SMSTs (trees) for a given session, as
// well as the respective and subsequent claim creation and proof submission.
Expand Down
28 changes: 27 additions & 1 deletion pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,18 @@ func (rp *relayerProxy) Start(ctx context.Context) error {
if err := rp.BuildProvidedServices(ctx); err != nil {
return err
}

// Start the ring cache.
rp.ringCache.Start(ctx)

startGroup, ctx := errgroup.WithContext(ctx)

for _, relayServer := range rp.servers {
server := relayServer // create a new variable scoped to the anonymous function

if err := server.Ping(ctx); err != nil {
return err
}

startGroup.Go(func() error { return server.Start(ctx) })
}

Expand Down Expand Up @@ -187,3 +191,25 @@ func (rp *relayerProxy) validateConfig() error {

return nil
}

// Ping tests the connectivity between all the managed relay servers and their respective backend URLs.
func (rp *relayerProxy) Ping(ctx context.Context) []error {
errs := make([]error, len(rp.servers))

var i int
for _, srv := range rp.servers {
if err := srv.Ping(ctx); err != nil {
rp.logger.Error().Err(err).
Msg("an unexpected error occured while pinging backend URL")
errs[i] = err
}

i++
}

if len(errs) > 0 {
return errs
}

return nil
}
5 changes: 5 additions & 0 deletions pkg/relayer/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func TestRelayerProxy_StartAndStop(t *testing.T) {
// Block so relayerProxy has sufficient time to start
time.Sleep(100 * time.Millisecond)

errs := rp.Ping(ctx)
for _, err := range errs {
require.NoError(t, err)
}

// Test that RelayerProxy is handling requests (ignoring the actual response content)
res, err := http.DefaultClient.Get(fmt.Sprintf("http://%s/", servicesConfigMap[defaultRelayMinerServer].ListenAddress))
require.NoError(t, err)
Expand Down
21 changes: 21 additions & 0 deletions pkg/relayer/proxy/synchronous.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -92,6 +93,26 @@ func (sync *synchronousRPCServer) Stop(ctx context.Context) error {
return sync.server.Shutdown(ctx)
}

// Ping tries to dial the suppliers backend URLs to test the connection.
func (sync *synchronousRPCServer) Ping(ctx context.Context) error {
for _, supplierCfg := range sync.serverConfig.SupplierConfigsMap {
c := &http.Client{Timeout: 2 * time.Second}

resp, err := c.Head(supplierCfg.ServiceConfig.BackendUrl.String())
if err != nil {
return err
}
_ = resp.Body.Close()

if resp.StatusCode >= http.StatusInternalServerError {
return errors.New("ping failed")
}

}

return nil
}

// ServeHTTP listens for incoming relay requests. It implements the respective
// method of the http.Handler interface. It is called by http.ListenAndServe()
// when synchronousRPCServer is used as an http.Handler with an http.Server.
Expand Down
19 changes: 19 additions & 0 deletions pkg/relayer/relayminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,22 @@ func (rel *relayMiner) ServePprof(ctx context.Context, addr string) error {

return nil
}

func (rel *relayMiner) ServePing(ctx context.Context, ln net.Listener) error {
go func() {
if err := http.Serve(ln, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
rel.logger.Debug().Msg("pinging relay servers...")

if errs := rel.relayerProxy.Ping(ctx); errs != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
})); err != nil {
return
}
}()

return nil
}
69 changes: 69 additions & 0 deletions pkg/relayer/relayminer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package relayer_test

import (
"context"
"net"
"net/http"
"os"
"testing"
"time"

Expand Down Expand Up @@ -57,3 +60,69 @@ func TestRelayMiner_StartAndStop(t *testing.T) {
err = relayminer.Stop(ctx)
require.NoError(t, err)
}

func TestRelayMiner_Ping(t *testing.T) {
srObs, _ := channel.NewObservable[*servicetypes.Relay]()
servedRelaysObs := relayer.RelaysObservable(srObs)

mrObs, _ := channel.NewObservable[*relayer.MinedRelay]()
minedRelaysObs := relayer.MinedRelaysObservable(mrObs)

ctx := polyzero.NewLogger().WithContext(context.Background())
relayerProxyMock := testrelayer.NewMockOneTimeRelayerProxyWithPing(
ctx, t,
servedRelaysObs,
)

minerMock := testrelayer.NewMockOneTimeMiner(
ctx, t,
servedRelaysObs,
minedRelaysObs,
)

relayerSessionsManagerMock := testrelayer.NewMockOneTimeRelayerSessionsManager(
ctx, t,
minedRelaysObs,
)

deps := depinject.Supply(
relayerProxyMock,
minerMock,
relayerSessionsManagerMock,
)

relayminer, err := relayer.NewRelayMiner(ctx, deps)
require.NoError(t, err)
require.NotNil(t, relayminer)

err = relayminer.Start(ctx)
require.NoError(t, err)

time.Sleep(time.Millisecond)

filename := "/tmp/relayerminer.ping.sock"

ln, err := net.Listen("unix", filename)
require.NoError(t, err)
defer os.Remove(filename)

err = relayminer.ServePing(ctx, ln)
require.NoError(t, err)

time.Sleep(time.Millisecond)

c := http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return net.Dial(ln.Addr().Network(), ln.Addr().String())
},
},
}
require.NoError(t, err)

_, err = c.Get("http://unix")
require.NoError(t, err)

err = relayminer.Stop(ctx)
require.NoError(t, err)
}
31 changes: 31 additions & 0 deletions testutil/testrelayer/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,36 @@ func NewMockOneTimeRelayerProxy(
ServedRelays().
Return(returnedRelaysObs).
Times(1)

return relayerProxyMock
}

// NewMockOneTimeRelayerProxyWithPing creates a new mock RelayerProxy. This mock
// RelayerProxy will expect a call to ServedRelays with the given context, and
// when that call is made, returnedRelaysObs is returned. It also expects a call
// to Start, Ping, and Stop with the given context.
func NewMockOneTimeRelayerProxyWithPing(
ctx context.Context,
t *testing.T,
returnedRelaysObs relayer.RelaysObservable,
) *mockrelayer.MockRelayerProxy {
t.Helper()

ctrl := gomock.NewController(t)
relayerProxyMock := mockrelayer.NewMockRelayerProxy(ctrl)
relayerProxyMock.EXPECT().
Start(gomock.Eq(ctx)).
Times(1)
relayerProxyMock.EXPECT().
Stop(gomock.Eq(ctx)).
Times(1)
relayerProxyMock.EXPECT().
ServedRelays().
Return(returnedRelaysObs).
Times(1)
relayerProxyMock.EXPECT().
Ping(gomock.Eq(ctx)).
Times(1)

return relayerProxyMock
}

0 comments on commit 3fadb58

Please sign in to comment.