diff --git a/.chloggen/splunkhecexporter_httpclientsettings.yaml b/.chloggen/splunkhecexporter_httpclientsettings.yaml new file mode 100644 index 000000000000..e835babc5ce5 --- /dev/null +++ b/.chloggen/splunkhecexporter_httpclientsettings.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Expose HTTPClientSettings on splunkhecexporter + +# One or more tracking issues related to the change +issues: [16838] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: \ No newline at end of file diff --git a/exporter/splunkhecexporter/README.md b/exporter/splunkhecexporter/README.md index e27eaa8e6128..980927f367f8 100644 --- a/exporter/splunkhecexporter/README.md +++ b/exporter/splunkhecexporter/README.md @@ -20,7 +20,7 @@ The following configuration options can also be configured: - `source` (no default): Optional Splunk source: https://docs.splunk.com/Splexicon:Source - `sourcetype` (no default): Optional Splunk source type: https://docs.splunk.com/Splexicon:Sourcetype - `index` (no default): Splunk index, optional name of the Splunk index targeted -- `max_connections` (default: 100): Maximum HTTP connections to use simultaneously when sending data. +- `max_connections` (default: 100): Maximum HTTP connections to use simultaneously when sending data. Deprecated: use `max_idle_conns` or `max_idle_conns_per_host` instead. See [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md) for more info. - `disable_compression` (default: false): Whether to disable gzip compression over HTTP. - `timeout` (default: 10s): HTTP timeout when sending data. - `insecure_skip_verify` (default: false): Whether to skip checking the certificate of the HEC endpoint when sending data over HTTPS. @@ -92,7 +92,7 @@ exporters: # Splunk index, optional name of the Splunk index targeted. index: "metrics" # Maximum HTTP connections to use simultaneously when sending data. Defaults to 100. - max_connections: 200 + max_idle_conns: 200 # Whether to disable gzip compression over HTTP. Defaults to false. disable_compression: false # HTTP timeout when sending data. Defaults to 10s. @@ -118,5 +118,13 @@ with detailed sample configurations [here](testdata/config.yaml). This exporter also offers proxy support as documented [here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter#proxy-support). +## Advanced Configuration + +Several helper files are leveraged to provide additional capabilities automatically: + +- [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md) +- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) +- [Queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) + [beta]:https://github.com/open-telemetry/opentelemetry-collector#beta [contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 0a8d6f5a37c0..f961b53c31cd 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -18,6 +18,8 @@ import ( "bytes" "context" "fmt" + "net/http" + "net/url" "sync" jsoniter "github.com/json-iterator/go" @@ -34,10 +36,12 @@ import ( // client sends the data to the splunk backend. type client struct { - config *Config - logger *zap.Logger - wg sync.WaitGroup - hecWorker hecWorker + config *Config + logger *zap.Logger + wg sync.WaitGroup + telemetrySettings component.TelemetrySettings + hecWorker hecWorker + buildInfo component.BuildInfo } func (c *client) pushMetricsData( @@ -615,6 +619,73 @@ func (c *client) stop(context.Context) error { return nil } -func (c *client) start(context.Context, component.Host) (err error) { +func (c *client) start(ctx context.Context, host component.Host) (err error) { + + httpClient, err := buildHTTPClient(c.config, host, c.telemetrySettings) + if err != nil { + return err + } + + if c.config.HecHealthCheckEnabled { + healthCheckURL, _ := c.config.getURL() + healthCheckURL.Path = c.config.HealthPath + if err := checkHecHealth(httpClient, healthCheckURL); err != nil { + return fmt.Errorf("health check failed: %w", err) + } + } + url, _ := c.config.getURL() + c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo)} return nil } + +func checkHecHealth(client *http.Client, healthCheckURL *url.URL) error { + + req, err := http.NewRequest("GET", healthCheckURL.String(), nil) + if err != nil { + return consumererror.NewPermanent(err) + } + + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + err = splunk.HandleHTTPCode(resp) + if err != nil { + return err + } + + return nil +} + +func buildHTTPClient(config *Config, host component.Host, telemetrySettings component.TelemetrySettings) (*http.Client, error) { + // we handle compression explicitly. + config.HTTPClientSettings.Compression = "" + if config.MaxConnections != 0 && (config.MaxIdleConns == nil || config.HTTPClientSettings.MaxIdleConnsPerHost == nil) { + telemetrySettings.Logger.Warn("You are using the deprecated `max_connections` option that will be removed soon; use `max_idle_conns` and/or `max_idle_conns_per_host` instead: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/splunkhecexporter#advanced-configuration") + intMaxConns := int(config.MaxConnections) + if config.HTTPClientSettings.MaxIdleConns == nil { + config.HTTPClientSettings.MaxIdleConns = &intMaxConns + } + if config.HTTPClientSettings.MaxIdleConnsPerHost == nil { + config.HTTPClientSettings.MaxIdleConnsPerHost = &intMaxConns + } + } + return config.ToClient(host, telemetrySettings) +} + +func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]string { + appVersion := config.SplunkAppVersion + if appVersion == "" { + appVersion = buildInfo.Version + } + return map[string]string{ + "Connection": "keep-alive", + "Content-Type": "application/json", + "User-Agent": config.SplunkAppName + "/" + appVersion, + "Authorization": splunk.HECTokenHeader + " " + string(config.Token), + "__splunk_app_name": config.SplunkAppName, + "__splunk_app_version": config.SplunkAppVersion, + } +} diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index 406a4f58000e..2fe8316c1de9 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -34,6 +34,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter/exportertest" @@ -208,7 +209,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i } factory := NewFactory() - cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector" + cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector" cfg.Token = "1234-1234" rr := make(chan receivedRequest) @@ -258,7 +259,7 @@ func runTraceExport(testConfig *Config, traces ptrace.Traces, expectedBatchesNum factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) - cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector" + cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector" cfg.DisableCompression = testConfig.DisableCompression cfg.MaxContentLengthTraces = testConfig.MaxContentLengthTraces cfg.Token = "1234-1234" @@ -319,7 +320,7 @@ func runLogExport(cfg *Config, ld plog.Logs, expectedBatchesNum int, t *testing. panic(err) } - cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector" + cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector" cfg.Token = "1234-1234" rr := make(chan receivedRequest) @@ -881,7 +882,7 @@ func TestErrorReceived(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) - cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector" + cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector" // Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces // otherwise we will not see the error. cfg.QueueSettings.Enabled = false @@ -930,7 +931,7 @@ func TestInvalidURL(t *testing.T) { cfg.QueueSettings.Enabled = false // Disable retries to not wait too much time for the return error. cfg.RetrySettings.Enabled = false - cfg.Endpoint = "ftp://example.com:134" + cfg.HTTPClientSettings.Endpoint = "ftp://example.com:134" cfg.Token = "1234-1234" params := exportertest.NewNopCreateSettings() exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg) @@ -957,12 +958,6 @@ func TestInvalidJson(t *testing.T) { assert.Error(t, err) } -func TestStartAlwaysReturnsNil(t *testing.T) { - c := client{} - err := c.start(context.Background(), componenttest.NewNopHost()) - assert.NoError(t, err) -} - func Test_pushLogData_nil_Logs(t *testing.T) { tests := []struct { name func(bool) string @@ -1095,7 +1090,7 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) { // An HTTP client that returns status code 400 and response body responseBody. httpClient, _ := newTestClient(400, responseBody) - splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)} + splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())} // Sending logs using the client. err := splunkClient.pushLogData(context.Background(), logs) // TODO: Uncomment after consumererror.Logs implements method Unwrap. @@ -1106,7 +1101,7 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) { // An HTTP client that returns some other status code other than 400 and response body responseBody. httpClient, _ = newTestClient(500, responseBody) - splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)} + splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())} // Sending logs using the client. err = splunkClient.pushLogData(context.Background(), logs) // TODO: Uncomment after consumererror.Logs implements method Unwrap. @@ -1132,7 +1127,7 @@ func Test_pushLogData_ShouldReturnUnsentLogsOnly(t *testing.T) { // The first record is to be sent successfully, the second one should not httpClient, _ := newTestClientWithPresetResponses([]int{200, 400}, []string{"OK", "NOK"}) - c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)} + c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())} err := c.pushLogData(context.Background(), logs) require.Error(t, err) @@ -1157,7 +1152,7 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) { var headers *[]http.Header httpClient, headers := newTestClient(200, "OK") - c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)} + c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())} // A 300-byte buffer only fits one record (around 200 bytes), so each record will be sent separately c.config.MaxContentLengthLogs, c.config.DisableCompression = 300, true @@ -1223,7 +1218,7 @@ func benchPushLogData(b *testing.B, numResources int, numProfiling int, numNonPr } httpClient, _ := newTestClient(200, "OK") - c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)} + c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())} c.config.MaxContentLengthLogs = bufSize logs := createLogDataWithCustomLibraries(numResources, []string{"otel.logs", "otel.profiling"}, []int{numNonProfiling, numProfiling}) @@ -1241,7 +1236,7 @@ func Test_pushLogData_Small_MaxContentLength(t *testing.T) { c := client{ config: config, logger: zaptest.NewLogger(t), - hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config)}, + hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}, } c.config.MaxContentLengthLogs = 1 diff --git a/exporter/splunkhecexporter/config.go b/exporter/splunkhecexporter/config.go index cf87ddb54823..96ed9d7f874b 100644 --- a/exporter/splunkhecexporter/config.go +++ b/exporter/splunkhecexporter/config.go @@ -20,8 +20,8 @@ import ( "net/url" "path" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" @@ -48,9 +48,9 @@ type OtelToHecFields struct { // Config defines configuration for Splunk exporter. type Config struct { - exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. - exporterhelper.QueueSettings `mapstructure:"sending_queue"` - exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` + confighttp.HTTPClientSettings `mapstructure:",squash"` + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` // LogDataEnabled can be used to disable sending logs by the exporter. LogDataEnabled bool `mapstructure:"log_data_enabled"` @@ -61,9 +61,6 @@ type Config struct { // HEC Token is the authentication token provided by Splunk: https://docs.splunk.com/Documentation/Splunk/latest/Data/UsetheHTTPEventCollector. Token configopaque.String `mapstructure:"token"` - // URL is the Splunk HEC endpoint where data is going to be sent to. - Endpoint string `mapstructure:"endpoint"` - // Optional Splunk source: https://docs.splunk.com/Splexicon:Source. // Sources identify the incoming data. Source string `mapstructure:"source"` @@ -75,6 +72,7 @@ type Config struct { Index string `mapstructure:"index"` // MaxConnections is used to set a limit to the maximum idle HTTP connection the exporter can keep open. Defaults to 100. + // Deprecated: use HTTPClientSettings.MaxIdleConns or HTTPClientSettings.MaxIdleConnsPerHost instead. MaxConnections uint `mapstructure:"max_connections"` // Disable GZip compression. Defaults to false. @@ -92,9 +90,6 @@ type Config struct { // Maximum allowed value is 838860800 (~ 800 MB). MaxContentLengthTraces uint `mapstructure:"max_content_length_traces"` - // TLSSetting struct exposes TLS client configuration. - TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"` - // App name is used to track telemetry information for Splunk App's using HEC by App name. Defaults to "OpenTelemetry Collector Contrib". SplunkAppName string `mapstructure:"splunk_app_name"` @@ -112,27 +107,31 @@ type Config struct { HecHealthCheckEnabled bool `mapstructure:"health_check_enabled"` } -func (cfg *Config) getOptionsFromConfig() (*exporterOptions, error) { - if err := cfg.validateConfig(); err != nil { - return nil, err - } +func (cfg *Config) getURL() (out *url.URL, err error) { - url, err := cfg.getURL() + out, err = url.Parse(cfg.HTTPClientSettings.Endpoint) if err != nil { - return nil, fmt.Errorf(`invalid "endpoint": %w`, err) + return out, err + } + if out.Path == "" || out.Path == "/" { + out.Path = path.Join(out.Path, hecPath) } - return &exporterOptions{ - url: url, - token: cfg.Token, - }, nil + return } -func (cfg *Config) validateConfig() error { - if cfg.Endpoint == "" { +// Validate checks if the exporter configuration is valid. +func (cfg *Config) Validate() error { + if !cfg.LogDataEnabled && !cfg.ProfilingDataEnabled { + return errors.New(`either "log_data_enabled" or "profiling_data_enabled" has to be true`) + } + if cfg.HTTPClientSettings.Endpoint == "" { return errors.New(`requires a non-empty "endpoint"`) } - + _, err := cfg.getURL() + if err != nil { + return fmt.Errorf(`invalid "endpoint": %w`, err) + } if cfg.Token == "" { return errors.New(`requires a non-empty "token"`) } @@ -146,32 +145,10 @@ func (cfg *Config) validateConfig() error { } if cfg.MaxContentLengthTraces > maxContentLengthTracesLimit { - return fmt.Errorf(`requires "max_content_length_traces <= #{maxContentLengthTracesLimit}`) - } - - return nil -} - -func (cfg *Config) getURL() (out *url.URL, err error) { - - out, err = url.Parse(cfg.Endpoint) - if err != nil { - return out, err - } - if out.Path == "" || out.Path == "/" { - out.Path = path.Join(out.Path, hecPath) + return fmt.Errorf(`requires "max_content_length_traces" <= %d`, maxContentLengthTracesLimit) } - - return -} - -// Validate checks if the exporter configuration is valid. -func (cfg *Config) Validate() error { if err := cfg.QueueSettings.Validate(); err != nil { return fmt.Errorf("sending_queue settings has invalid configuration: %w", err) } - if !cfg.LogDataEnabled && !cfg.ProfilingDataEnabled { - return errors.New(`either "log_data_enabled" or "profiling_data_enabled" has to be true`) - } return nil } diff --git a/exporter/splunkhecexporter/config_test.go b/exporter/splunkhecexporter/config_test.go index c2b2084ec40a..9509152e7df3 100644 --- a/exporter/splunkhecexporter/config_test.go +++ b/exporter/splunkhecexporter/config_test.go @@ -15,7 +15,6 @@ package splunkhecexporter import ( - "net/url" "path/filepath" "testing" "time" @@ -23,7 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -40,7 +39,9 @@ func TestLoadConfig(t *testing.T) { // Endpoint and Token do not have a default value so set them directly. defaultCfg := createDefaultConfig().(*Config) defaultCfg.Token = "00000000-0000-0000-0000-0000000000000" - defaultCfg.Endpoint = "https://splunk:8088/services/collector" + defaultCfg.HTTPClientSettings.Endpoint = "https://splunk:8088/services/collector" + + hundred := 100 tests := []struct { id component.ID @@ -54,7 +55,6 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(typeStr, "allsettings"), expected: &Config{ Token: "00000000-0000-0000-0000-0000000000000", - Endpoint: "https://splunk:8088/services/collector", Source: "otel", SourceType: "otel", Index: "metrics", @@ -62,12 +62,22 @@ func TestLoadConfig(t *testing.T) { SplunkAppVersion: "v0.0.1", LogDataEnabled: true, ProfilingDataEnabled: true, - MaxConnections: 100, MaxContentLengthLogs: 2 * 1024 * 1024, MaxContentLengthMetrics: 2 * 1024 * 1024, MaxContentLengthTraces: 2 * 1024 * 1024, - TimeoutSettings: exporterhelper.TimeoutSettings{ - Timeout: 10 * time.Second, + HTTPClientSettings: confighttp.HTTPClientSettings{ + Timeout: 10 * time.Second, + Endpoint: "https://splunk:8088/services/collector", + TLSSetting: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "", + CertFile: "", + KeyFile: "", + }, + InsecureSkipVerify: false, + }, + MaxIdleConns: &hundred, + MaxIdleConnsPerHost: &hundred, }, RetrySettings: exporterhelper.RetrySettings{ Enabled: true, @@ -80,14 +90,6 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - TLSSetting: configtls.TLSClientSetting{ - TLSSetting: configtls.TLSSetting{ - CAFile: "", - CertFile: "", - KeyFile: "", - }, - InsecureSkipVerify: false, - }, HecToOtelAttrs: splunk.HecToOtelAttrs{ Source: "mysource", SourceType: "mysourcetype", @@ -119,109 +121,79 @@ func TestLoadConfig(t *testing.T) { } } -func TestConfig_getOptionsFromConfig(t *testing.T) { - type fields struct { - Endpoint string - Token configopaque.String - Source string - SourceType string - Index string - MaxContentLengthLogs uint - MaxContentLengthMetrics uint - MaxContentLengthTraces uint - } +func TestConfig_Validate(t *testing.T) { tests := []struct { name string - fields fields - want *exporterOptions - wantErr bool + cfg *Config + wantErr string }{ { - name: "Test missing url", - fields: fields{ - Token: "1234", - }, - want: nil, - wantErr: true, + name: "default", + cfg: createDefaultConfig().(*Config), + wantErr: "requires a non-empty \"endpoint\"", }, { - name: "Test missing token", - fields: fields{ - Endpoint: "https://example.com:8000", - }, - want: nil, - wantErr: true, + name: "bad url", + cfg: func() *Config { + cfg := createDefaultConfig().(*Config) + cfg.HTTPClientSettings.Endpoint = "cache_object:foo/bar" + cfg.Token = "foo" + return cfg + }(), + wantErr: "invalid \"endpoint\": parse \"cache_object:foo/bar\": first path segment in URL cannot contain colon", }, { - name: "Test incomplete URL", - fields: fields{ - Token: "1234", - Endpoint: "https://example.com:8000", - }, - want: &exporterOptions{ - token: "1234", - url: &url.URL{ - Scheme: "https", - Host: "example.com:8000", - Path: "services/collector", - }, - }, - wantErr: false, - }, - { - name: "Test empty config", - want: nil, - wantErr: true, + name: "missing token", + cfg: func() *Config { + cfg := createDefaultConfig().(*Config) + cfg.HTTPClientSettings.Endpoint = "http://example.com" + return cfg + }(), + wantErr: "requires a non-empty \"token\"", }, { - name: "Test max content length logs greater than limit", - fields: fields{ - Token: "1234", - Endpoint: "https://example.com:8000", - MaxContentLengthLogs: maxContentLengthLogsLimit + 1, - }, - want: nil, - wantErr: true, + name: "max default content-length for logs", + cfg: func() *Config { + cfg := createDefaultConfig().(*Config) + cfg.HTTPClientSettings.Endpoint = "http://foo_bar.com" + cfg.MaxContentLengthLogs = maxContentLengthLogsLimit + 1 + cfg.Token = "foo" + return cfg + }(), + wantErr: "requires \"max_content_length_logs\" <= 838860800", }, { - name: "Test max content length metrics greater than limit", - fields: fields{ - Token: "1234", - Endpoint: "https://example.com:8000", - MaxContentLengthMetrics: maxContentLengthMetricsLimit + 1, - }, - want: nil, - wantErr: true, + name: "max default content-length for metrics", + cfg: func() *Config { + cfg := createDefaultConfig().(*Config) + cfg.HTTPClientSettings.Endpoint = "http://foo_bar.com" + cfg.MaxContentLengthMetrics = maxContentLengthMetricsLimit + 1 + cfg.Token = "foo" + return cfg + }(), + wantErr: "requires \"max_content_length_metrics\" <= 838860800", }, { - name: "Test max content length traces greater than limit", - fields: fields{ - Token: "1234", - Endpoint: "https://example.com:8000", - MaxContentLengthTraces: maxContentLengthTracesLimit + 1, - }, - want: nil, - wantErr: true, + name: "max default content-length for traces", + cfg: func() *Config { + cfg := createDefaultConfig().(*Config) + cfg.HTTPClientSettings.Endpoint = "http://foo_bar.com" + cfg.MaxContentLengthTraces = maxContentLengthTracesLimit + 1 + cfg.Token = "foo" + return cfg + }(), + wantErr: "requires \"max_content_length_traces\" <= 838860800", }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := &Config{ - Token: tt.fields.Token, - Endpoint: tt.fields.Endpoint, - Source: tt.fields.Source, - SourceType: tt.fields.SourceType, - Index: tt.fields.Index, - MaxContentLengthLogs: tt.fields.MaxContentLengthLogs, - MaxContentLengthMetrics: tt.fields.MaxContentLengthMetrics, - MaxContentLengthTraces: tt.fields.MaxContentLengthTraces, - } - got, err := cfg.getOptionsFromConfig() - if (err != nil) != tt.wantErr { - t.Errorf("getOptionsFromConfig() error = %v, wantErr %v", err, tt.wantErr) - return + err := tt.cfg.Validate() + if tt.wantErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.wantErr) } - require.EqualValues(t, tt.want, got) }) } } diff --git a/exporter/splunkhecexporter/exporter.go b/exporter/splunkhecexporter/exporter.go deleted file mode 100644 index 2edb6a02de47..000000000000 --- a/exporter/splunkhecexporter/exporter.go +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright 2020, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" - -import ( - "context" - "errors" - "fmt" - "net" - "net/http" - "net/url" - "time" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configopaque" - "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" -) - -const ( - idleConnTimeout = 30 * time.Second - tlsHandshakeTimeout = 10 * time.Second - dialerTimeout = 30 * time.Second - dialerKeepAlive = 30 * time.Second - defaultSplunkAppName = "OpenTelemetry Collector Contrib" -) - -type splunkExporter struct { - pushMetricsData func(ctx context.Context, md pmetric.Metrics) error - pushTraceData func(ctx context.Context, td ptrace.Traces) error - pushLogData func(ctx context.Context, td plog.Logs) error - stop func(ctx context.Context) (err error) - start func(ctx context.Context, host component.Host) (err error) -} - -type exporterOptions struct { - url *url.URL - token configopaque.String -} - -// createExporter returns a new Splunk exporter. -func createExporter( - config *Config, - logger *zap.Logger, - buildinfo *component.BuildInfo, -) (*splunkExporter, error) { - if config == nil { - return nil, errors.New("nil config") - } - - if config.SplunkAppName == "" { - config.SplunkAppName = defaultSplunkAppName - } - - if config.SplunkAppVersion == "" { - config.SplunkAppVersion = buildinfo.Version - } - - options, err := config.getOptionsFromConfig() - if err != nil { - return nil, err - } - - httpClient, err := buildHTTPClient(config) - if err != nil { - return nil, err - } - - client := buildClient(options, config, httpClient, logger) - - if config.HecHealthCheckEnabled { - healthCheckURL := options.url - healthCheckURL.Path = config.HealthPath - if err := checkHecHealth(httpClient, healthCheckURL); err != nil { - return nil, fmt.Errorf("health check failed: %w", err) - } - } - - return &splunkExporter{ - pushMetricsData: client.pushMetricsData, - pushTraceData: client.pushTraceData, - pushLogData: client.pushLogData, - stop: client.stop, - start: client.start, - }, nil -} - -func checkHecHealth(client *http.Client, healthCheckURL *url.URL) error { - - req, err := http.NewRequest("GET", healthCheckURL.String(), nil) - if err != nil { - return consumererror.NewPermanent(err) - } - - resp, err := client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - err = splunk.HandleHTTPCode(resp) - if err != nil { - return err - } - - return nil -} - -func buildClient(options *exporterOptions, config *Config, httpClient *http.Client, logger *zap.Logger) *client { - return &client{ - logger: logger, - config: config, - hecWorker: &defaultHecWorker{options.url, httpClient, buildHTTPHeaders(config)}, - } -} - -func buildHTTPClient(config *Config) (*http.Client, error) { - tlsCfg, err := config.TLSSetting.LoadTLSConfig() - if err != nil { - return nil, fmt.Errorf("could not retrieve TLS config for Splunk HEC Exporter: %w", err) - } - return &http.Client{ - Timeout: config.Timeout, - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: dialerTimeout, - KeepAlive: dialerKeepAlive, - }).DialContext, - MaxIdleConns: int(config.MaxConnections), - MaxIdleConnsPerHost: int(config.MaxConnections), - IdleConnTimeout: idleConnTimeout, - TLSHandshakeTimeout: tlsHandshakeTimeout, - TLSClientConfig: tlsCfg, - }}, nil -} -func buildHTTPHeaders(config *Config) map[string]string { - return map[string]string{ - "Connection": "keep-alive", - "Content-Type": "application/json", - "User-Agent": config.SplunkAppName + "/" + config.SplunkAppVersion, - "Authorization": splunk.HECTokenHeader + " " + string(config.Token), - "__splunk_app_name": config.SplunkAppName, - "__splunk_app_version": config.SplunkAppVersion, - } -} diff --git a/exporter/splunkhecexporter/exporter_test.go b/exporter/splunkhecexporter/exporter_test.go deleted file mode 100644 index 074990e95deb..000000000000 --- a/exporter/splunkhecexporter/exporter_test.go +++ /dev/null @@ -1,444 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package splunkhecexporter - -import ( - "bytes" - "compress/gzip" - "context" - "encoding/json" - "fmt" - "io" - "net" - "net/http" - "net/http/httptest" - "net/url" - "strconv" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - conventions "go.opentelemetry.io/collector/semconv/v1.9.0" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" -) - -func TestNew(t *testing.T) { - buildInfo := component.NewDefaultBuildInfo() - got, err := createExporter(nil, zap.NewNop(), &buildInfo) - assert.EqualError(t, err, "nil config") - assert.Nil(t, got) - - config := &Config{ - Token: "someToken", - Endpoint: "https://example.com:8088", - TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second}, - } - got, err = createExporter(config, zap.NewNop(), &buildInfo) - assert.NoError(t, err) - require.NotNil(t, got) - - config = &Config{ - Token: "someToken", - Endpoint: "https://example.com:8088", - TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second}, - TLSSetting: configtls.TLSClientSetting{ - TLSSetting: configtls.TLSSetting{ - CAFile: "file-not-found", - CertFile: "file-not-found", - KeyFile: "file-not-found", - }, - InsecureSkipVerify: false, - }, - } - got, err = createExporter(config, zap.NewNop(), &buildInfo) - assert.Error(t, err) - require.Nil(t, got) -} - -func TestNewWithHealthCheckSuccess(t *testing.T) { - - rr := make(chan receivedRequest) - capture := CapturingData{receivedRequest: rr, statusCode: 200} - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - panic(err) - } - s := &http.Server{ - Handler: &capture, - } - defer s.Close() - go func() { - if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) - } - }() - - endpoint := "http://" + listener.Addr().String() + "/services/collector" - - config := &Config{ - Token: "someToken", - Endpoint: endpoint, - TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second}, - HecHealthCheckEnabled: true, - } - buildInfo := component.NewDefaultBuildInfo() - got, err := createExporter(config, zap.NewNop(), &buildInfo) - assert.NoError(t, err) - require.NotNil(t, got) - -} - -func TestNewWithHealthCheckFail(t *testing.T) { - - rr := make(chan receivedRequest) - capture := CapturingData{receivedRequest: rr, statusCode: 500} - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - panic(err) - } - s := &http.Server{ - Handler: &capture, - } - defer s.Close() - go func() { - if e := s.Serve(listener); e != http.ErrServerClosed { - require.NoError(t, e) - } - }() - - endpoint := "http://" + listener.Addr().String() + "/services/collector" - - config := &Config{ - Token: "someToken", - Endpoint: endpoint, - TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second}, - HecHealthCheckEnabled: true, - } - buildInfo := component.NewDefaultBuildInfo() - got, err := createExporter(config, zap.NewNop(), &buildInfo) - assert.Error(t, err) - require.Nil(t, got) - -} - -func TestConsumeMetricsData(t *testing.T) { - smallBatch := pmetric.NewMetrics() - smallBatch.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("com.splunk.source", "test_splunk") - m := smallBatch.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - m.SetName("test_gauge") - dp := m.SetEmptyGauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("k0", "v0") - dp.Attributes().PutStr("k1", "v1") - dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - dp.SetDoubleValue(123) - tests := []struct { - name string - md pmetric.Metrics - reqTestFunc func(t *testing.T, r *http.Request) - httpResponseCode int - maxContentLength uint - wantErr bool - }{ - { - name: "happy_path", - md: smallBatch, - reqTestFunc: func(t *testing.T, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, "keep-alive", r.Header.Get("Connection")) - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - assert.Equal(t, "OpenTelemetry-Collector Splunk Exporter/v0.0.1", r.Header.Get("User-Agent")) - assert.Equal(t, "Splunk 1234", r.Header.Get("Authorization")) - if r.Header.Get("Content-Encoding") == "gzip" { - t.Fatal("Small batch should not be compressed") - } - firstPayload := strings.Split(string(body), "\n")[0] - var metric splunk.Event - err = json.Unmarshal([]byte(firstPayload), &metric) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, "test_splunk", metric.Source) - assert.Equal(t, "test_type", metric.SourceType) - assert.Equal(t, "test_index", metric.Index) - - }, - httpResponseCode: http.StatusAccepted, - }, - { - name: "response_forbidden", - md: smallBatch, - reqTestFunc: nil, - httpResponseCode: http.StatusForbidden, - wantErr: true, - }, - { - name: "large_batch", - md: generateLargeBatch(), - reqTestFunc: func(t *testing.T, r *http.Request) { - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - assert.Equal(t, "keep-alive", r.Header.Get("Connection")) - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - assert.Equal(t, "OpenTelemetry-Collector Splunk Exporter/v0.0.1", r.Header.Get("User-Agent")) - assert.Equal(t, "Splunk 1234", r.Header.Get("Authorization")) - bodyBytes := body - // the last batch might not be zipped. - if r.Header.Get("Content-Encoding") == "gzip" { - zipReader, err2 := gzip.NewReader(bytes.NewReader(body)) - require.NoError(t, err2) - bodyBytes, _ = io.ReadAll(zipReader) - } - - events := strings.Split(string(bodyBytes), "}{") - firstPayload := events[0] - if len(events) > 1 { - firstPayload += "}" - } - - var metric splunk.Event - err = json.Unmarshal([]byte(firstPayload), &metric) - assert.NoError(t, err, fmt.Sprintf("could not read: %s", firstPayload)) - assert.Equal(t, "test_splunk", metric.Source) - assert.Equal(t, "test_type", metric.SourceType) - assert.Equal(t, "test_index", metric.Index) - - }, - maxContentLength: 1800, - httpResponseCode: http.StatusAccepted, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if tt.reqTestFunc != nil { - tt.reqTestFunc(t, r) - } - w.WriteHeader(tt.httpResponseCode) - })) - defer server.Close() - - serverURL, err := url.Parse(server.URL) - assert.NoError(t, err) - - options := &exporterOptions{ - url: serverURL, - token: "1234", - } - - config := NewFactory().CreateDefaultConfig().(*Config) - config.Source = "test" - config.SourceType = "test_type" - config.Token = "1234" - config.Index = "test_index" - config.SplunkAppName = "OpenTelemetry-Collector Splunk Exporter" - config.SplunkAppVersion = "v0.0.1" - config.MaxContentLengthMetrics = tt.maxContentLength - - httpClient, err := buildHTTPClient(config) - require.NoError(t, err) - - sender := buildClient(options, config, httpClient, zap.NewNop()) - - err = sender.pushMetricsData(context.Background(), tt.md) - if tt.wantErr { - assert.Error(t, err) - return - } - - assert.NoError(t, err) - }) - } -} - -func generateLargeBatch() pmetric.Metrics { - ts := time.Now() - metrics := pmetric.NewMetrics() - rm := metrics.ResourceMetrics().AppendEmpty() - rm.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test_splunkhec") - rm.Resource().Attributes().PutStr(splunk.DefaultSourceLabel, "test_splunk") - ms := rm.ScopeMetrics().AppendEmpty().Metrics() - - for i := 0; i < 6500; i++ { - m := ms.AppendEmpty() - m.SetName("test_" + strconv.Itoa(i)) - dp := m.SetEmptyGauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("k0", "v0") - dp.Attributes().PutStr("k1", "v1") - dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) - dp.SetIntValue(int64(i)) - } - - return metrics -} - -func generateLargeLogsBatch() plog.Logs { - logs := plog.NewLogs() - rl := logs.ResourceLogs().AppendEmpty() - sl := rl.ScopeLogs().AppendEmpty() - sl.LogRecords().EnsureCapacity(65000) - ts := pcommon.Timestamp(123) - for i := 0; i < 65000; i++ { - logRecord := sl.LogRecords().AppendEmpty() - logRecord.Body().SetStr("mylog") - logRecord.Attributes().PutStr(splunk.DefaultSourceLabel, "myapp") - logRecord.Attributes().PutStr(splunk.DefaultSourceTypeLabel, "myapp-type") - logRecord.Attributes().PutStr(splunk.DefaultIndexLabel, "myindex") - logRecord.Attributes().PutStr(conventions.AttributeHostName, "myhost") - logRecord.Attributes().PutStr("custom", "custom") - logRecord.SetTimestamp(ts) - } - - return logs -} - -func TestConsumeLogsData(t *testing.T) { - smallBatch := plog.NewLogs() - logRecord := smallBatch.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - logRecord.Body().SetStr("mylog") - logRecord.Attributes().PutStr(conventions.AttributeHostName, "myhost") - logRecord.Attributes().PutStr("custom", "custom") - logRecord.SetTimestamp(123) - tests := []struct { - name string - ld plog.Logs - reqTestFunc func(t *testing.T, r *http.Request) - httpResponseCode int - wantErr bool - }{ - { - name: "happy_path", - ld: smallBatch, - reqTestFunc: func(t *testing.T, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, "keep-alive", r.Header.Get("Connection")) - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - assert.Equal(t, "OpenTelemetry-Collector Splunk Exporter/v0.0.1", r.Header.Get("User-Agent")) - assert.Equal(t, "Splunk 1234", r.Header.Get("Authorization")) - if r.Header.Get("Content-Encoding") == "gzip" { - t.Fatal("Small batch should not be compressed") - } - firstPayload := strings.Split(string(body), "\n")[0] - var event splunk.Event - err = json.Unmarshal([]byte(firstPayload), &event) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, "test", event.Source) - assert.Equal(t, "test_type", event.SourceType) - assert.Equal(t, "test_index", event.Index) - - }, - httpResponseCode: http.StatusAccepted, - }, - { - name: "response_forbidden", - ld: smallBatch, - reqTestFunc: nil, - httpResponseCode: http.StatusForbidden, - wantErr: true, - }, - { - name: "large_batch", - ld: generateLargeLogsBatch(), - reqTestFunc: nil, - httpResponseCode: http.StatusAccepted, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if tt.reqTestFunc != nil { - tt.reqTestFunc(t, r) - } - w.WriteHeader(tt.httpResponseCode) - })) - defer server.Close() - - serverURL, err := url.Parse(server.URL) - assert.NoError(t, err) - - options := &exporterOptions{ - url: serverURL, - token: "1234", - } - - config := NewFactory().CreateDefaultConfig().(*Config) - config.Source = "test" - config.SourceType = "test_type" - config.Token = "1234" - config.Index = "test_index" - config.SplunkAppName = "OpenTelemetry-Collector Splunk Exporter" - config.SplunkAppVersion = "v0.0.1" - - httpClient, err := buildHTTPClient(config) - require.NoError(t, err) - - sender := buildClient(options, config, httpClient, zap.NewNop()) - - err = sender.pushLogData(context.Background(), tt.ld) - if tt.wantErr { - assert.Error(t, err) - return - } - - assert.NoError(t, err) - }) - } -} - -func TestExporterStartAlwaysReturnsNil(t *testing.T) { - buildInfo := component.NewDefaultBuildInfo() - config := &Config{ - Endpoint: "https://example.com:8088", - Token: "abc", - } - e, err := createExporter(config, zap.NewNop(), &buildInfo) - assert.NoError(t, err) - assert.NoError(t, e.start(context.Background(), componenttest.NewNopHost())) -} - -func TestHecHealthCheckFailed(t *testing.T) { - - healthCheckURL := &url.URL{Scheme: "http", Host: "splunk", Path: "/services/collector/health"} - client, _ := newTestClient(503, "NOK") - err := checkHecHealth(client, healthCheckURL) - assert.Error(t, err) - assert.Contains(t, err.Error(), "503") -} - -func TestHecHealthCheckSucceded(t *testing.T) { - healthCheckURL := &url.URL{Scheme: "http", Host: "splunk", Path: "/services/collector/health"} - - client, _ := newTestClient(200, "OK") - err := checkHecHealth(client, healthCheckURL) - assert.NoError(t, err) -} diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go index e5abcd86f221..a22823465316 100644 --- a/exporter/splunkhecexporter/factory.go +++ b/exporter/splunkhecexporter/factory.go @@ -16,10 +16,10 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-col import ( "context" - "errors" "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -33,9 +33,10 @@ const ( // The value of "type" key in configuration. typeStr = "splunk_hec" // The stability level of the exporter. - stability = component.StabilityLevelBeta - defaultMaxIdleCons = 100 - defaultHTTPTimeout = 10 * time.Second + stability = component.StabilityLevelBeta + defaultMaxIdleCons = 100 + defaultHTTPTimeout = 10 * time.Second + defaultSplunkAppName = "OpenTelemetry Collector Contrib" ) // TODO: Find a place for this to be shared. @@ -61,16 +62,19 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + defaultMaxConns := defaultMaxIdleCons return &Config{ LogDataEnabled: true, ProfilingDataEnabled: true, - TimeoutSettings: exporterhelper.TimeoutSettings{ - Timeout: defaultHTTPTimeout, + HTTPClientSettings: confighttp.HTTPClientSettings{ + Timeout: defaultHTTPTimeout, + MaxIdleConnsPerHost: &defaultMaxConns, + MaxIdleConns: &defaultMaxConns, }, + SplunkAppName: defaultSplunkAppName, RetrySettings: exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), DisableCompression: false, - MaxConnections: defaultMaxIdleCons, MaxContentLengthLogs: defaultContentLengthLogsLimit, MaxContentLengthMetrics: defaultContentLengthMetricsLimit, MaxContentLengthTraces: defaultContentLengthTracesLimit, @@ -94,27 +98,26 @@ func createTracesExporter( set exporter.CreateSettings, config component.Config, ) (exporter.Traces, error) { - if config == nil { - return nil, errors.New("nil config") - } cfg := config.(*Config) - exp, err := createExporter(cfg, set.Logger, &set.BuildInfo) - if err != nil { - return nil, err + c := &client{ + config: cfg, + logger: set.Logger, + telemetrySettings: set.TelemetrySettings, + buildInfo: set.BuildInfo, } return exporterhelper.NewTracesExporter( ctx, set, cfg, - exp.pushTraceData, + c.pushTraceData, // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(cfg.RetrySettings), exporterhelper.WithQueue(cfg.QueueSettings), - exporterhelper.WithStart(exp.start), - exporterhelper.WithShutdown(exp.stop)) + exporterhelper.WithStart(c.start), + exporterhelper.WithShutdown(c.stop)) } func createMetricsExporter( @@ -122,27 +125,26 @@ func createMetricsExporter( set exporter.CreateSettings, config component.Config, ) (exporter.Metrics, error) { - if config == nil { - return nil, errors.New("nil config") - } cfg := config.(*Config) - exp, err := createExporter(cfg, set.Logger, &set.BuildInfo) - if err != nil { - return nil, err + c := &client{ + config: cfg, + logger: set.Logger, + telemetrySettings: set.TelemetrySettings, + buildInfo: set.BuildInfo, } exporter, err := exporterhelper.NewMetricsExporter( ctx, set, cfg, - exp.pushMetricsData, + c.pushMetricsData, // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(cfg.RetrySettings), exporterhelper.WithQueue(cfg.QueueSettings), - exporterhelper.WithStart(exp.start), - exporterhelper.WithShutdown(exp.stop)) + exporterhelper.WithStart(c.start), + exporterhelper.WithShutdown(c.stop)) if err != nil { return nil, err } @@ -160,28 +162,26 @@ func createLogsExporter( set exporter.CreateSettings, config component.Config, ) (exporter exporter.Logs, err error) { - if config == nil { - return nil, errors.New("nil config") - } cfg := config.(*Config) - exp, err := createExporter(cfg, set.Logger, &set.BuildInfo) - - if err != nil { - return nil, err + c := &client{ + config: cfg, + logger: set.Logger, + telemetrySettings: set.TelemetrySettings, + buildInfo: set.BuildInfo, } logsExporter, err := exporterhelper.NewLogsExporter( ctx, set, cfg, - exp.pushLogData, + c.pushLogData, // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(cfg.RetrySettings), exporterhelper.WithQueue(cfg.QueueSettings), - exporterhelper.WithStart(exp.start), - exporterhelper.WithShutdown(exp.stop)) + exporterhelper.WithStart(c.start), + exporterhelper.WithShutdown(c.stop)) if err != nil { return nil, err diff --git a/exporter/splunkhecexporter/factory_test.go b/exporter/splunkhecexporter/factory_test.go index 888a7edace49..4200988acaae 100644 --- a/exporter/splunkhecexporter/factory_test.go +++ b/exporter/splunkhecexporter/factory_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/exporter/exportertest" ) @@ -32,7 +33,7 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateMetricsExporter(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "https://example.com:8088/services/collector" + cfg.HTTPClientSettings.Endpoint = "https://example.com:8088/services/collector" cfg.Token = "1234-1234" params := exportertest.NewNopCreateSettings() @@ -40,15 +41,9 @@ func TestCreateMetricsExporter(t *testing.T) { assert.NoError(t, err) } -func TestCreateMetricsExporterNoConfig(t *testing.T) { - params := exportertest.NewNopCreateSettings() - _, err := createMetricsExporter(context.Background(), params, nil) - assert.Error(t, err) -} - func TestCreateTracesExporter(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "https://example.com:8088/services/collector" + cfg.HTTPClientSettings.Endpoint = "https://example.com:8088/services/collector" cfg.Token = "1234-1234" params := exportertest.NewNopCreateSettings() @@ -56,23 +51,9 @@ func TestCreateTracesExporter(t *testing.T) { assert.NoError(t, err) } -func TestCreateTracesExporterNoConfig(t *testing.T) { - params := exportertest.NewNopCreateSettings() - _, err := createTracesExporter(context.Background(), params, nil) - assert.Error(t, err) -} - -func TestCreateTracesExporterInvalidEndpoint(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "urn:something:12345" - params := exportertest.NewNopCreateSettings() - _, err := createTracesExporter(context.Background(), params, cfg) - assert.Error(t, err) -} - func TestCreateLogsExporter(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "https://example.com:8088/services/collector" + cfg.HTTPClientSettings.Endpoint = "https://example.com:8088/services/collector" cfg.Token = "1234-1234" params := exportertest.NewNopCreateSettings() @@ -80,25 +61,11 @@ func TestCreateLogsExporter(t *testing.T) { assert.NoError(t, err) } -func TestCreateLogsExporterNoConfig(t *testing.T) { - params := exportertest.NewNopCreateSettings() - _, err := createLogsExporter(context.Background(), params, nil) - assert.Error(t, err) -} - -func TestCreateLogsExporterInvalidEndpoint(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "urn:something:12345" - params := exportertest.NewNopCreateSettings() - _, err := createLogsExporter(context.Background(), params, cfg) - assert.Error(t, err) -} - func TestCreateInstanceViaFactory(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) - cfg.Endpoint = "https://example.com:8088/services/collector" + cfg.HTTPClientSettings.Endpoint = "https://example.com:8088/services/collector" cfg.Token = "1234-1234" params := exportertest.NewNopCreateSettings() exp, err := factory.CreateMetricsExporter( @@ -109,7 +76,7 @@ func TestCreateInstanceViaFactory(t *testing.T) { // Set values that don't have a valid default. cfg.Token = "testToken" - cfg.Endpoint = "https://example.com" + cfg.HTTPClientSettings.Endpoint = "https://example.com" exp, err = factory.CreateMetricsExporter( context.Background(), params, cfg) @@ -121,8 +88,10 @@ func TestCreateInstanceViaFactory(t *testing.T) { func TestFactory_CreateMetricsExporter(t *testing.T) { config := &Config{ - Token: "testToken", - Endpoint: "https://example.com:8000", + Token: "testToken", + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: "https://example.com:8000", + }, } params := exportertest.NewNopCreateSettings() @@ -130,34 +99,3 @@ func TestFactory_CreateMetricsExporter(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, te) } - -func TestFactory_CreateMetricsExporterFails(t *testing.T) { - tests := []struct { - name string - config *Config - errorMessage string - }{ - { - name: "empty_endpoint", - config: &Config{ - Token: "token", - }, - errorMessage: "requires a non-empty \"endpoint\"", - }, - { - name: "empty_token", - config: &Config{ - Endpoint: "https://example.com:8000", - }, - errorMessage: "requires a non-empty \"token\"", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - params := exportertest.NewNopCreateSettings() - te, err := createMetricsExporter(context.Background(), params, tt.config) - assert.EqualError(t, err, tt.errorMessage) - assert.Nil(t, te) - }) - } -} diff --git a/exporter/splunkhecexporter/go.mod b/exporter/splunkhecexporter/go.mod index 0888bb97371c..ff93eeeb2e42 100644 --- a/exporter/splunkhecexporter/go.mod +++ b/exporter/splunkhecexporter/go.mod @@ -22,8 +22,13 @@ require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/felixge/httpsnoop v1.0.3 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/klauspost/compress v1.15.15 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -33,8 +38,10 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.8.1 // indirect + github.com/rs/cors v1.8.3 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/featuregate v0.70.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.37.0 // indirect go.opentelemetry.io/otel v1.11.2 // indirect go.opentelemetry.io/otel/metric v0.34.0 // indirect go.opentelemetry.io/otel/trace v1.11.2 // indirect diff --git a/exporter/splunkhecexporter/go.sum b/exporter/splunkhecexporter/go.sum index db9a60d3cd5a..9cd120e0f5a0 100644 --- a/exporter/splunkhecexporter/go.sum +++ b/exporter/splunkhecexporter/go.sum @@ -52,6 +52,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -64,8 +66,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -92,6 +97,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -158,6 +165,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= +github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -247,6 +256,8 @@ github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8d github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= +github.com/rs/cors v1.8.3 h1:O+qNyWn7Z+F9M0ILBHgMVPuB1xTOucVd5gtaYyXBpRo= +github.com/rs/cors v1.8.3/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= @@ -291,6 +302,8 @@ go.opentelemetry.io/collector/pdata v1.0.0-rc4 h1:vIQHHiaDqvTM3I30j3PDo44ttkv9n8 go.opentelemetry.io/collector/pdata v1.0.0-rc4/go.mod h1:ft/11i2R6Ld/DC543bAS4R30/W8heexIvNqtzmQpnLQ= go.opentelemetry.io/collector/semconv v0.70.0 h1:zuito4A9yoB5v+4M7GnwMBNcQdfhutHeysDAkTaGJjw= go.opentelemetry.io/collector/semconv v0.70.0/go.mod h1:UAp+qAMqEXOD0eEBmWJ3IJ5+LkF7zVTgmfufwpHmL8w= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.37.0 h1:yt2NKzK7Vyo6h0+X8BA4FpreZQTlVEIarnsBP/H5mzs= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.37.0/go.mod h1:+ARmXlUlc51J7sZeCBkBJNdHGySrdOzgzxp6VWRWM1U= go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0= go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI= go.opentelemetry.io/otel/exporters/prometheus v0.34.0 h1:L5D+HxdaC/ORB47ribbTBbkXRZs9JzPjq0EoIOMWncM=