Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HCP Observability] New MetricsClient #17100

Merged
merged 9 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions agent/hcp/client/metrics_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package client

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-retryablehttp"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"
)

const (
// HTTP Client config
defaultStreamTimeout = 15 * time.Second

// Retry config
// TODO: Evenutally, we'd like to configure these values dynamically.
defaultRetryWaitMin = 1 * time.Second
defaultRetryWaitMax = 15 * time.Second
defaultRetryMax = 4
Achooo marked this conversation as resolved.
Show resolved Hide resolved
)

// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway.
type MetricsClient interface {
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
}

// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing.
type CloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
}

// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle.
// It also holds default HTTP headers to add to export requests.
type otlpClient struct {
client *retryablehttp.Client
header *http.Header
}

// NewMetricsClient returns a configured MetricsClient.
// The current implementation uses otlpClient to provide retry functionality.
func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) {
if cfg == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
}

if ctx == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide a valid context")
}

logger := hclog.FromContext(ctx)

c, err := newHTTPClient(cfg, logger)
if err != nil {
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
}

header := make(http.Header)
header.Set("Content-Type", "application/x-protobuf")

return &otlpClient{
client: c,
header: &header,
}, nil
}

// newHTTPClient configures the retryable HTTP client.
func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
hcpCfg, err := cloudCfg.HCPConfig()
if err != nil {
return nil, err
}

tlsTransport := cleanhttp.DefaultPooledTransport()
tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig()

var transport http.RoundTripper = &oauth2.Transport{
Base: tlsTransport,
Source: hcpCfg,
}

client := &http.Client{
Transport: transport,
Timeout: defaultStreamTimeout,
}

retryClient := &retryablehttp.Client{
HTTPClient: client,
Logger: logger.Named("hcp_telemetry_client"),
RetryWaitMin: defaultRetryWaitMin,
Achooo marked this conversation as resolved.
Show resolved Hide resolved
RetryWaitMax: defaultRetryWaitMax,
RetryMax: defaultRetryMax,
CheckRetry: retryablehttp.DefaultRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}

return retryClient, nil
}

// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint.
// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config.
// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request.
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}

body, err := proto.Marshal(pbRequest)
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}
req.Header = *o.header

resp, err := o.client.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}
defer resp.Body.Close()

var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
Achooo marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("failed to export metrics: %v", err)
}

if respData.Len() != 0 {
var respProto colmetricpb.ExportMetricsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

if respProto.PartialSuccess != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to figure out what to do about this on the CTGW side as well. Will get back to you about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I am going to never return this but good to keep this here if we decide to implement partial success.

msg := respProto.PartialSuccess.GetErrorMessage()
return fmt.Errorf("failed to export metrics: partial success: %s", msg)
}
}

return nil
}
107 changes: 107 additions & 0 deletions agent/hcp/client/metrics_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package client

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"
)

func TestNewMetricsClient(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
cfg CloudConfig
ctx context.Context
}{
"success": {
cfg: &MockCloudCfg{},
ctx: context.Background(),
},
"failsWithoutCloudCfg": {
wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)",
cfg: nil,
ctx: context.Background(),
},
"failsWithoutContext": {
wantErr: "failed to init telemetry client: provide a valid context",
cfg: MockCloudCfg{},
ctx: nil,
},
"failsHCPConfig": {
wantErr: "failed to init telemetry client",
cfg: MockErrCloudCfg{},
ctx: context.Background(),
},
} {
t.Run(name, func(t *testing.T) {
client, err := NewMetricsClient(test.cfg, test.ctx)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}

require.Nil(t, err)
require.NotNil(t, client)
})
}
}

func TestExportMetrics(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
status int
}{
"success": {
status: http.StatusOK,
},
"failsWithNonRetryableError": {
status: http.StatusBadRequest,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will partial success always return badrequest?

Copy link
Contributor Author

@Achooo Achooo Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily actually, I just added it to test out that part of my code which handles the partial success (without kicking off the retry logic to avoid timing out my tests)

wantErr: "failed to export metrics",
},
} {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")

require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")

body := colpb.ExportMetricsServiceResponse{}

if test.wantErr != "" {
body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{
ErrorMessage: "partial failure",
}
}
bytes, err := proto.Marshal(&body)

require.NoError(t, err)

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(test.status)
w.Write(bytes)
}))
defer srv.Close()

client, err := NewMetricsClient(MockCloudCfg{}, context.Background())
require.NoError(t, err)

ctx := context.Background()
metrics := &metricpb.ResourceMetrics{}
err = client.ExportMetrics(ctx, metrics, srv.URL)

if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}

require.NoError(t, err)
})
}
}
40 changes: 40 additions & 0 deletions agent/hcp/client/mock_CloudConfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package client

import (
"crypto/tls"
"errors"
"net/url"

hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"golang.org/x/oauth2"
)

type mockHCPCfg struct{}

func (m *mockHCPCfg) Token() (*oauth2.Token, error) {
return &oauth2.Token{
AccessToken: "test-token",
}, nil
}

func (m *mockHCPCfg) APITLSConfig() *tls.Config { return nil }

func (m *mockHCPCfg) SCADAAddress() string { return "" }

func (m *mockHCPCfg) SCADATLSConfig() *tls.Config { return &tls.Config{} }

func (m *mockHCPCfg) APIAddress() string { return "" }

func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} }

type MockCloudCfg struct{}

func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
return &mockHCPCfg{}, nil
}

type MockErrCloudCfg struct{}

func (m MockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
return nil, errors.New("test bad HCP config")
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/hashicorp/go-memdb v1.3.4
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-raftchunking v0.7.0
github.com/hashicorp/go-retryablehttp v0.6.7
github.com/hashicorp/go-secure-stdlib/awsutil v0.1.6
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/go-syslog v1.0.0
Expand Down Expand Up @@ -95,6 +96,7 @@ require (
github.com/shirou/gopsutil/v3 v3.22.8
github.com/stretchr/testify v1.8.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/goleak v1.1.10
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/net v0.7.0
Expand Down Expand Up @@ -167,11 +169,11 @@ require (
github.com/googleapis/gax-go/v2 v2.1.0 // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/gophercloud/gophercloud v0.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-msgpack/v2 v2.0.0 // indirect
github.com/hashicorp/go-plugin v1.4.5 // indirect
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect
Expand Down Expand Up @@ -225,7 +227,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.mongodb.org/mongo-driver v1.10.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
Expand Down
Loading