[HCP Observability] Metrics filtering and Labels in Go Metrics sink (#17184)

* Move hcp client to subpackage hcpclient (#16800)

* [HCP Observability] New MetricsClient (#17100)

* Client configured with TLS using HCP config and retry/throttle

* Add tests and godoc for metrics client

* close body after request

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* remove clone

* Extract CloudConfig and mock for future PR

* Switch to hclog.FromContext

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Create new OTELExporter which uses the MetricsClient
Add transform because the conversion is in an /internal package

* Fix lint error

* early return when there are no metrics

* Add NewOTELExporter() function

* Downgrade to metrics SDK version: v1.15.0-rc.1

* Fix imports

* fix small nits with comments and url.URL

* Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile

* Cleanup error handling and clarify empty metrics case

* Fix input/expected naming in otel_transform_test.go

* add comment for metric tracking

* Add a general isEmpty method

* Add clear error types

* update to latest version 1.15.0 of OTEL

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* Initialize OTELSink with sync.Map for all the instrument stores.

* Moved PeriodicReader init to NewOtelReader function. This allows us to use a ManualReader for tests.

* Switch to mutex instead of sync.Map to avoid type assertion

* Add gauge store

* Clarify comments

* return concrete sink type

* Fix lint errors

* Move gauge store to be within sink

* Use context.TODO, rebase and cleanup opts handling

* Rebase onto otl exporter to downgrade metrics API to v1.15.0-rc.1

* Fix imports

* Update to latest stable version by rebasing on cc-4933, fix import, remove mutex init, fix opts error messages and use logger from ctx

* Add lots of documentation to the OTELSink

* Fix gauge store comment and check ok

* Add select and ctx.Done() check to gauge callback

* use require.Equal for attributes

* Fixed import naming

* Remove float64 calls and add a NewGaugeStore method

* Change name Store to Set in gaugeStore, add concurrency tests in both OTELSink and gauge store

* Generate 100 gauge operations

* Separate the labels into goroutines in sink test

* Generate kv store for the test case keys to avoid using uuid

* Added a race test with 300 samples for OTELSink

* [HCP Observability] OTELExporter (#17128)

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Create new OTELExporter which uses the MetricsClient
Add transform because the conversion is in an /internal package

* Fix lint error

* early return when there are no metrics

* Add NewOTELExporter() function

* Downgrade to metrics SDK version: v1.15.0-rc.1

* Fix imports

* fix small nits with comments and url.URL

* Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile

* Cleanup error handling and clarify empty metrics case

* Fix input/expected naming in otel_transform_test.go

* add comment for metric tracking

* Add a general isEmpty method

* Add clear error types

* update to latest version 1.15.0 of OTEL

* Do not pass in waitgroup and use error channel instead.

* Using SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build failures in otel

* Rebase onto otl exporter to downgrade metrics API to v1.15.0-rc.1

* Initialize OTELSink with sync.Map for all the instrument stores.

* Added telemetry agent to client and init sink in deps

* Fixed client

* Initialize sink in deps

* init sink in telemetry library

* Init deps before telemetry

* Use concrete telemetry.OtelSink type

* add /v1/metrics

* Avoid returning err for telemetry init

* move sink init within the IsCloudEnabled()

* Use HCPSinkOpts in deps instead

* update golden test for configuration file

* Switch to using extra sinks in the telemetry library

* keep name MetricsConfig

* fix log in verifyCCMRegistration

* Set logger in context

* pass around MetricSink in deps

* Fix imports

* Rebased onto otel sink pr

* Fix URL in test

* [HCP Observability] OTELSink (#17159)

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Create new OTELExporter which uses the MetricsClient
Add transform because the conversion is in an /internal package

* Fix lint error

* early return when there are no metrics

* Add NewOTELExporter() function

* Downgrade to metrics SDK version: v1.15.0-rc.1

* Fix imports

* fix small nits with comments and url.URL

* Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile

* Cleanup error handling and clarify empty metrics case

* Fix input/expected naming in otel_transform_test.go

* add comment for metric tracking

* Add a general isEmpty method

* Add clear error types

* update to latest version 1.15.0 of OTEL

* Client configured with TLS using HCP config and retry/throttle

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* Initialize OTELSink with sync.Map for all the instrument stores.

* Moved PeriodicReader init to NewOtelReader function. This allows us to use a ManualReader for tests.

* Switch to mutex instead of sync.Map to avoid type assertion

* Add gauge store

* Clarify comments

* return concrete sink type

* Fix lint errors

* Move gauge store to be within sink

* Use context.TODO, rebase and cleanup opts handling

* Rebase onto otl exporter to downgrade metrics API to v1.15.0-rc.1

* Fix imports

* Update to latest stable version by rebasing on cc-4933, fix import, remove mutex init, fix opts error messages and use logger from ctx

* Add lots of documentation to the OTELSink

* Fix gauge store comment and check ok

* Add select and ctx.Done() check to gauge callback

* use require.Equal for attributes

* Fixed import naming

* Remove float64 calls and add a NewGaugeStore method

* Change name Store to Set in gaugeStore, add concurrency tests in both OTELSink and gauge store

* Generate 100 gauge operations

* Separate the labels into goroutines in sink test

* Generate kv store for the test case keys to avoid using uuid

* Added a race test with 300 samples for OTELSink

* Do not pass in waitgroup and use error channel instead.

* Using SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build failures in otel

* Fix nits

* pass extraSinks as function param instead

* Add default interval as package export

* remove verifyCCM func

* Add clusterID

* Fix import and add t.Parallel() for missing tests

* Kick Vercel CI

* Remove scheme from endpoint path, and fix error logging

* return metrics.MetricSink for sink method

* Update SDK

* Added telemetry agent to client and init sink in deps

* Add node_id and __replica__ default labels

* add function for default labels and set x-hcp-resource-id

* Fix labels tests

* Commit suggestion for getDefaultLabels

Co-authored-by: Joshua Timmons <[email protected]>

* Fixed server.id, and t.Parallel()

* Make defaultLabels a method on the TelemetryConfig object

* Rename FilterList to lowercase filterList

* Cleanup filter implementation by combining regex into a single one, and making the type lowercase

* Fix append

* use regex directly for filters

* Fix x-resource-id test to use mocked value

* Fix log.Error formats

* Forgot the len(opts.Label) optimization

* Use cfg.NodeID instead

---------

Co-authored-by: Joshua Timmons <[email protected]>
Achooo and jjti committed May 29, 2023
1 parent 68e6e21 commit bb6cfb3
Showing 12 changed files with 234 additions and 32 deletions.
14 changes: 14 additions & 0 deletions agent/hcp/client/client.go
@@ -296,3 +296,17 @@ func (t *TelemetryConfig) Enabled() (string, bool) {
// The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added.
return endpoint + metricsGatewayPath, true
}

// DefaultLabels returns a set of <key, value> string pairs that must be added as attributes to all exported telemetry data.
func (t *TelemetryConfig) DefaultLabels(nodeID string) map[string]string {
	labels := map[string]string{
		"__replica__": nodeID, // used for Cortex HA-metrics (deduplication)
		"node_id":     nodeID, // used to delineate Consul nodes in graphs
	}

	for k, v := range t.Labels {
		labels[k] = v
	}

	return labels
}
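
The merge order in DefaultLabels matters: the node-derived defaults are written first and the configured labels second, so a configured label with the same key replaces the default. A minimal standalone sketch of that behavior follows; the lowercase `telemetryConfig` type is a hypothetical stand-in that models only the `Labels` field, and the `"override"` value is made up for illustration.

```go
package main

import "fmt"

// telemetryConfig is a hypothetical stand-in for TelemetryConfig in the diff;
// only the Labels field is modeled.
type telemetryConfig struct {
	Labels map[string]string
}

// defaultLabels follows the same order as the diff: node-derived defaults first,
// then configured labels, which overwrite a default on key collision.
func (t *telemetryConfig) defaultLabels(nodeID string) map[string]string {
	labels := map[string]string{
		"__replica__": nodeID,
		"node_id":     nodeID,
	}
	for k, v := range t.Labels {
		labels[k] = v
	}
	return labels
}

func main() {
	cfg := &telemetryConfig{Labels: map[string]string{"env": "prod", "node_id": "override"}}
	labels := cfg.defaultLabels("server1234")
	fmt.Println(labels["__replica__"], labels["node_id"], labels["env"])
}
```

Ranging over a nil `Labels` map is safe in Go, so a config with no extra labels simply yields the two defaults.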
8 changes: 8 additions & 0 deletions agent/hcp/client/metrics_client.go
@@ -12,6 +12,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-retryablehttp"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/hashicorp/hcp-sdk-go/resource"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"golang.org/x/oauth2"
@@ -37,6 +38,7 @@ type MetricsClient interface {
// CloudConfig represents cloud config for TLS, abstracted in an interface for easy testing.
type CloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
Resource() (resource.Resource, error)
}

// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle.
@@ -64,8 +66,14 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
}

r, err := cfg.Resource()
if err != nil {
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
}

header := make(http.Header)
header.Set("Content-Type", "application/x-protobuf")
header.Set("x-hcp-resource-id", r.String())

return &otlpClient{
client: c,
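
The hunk above builds a static header set once and attaches the resource ID to it. The following is a rough sketch of that idea using only the standard library; `newHeaders` and `send` are hypothetical helpers, and the plain `resourceID` string stands in for `r.String()` from the HCP SDK.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newHeaders builds the static headers sent with every export request.
// resourceID is a placeholder for the value r.String() yields in the diff.
func newHeaders(resourceID string) http.Header {
	h := make(http.Header)
	h.Set("Content-Type", "application/x-protobuf")
	h.Set("x-hcp-resource-id", resourceID)
	return h
}

// send issues an empty POST carrying a copy of the headers and returns the status code.
func send(url string, h http.Header) (int, error) {
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return 0, err
	}
	req.Header = h.Clone()
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	const id = "organization/test-org/project/test-project/test-type/test-id"
	// The test server rejects requests that lack the expected resource ID header.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("x-hcp-resource-id") != id {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	status, err := send(srv.URL, newHeaders(id))
	fmt.Println(status, err)
}
```

`http.Header.Set` and `Get` both canonicalize the key, so the lowercase `x-hcp-resource-id` used here is looked up consistently on both sides.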
16 changes: 13 additions & 3 deletions agent/hcp/client/metrics_client_test.go
@@ -2,6 +2,7 @@ package client

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
@@ -34,8 +35,17 @@ func TestNewMetricsClient(t *testing.T) {
},
"failsHCPConfig": {
wantErr: "failed to init telemetry client",
- cfg: MockErrCloudCfg{},
- ctx: context.Background(),
+ cfg: MockCloudCfg{
+ ConfigErr: fmt.Errorf("test bad hcp config"),
+ },
+ ctx: context.Background(),
},
"failsBadResource": {
wantErr: "failed to init telemetry client",
cfg: MockCloudCfg{
ResourceErr: fmt.Errorf("test bad resource"),
},
ctx: context.Background(),
},
} {
t.Run(name, func(t *testing.T) {
@@ -68,7 +78,7 @@ func TestExportMetrics(t *testing.T) {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")

require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID)
require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")

body := colpb.ExportMetricsServiceResponse{}
25 changes: 17 additions & 8 deletions agent/hcp/client/mock_CloudConfig.go
@@ -2,14 +2,16 @@ package client

import (
"crypto/tls"
"errors"
"net/url"

hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/hashicorp/hcp-sdk-go/profile"
"github.com/hashicorp/hcp-sdk-go/resource"
"golang.org/x/oauth2"
)

const testResourceID = "organization/test-org/project/test-project/test-type/test-id"

type mockHCPCfg struct{}

func (m *mockHCPCfg) Token() (*oauth2.Token, error) {
@@ -25,14 +27,21 @@ func (m *mockHCPCfg) APIAddress() string { return "" }
func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} }
func (m *mockHCPCfg) Profile() *profile.UserProfile { return nil }

- type MockCloudCfg struct{}
-
- func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
- return &mockHCPCfg{}, nil
+ type MockCloudCfg struct {
+ ConfigErr error
+ ResourceErr error
}

- type MockErrCloudCfg struct{}
+ func (m MockCloudCfg) Resource() (resource.Resource, error) {
+ r := resource.Resource{
+ ID: "test-id",
+ Type: "test-type",
+ Organization: "test-org",
+ Project: "test-project",
+ }
+ return r, m.ResourceErr
+ }

- func (m MockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
- return nil, errors.New("test bad HCP config")
+ func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
+ return &mockHCPCfg{}, m.ConfigErr
}
5 changes: 5 additions & 0 deletions agent/hcp/config/config.go
@@ -7,6 +7,7 @@ import (
"crypto/tls"

hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/hashicorp/hcp-sdk-go/resource"
)

// CloudConfig defines configuration for connecting to HCP services
@@ -30,6 +31,10 @@ func (c *CloudConfig) WithTLSConfig(cfg *tls.Config) {
c.TLSConfig = cfg
}

func (c *CloudConfig) Resource() (resource.Resource, error) {
return resource.FromString(c.ResourceID)
}

func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
if c.TLSConfig == nil {
c.TLSConfig = &tls.Config{}
13 changes: 8 additions & 5 deletions agent/hcp/deps.go
@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
)

@@ -23,7 +24,7 @@ type Deps struct {
Sink metrics.MetricSink
}

- func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) {
+ func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (d Deps, err error) {
d.Client, err = hcpclient.NewClient(cfg)
if err != nil {
return
@@ -34,15 +35,15 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) {
return
}

- d.Sink = sink(d.Client, &cfg, logger)
+ d.Sink = sink(d.Client, &cfg, logger, nodeID)

return
}

// sink initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, so errors are logged, but not returned.
- func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) metrics.MetricSink {
+ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger, nodeID types.NodeID) metrics.MetricSink {
ctx := context.Background()
ctx = hclog.WithContext(ctx, logger)

@@ -73,8 +74,10 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
}

sinkOpts := &telemetry.OTELSinkOpts{
- Ctx: ctx,
- Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval),
+ Ctx: ctx,
+ Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval),
+ Labels: telemetryCfg.DefaultLabels(string(nodeID)),
+ Filters: telemetryCfg.MetricsConfig.Filters,
}

sink, err := telemetry.NewOTELSink(sinkOpts)
7 changes: 5 additions & 2 deletions agent/hcp/deps_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/types"
)

func TestSink(t *testing.T) {
@@ -48,6 +49,9 @@ func TestSink(t *testing.T) {
mockCloudCfg: client.MockCloudCfg{},
},
"noSinkWhenMetricsClientInitFails": {
+ mockCloudCfg: client.MockCloudCfg{
+ ConfigErr: fmt.Errorf("test bad hcp config"),
+ },
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "https://test.com",
@@ -56,7 +60,6 @@ func TestSink(t *testing.T) {
},
}, nil)
},
- mockCloudCfg: client.MockErrCloudCfg{},
},
"failsWithFetchTelemetryFailure": {
expect: func(mockClient *client.MockClient) {
@@ -92,7 +95,7 @@ func TestSink(t *testing.T) {
c := client.NewMockClient(t)
l := hclog.NewNullLogger()
test.expect(c)
- sinkOpts := sink(c, test.mockCloudCfg, l)
+ sinkOpts := sink(c, test.mockCloudCfg, l, types.NodeID("server1234"))
if !test.expectedSink {
require.Nil(t, sinkOpts)
return
37 changes: 37 additions & 0 deletions agent/hcp/telemetry/filter.go
@@ -0,0 +1,37 @@
package telemetry

import (
	"fmt"
	"regexp"
	"strings"

	"github.com/hashicorp/go-multierror"
)

// newFilterRegex returns a valid regex used to filter metrics.
// It will fail if there are 0 valid regex filters given.
func newFilterRegex(filters []string) (*regexp.Regexp, error) {
	var mErr error
	validFilters := make([]string, 0, len(filters))
	for _, filter := range filters {
		_, err := regexp.Compile(filter)
		if err != nil {
			mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter %q failed: %w", filter, err))
			continue
		}
		validFilters = append(validFilters, filter)
	}

	if len(validFilters) == 0 {
		return nil, multierror.Append(mErr, fmt.Errorf("no valid filters"))
	}

	// Combine the valid regex strings with an OR.
	finalRegex := strings.Join(validFilters, "|")
	composedRegex, err := regexp.Compile(finalRegex)
	if err != nil {
		return nil, fmt.Errorf("failed to compile regex: %w", err)
	}

	return composedRegex, nil
}
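
To see how the combined pattern behaves, here is a stdlib-only variant of the same idea. `composeFilters` is a hypothetical simplification: it silently skips invalid patterns instead of collecting their errors with go-multierror as the function above does. The sample inputs are borrowed from the test cases in this commit.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// composeFilters drops patterns that fail to compile and ORs the rest together.
// Unlike newFilterRegex in the diff, it does not report why a pattern was skipped.
func composeFilters(filters []string) (*regexp.Regexp, error) {
	valid := make([]string, 0, len(filters))
	for _, f := range filters {
		if _, err := regexp.Compile(f); err != nil {
			continue // skip invalid pattern
		}
		valid = append(valid, f)
	}
	if len(valid) == 0 {
		return nil, fmt.Errorf("no valid filters")
	}
	return regexp.Compile(strings.Join(valid, "|"))
}

func main() {
	// "(*LF)" is not valid Go regexp syntax, so only the other two patterns survive.
	re, err := composeFilters([]string{"raft.*", "(*LF)", "mem.*"})
	if err != nil {
		panic(err)
	}
	fmt.Println(re.String())
	for _, name := range []string{"consul.raft.peers", "consul.mem.heap_size", "consul.txn.apply"} {
		fmt.Printf("%s -> %v\n", name, re.MatchString(name))
	}
}
```

The patterns are unanchored, so `raft.*` matches anywhere inside a metric name such as `consul.raft.peers`; a caller who wants prefix-only matching would need to add `^` to the pattern.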
58 changes: 58 additions & 0 deletions agent/hcp/telemetry/filter_test.go
@@ -0,0 +1,58 @@
package telemetry

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestFilter(t *testing.T) {
	t.Parallel()
	for name, tc := range map[string]struct {
		filters             []string
		expectedRegexString string
		matches             []string
		wantErr             string
		wantMatch           bool
	}{
		"badFilterRegex": {
			filters: []string{"(*LF)"},
			wantErr: "no valid filters",
		},
		"failsWithNoRegex": {
			filters: []string{},
			wantErr: "no valid filters",
		},
		"matchFound": {
			filters:             []string{"raft.*", "mem.*"},
			expectedRegexString: "raft.*|mem.*",
			matches:             []string{"consul.raft.peers", "consul.mem.heap_size"},
			wantMatch:           true,
		},
		"matchNotFound": {
			filters:             []string{"mem.*"},
			matches:             []string{"consul.raft.peers", "consul.txn.apply"},
			expectedRegexString: "mem.*",
			wantMatch:           false,
		},
	} {
		tc := tc
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			f, err := newFilterRegex(tc.filters)

			if tc.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.wantErr)
				return
			}

			require.NoError(t, err)
			require.Equal(t, tc.expectedRegexString, f.String())
			for _, metric := range tc.matches {
				m := f.MatchString(metric)
				require.Equal(t, tc.wantMatch, m)
			}
		})
	}
}