Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/splunkhecexporter] expose HTTPClientSettings on splunkhecexporter #16839

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/splunkhecexporter_httpclientsettings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Expose HTTPClientSettings on splunkhecexporter

# One or more tracking issues related to the change
issues: [16838]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
12 changes: 10 additions & 2 deletions exporter/splunkhecexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The following configuration options can also be configured:
- `source` (no default): Optional Splunk source: https://docs.splunk.com/Splexicon:Source
- `sourcetype` (no default): Optional Splunk source type: https://docs.splunk.com/Splexicon:Sourcetype
- `index` (no default): Splunk index, optional name of the Splunk index targeted
- `max_connections` (default: 100): Maximum HTTP connections to use simultaneously when sending data.
- `max_connections` (default: 100): Maximum HTTP connections to use simultaneously when sending data. Deprecated: use `max_idle_conns` or `max_idle_conns_per_host` instead. See [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md) for more info.
- `disable_compression` (default: false): Whether to disable gzip compression over HTTP.
- `timeout` (default: 10s): HTTP timeout when sending data.
- `insecure_skip_verify` (default: false): Whether to skip checking the certificate of the HEC endpoint when sending data over HTTPS.
Expand Down Expand Up @@ -92,7 +92,7 @@ exporters:
# Splunk index, optional name of the Splunk index targeted.
index: "metrics"
# Maximum HTTP connections to use simultaneously when sending data. Defaults to 100.
max_connections: 200
max_idle_conns: 200
# Whether to disable gzip compression over HTTP. Defaults to false.
disable_compression: false
# HTTP timeout when sending data. Defaults to 10s.
Expand All @@ -118,5 +118,13 @@ with detailed sample configurations [here](testdata/config.yaml).
This exporter also offers proxy support as documented
[here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter#proxy-support).

## Advanced Configuration

Several helper files are leveraged to provide additional capabilities automatically:

- [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md)
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
- [Queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)

[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
81 changes: 76 additions & 5 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"bytes"
"context"
"fmt"
"net/http"
"net/url"
"sync"

jsoniter "github.com/json-iterator/go"
Expand All @@ -34,10 +36,12 @@ import (

// client sends the data to the splunk backend.
type client struct {
config *Config
logger *zap.Logger
wg sync.WaitGroup
hecWorker hecWorker
config *Config
logger *zap.Logger
wg sync.WaitGroup
telemetrySettings component.TelemetrySettings
hecWorker hecWorker
buildInfo component.BuildInfo
}

func (c *client) pushMetricsData(
Expand Down Expand Up @@ -615,6 +619,73 @@ func (c *client) stop(context.Context) error {
return nil
}

func (c *client) start(context.Context, component.Host) (err error) {
func (c *client) start(ctx context.Context, host component.Host) (err error) {

httpClient, err := buildHTTPClient(c.config, host, c.telemetrySettings)
if err != nil {
return err
}

if c.config.HecHealthCheckEnabled {
healthCheckURL, _ := c.config.getURL()
healthCheckURL.Path = c.config.HealthPath
if err := checkHecHealth(httpClient, healthCheckURL); err != nil {
return fmt.Errorf("health check failed: %w", err)
}
}
url, _ := c.config.getURL()
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo)}
return nil
}

func checkHecHealth(client *http.Client, healthCheckURL *url.URL) error {

req, err := http.NewRequest("GET", healthCheckURL.String(), nil)
if err != nil {
return consumererror.NewPermanent(err)
}

resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

err = splunk.HandleHTTPCode(resp)
if err != nil {
return err
}

return nil
}

func buildHTTPClient(config *Config, host component.Host, telemetrySettings component.TelemetrySettings) (*http.Client, error) {
// we handle compression explicitly.
config.HTTPClientSettings.Compression = ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't mutate the original config. We should make a copy instead. Can you please create an issue to follow up on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I think we do this in signalfx too. I'll make an issue. I guess those objects will eventually be immutable, is that right?

Copy link
Member

@dmitryax dmitryax Jan 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forcing them to be immutable would be pretty complicated, but as a rule we've been trying to not mutate the original user-supplied config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so would you agree a better fix is to validate the user doesn't set a compression algo then? That would take care of this, and might be more elegant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be better, but we also override MaxIdleConns and MaxIdleConnsPerHost

if config.MaxConnections != 0 && (config.MaxIdleConns == nil || config.HTTPClientSettings.MaxIdleConnsPerHost == nil) {
telemetrySettings.Logger.Warn("You are using the deprecated `max_connections` option that will be removed soon; use `max_idle_conns` and/or `max_idle_conns_per_host` instead: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/splunkhecexporter#advanced-configuration")
intMaxConns := int(config.MaxConnections)
if config.HTTPClientSettings.MaxIdleConns == nil {
config.HTTPClientSettings.MaxIdleConns = &intMaxConns
}
if config.HTTPClientSettings.MaxIdleConnsPerHost == nil {
config.HTTPClientSettings.MaxIdleConnsPerHost = &intMaxConns
}
}
return config.ToClient(host, telemetrySettings)
}

func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]string {
appVersion := config.SplunkAppVersion
if appVersion == "" {
appVersion = buildInfo.Version
}
return map[string]string{
"Connection": "keep-alive",
"Content-Type": "application/json",
"User-Agent": config.SplunkAppName + "/" + appVersion,
"Authorization": splunk.HECTokenHeader + " " + string(config.Token),
"__splunk_app_name": config.SplunkAppName,
"__splunk_app_version": config.SplunkAppVersion,
}
}
29 changes: 12 additions & 17 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -208,7 +209,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i
}

factory := NewFactory()
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.Token = "1234-1234"

rr := make(chan receivedRequest)
Expand Down Expand Up @@ -258,7 +259,7 @@ func runTraceExport(testConfig *Config, traces ptrace.Traces, expectedBatchesNum

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.DisableCompression = testConfig.DisableCompression
cfg.MaxContentLengthTraces = testConfig.MaxContentLengthTraces
cfg.Token = "1234-1234"
Expand Down Expand Up @@ -319,7 +320,7 @@ func runLogExport(cfg *Config, ld plog.Logs, expectedBatchesNum int, t *testing.
panic(err)
}

cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.Token = "1234-1234"

rr := make(chan receivedRequest)
Expand Down Expand Up @@ -881,7 +882,7 @@ func TestErrorReceived(t *testing.T) {

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
// Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces
// otherwise we will not see the error.
cfg.QueueSettings.Enabled = false
Expand Down Expand Up @@ -930,7 +931,7 @@ func TestInvalidURL(t *testing.T) {
cfg.QueueSettings.Enabled = false
// Disable retries to not wait too much time for the return error.
cfg.RetrySettings.Enabled = false
cfg.Endpoint = "ftp://example.com:134"
cfg.HTTPClientSettings.Endpoint = "ftp://example.com:134"
cfg.Token = "1234-1234"
params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
Expand All @@ -957,12 +958,6 @@ func TestInvalidJson(t *testing.T) {
assert.Error(t, err)
}

func TestStartAlwaysReturnsNil(t *testing.T) {
atoulme marked this conversation as resolved.
Show resolved Hide resolved
c := client{}
err := c.start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
}

func Test_pushLogData_nil_Logs(t *testing.T) {
tests := []struct {
name func(bool) string
Expand Down Expand Up @@ -1095,7 +1090,7 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {

// An HTTP client that returns status code 400 and response body responseBody.
httpClient, _ := newTestClient(400, responseBody)
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}
// Sending logs using the client.
err := splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
Expand All @@ -1106,7 +1101,7 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {

// An HTTP client that returns some other status code other than 400 and response body responseBody.
httpClient, _ = newTestClient(500, responseBody)
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
splunkClient.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}
// Sending logs using the client.
err = splunkClient.pushLogData(context.Background(), logs)
// TODO: Uncomment after consumererror.Logs implements method Unwrap.
Expand All @@ -1132,7 +1127,7 @@ func Test_pushLogData_ShouldReturnUnsentLogsOnly(t *testing.T) {

// The first record is to be sent successfully, the second one should not
httpClient, _ := newTestClientWithPresetResponses([]int{200, 400}, []string{"OK", "NOK"})
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}

err := c.pushLogData(context.Background(), logs)
require.Error(t, err)
Expand All @@ -1157,7 +1152,7 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) {
var headers *[]http.Header

httpClient, headers := newTestClient(200, "OK")
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}

// A 300-byte buffer only fits one record (around 200 bytes), so each record will be sent separately
c.config.MaxContentLengthLogs, c.config.DisableCompression = 300, true
Expand Down Expand Up @@ -1223,7 +1218,7 @@ func benchPushLogData(b *testing.B, numResources int, numProfiling int, numNonPr
}

httpClient, _ := newTestClient(200, "OK")
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config)}
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}

c.config.MaxContentLengthLogs = bufSize
logs := createLogDataWithCustomLibraries(numResources, []string{"otel.logs", "otel.profiling"}, []int{numNonProfiling, numProfiling})
Expand All @@ -1241,7 +1236,7 @@ func Test_pushLogData_Small_MaxContentLength(t *testing.T) {
c := client{
config: config,
logger: zaptest.NewLogger(t),
hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config)},
hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())},
}
c.config.MaxContentLengthLogs = 1

Expand Down
Loading