From d935fc8200d61143ae5e720ea6a3dbf4159156e8 Mon Sep 17 00:00:00 2001 From: Cody Kaczynski Date: Tue, 3 Sep 2024 22:35:23 -0400 Subject: [PATCH] [receiver/kafkareceiver]: allow tunable fetch sizes (#34431) **Description:** This commit adds the ability to tune the minimum, default and maximum fetch sizes for the Kafka Receiver in the OpenTelemetry configuration file. The defaults are kept consistent with the defaults imposed by [sarama](https://pkg.go.dev/github.com/shopify/sarama#Config). **Link to tracking Issue:** Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22741 **Testing:** Built the image with `make docker-otelcontribcol`, uploaded to [Docker Hub](https://hub.docker.com/r/ckaczynski/otelcontribcol) to pull the image into our Kubernetes deployment, and hit the cluster with 200k logs per second with the following settings: ``` default_fetch_size: 15728640 # 15MB max_fetch_size: 31457280 # 30MB min_fetch_size: 1048576 # 1MB ``` These limits are absurdly high for no reason other than that I wanted to set them higher and see how far I could push them in our cluster. 
No new test functions were added, but assertions for the new options were added to the existing configuration tests in config_test.go and factory_test.go **Documentation:** Added the new configuration options to the receiver's README --------- Co-authored-by: Israel Blancas Co-authored-by: Sean Marciniak <30928402+MovieStoreGuy@users.noreply.github.com> --- ...feat_kafkareceiver-fetch-configurable.yaml | 27 +++++++++++++++++++ receiver/kafkareceiver/README.md | 3 +++ receiver/kafkareceiver/config.go | 7 +++++ receiver/kafkareceiver/config_test.go | 6 +++++ receiver/kafkareceiver/factory.go | 10 +++++++ receiver/kafkareceiver/factory_test.go | 3 +++ receiver/kafkareceiver/kafka_receiver.go | 21 +++++++++++++++ 7 files changed, 77 insertions(+) create mode 100644 .chloggen/feat_kafkareceiver-fetch-configurable.yaml diff --git a/.chloggen/feat_kafkareceiver-fetch-configurable.yaml b/.chloggen/feat_kafkareceiver-fetch-configurable.yaml new file mode 100644 index 0000000000000..831c10a113317 --- /dev/null +++ b/.chloggen/feat_kafkareceiver-fetch-configurable.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds tunable fetch sizes to Kafka Receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [22741, 34431] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: Adds the ability to tune the minimum, default and maximum fetch sizes for the Kafka Receiver + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 84dc5a8dcc9e5..495fb3b422ae9 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -45,6 +45,9 @@ The following settings can be optionally configured: - `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`. - `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities. - `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. +- `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte. +- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB. +- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited. - `auth` - `plain_text` - `username`: The username to use. 
diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 85d72ed3b4291..5b25c7c7c5f30 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -78,6 +78,13 @@ type Config struct { // Extract headers from kafka records HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"` + + // The minimum bytes per fetch from Kafka (default "1") + MinFetchSize int32 `mapstructure:"min_fetch_size"` + // The default bytes per fetch from Kafka (default "1048576") + DefaultFetchSize int32 `mapstructure:"default_fetch_size"` + // The maximum bytes per fetch from Kafka (default "0", no limit) + MaxFetchSize int32 `mapstructure:"max_fetch_size"` } const ( diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index a8fea4b1267f0..2eb643b4275e0 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -62,6 +62,9 @@ func TestLoadConfig(t *testing.T) { Enable: true, Interval: 1 * time.Second, }, + MinFetchSize: 1, + DefaultFetchSize: 1048576, + MaxFetchSize: 0, }, }, { @@ -96,6 +99,9 @@ func TestLoadConfig(t *testing.T) { Enable: true, Interval: 1 * time.Second, }, + MinFetchSize: 1, + DefaultFetchSize: 1048576, + MaxFetchSize: 0, }, }, } diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index ef5cd96107020..a6fe7ace9f743 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -40,6 +40,13 @@ const ( defaultAutoCommitEnable = true // default from sarama.NewConfig() defaultAutoCommitInterval = 1 * time.Second + + // default from sarama.NewConfig() + defaultMinFetchSize = int32(1) + // default from sarama.NewConfig() + defaultDefaultFetchSize = int32(1048576) + // default from sarama.NewConfig() + defaultMaxFetchSize = int32(0) ) var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") @@ -120,6 +127,9 @@ func createDefaultConfig() component.Config { HeaderExtraction: 
HeaderExtraction{ ExtractHeaders: false, }, + MinFetchSize: defaultMinFetchSize, + DefaultFetchSize: defaultDefaultFetchSize, + MaxFetchSize: defaultMaxFetchSize, } } diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index e386967abd7df..87c435edba2a3 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -28,6 +28,9 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, defaultInitialOffset, cfg.InitialOffset) assert.Equal(t, defaultSessionTimeout, cfg.SessionTimeout) assert.Equal(t, defaultHeartbeatInterval, cfg.HeartbeatInterval) + assert.Equal(t, defaultMinFetchSize, cfg.MinFetchSize) + assert.Equal(t, defaultDefaultFetchSize, cfg.DefaultFetchSize) + assert.Equal(t, defaultMaxFetchSize, cfg.MaxFetchSize) } func TestCreateTracesReceiver(t *testing.T) { diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 7a322e14d3436..d4554b5437f50 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -49,6 +49,9 @@ type kafkaTracesConsumer struct { messageMarking MessageMarking headerExtraction bool headers []string + minFetchSize int32 + defaultFetchSize int32 + maxFetchSize int32 } // kafkaMetricsConsumer uses sarama to consume and handle messages from kafka. @@ -67,6 +70,9 @@ type kafkaMetricsConsumer struct { messageMarking MessageMarking headerExtraction bool headers []string + minFetchSize int32 + defaultFetchSize int32 + maxFetchSize int32 } // kafkaLogsConsumer uses sarama to consume and handle messages from kafka. 
@@ -85,6 +91,9 @@ type kafkaLogsConsumer struct { messageMarking MessageMarking headerExtraction bool headers []string + minFetchSize int32 + defaultFetchSize int32 + maxFetchSize int32 } var _ receiver.Traces = (*kafkaTracesConsumer)(nil) @@ -112,6 +121,9 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, telemetryBuilder: telemetryBuilder, + minFetchSize: config.MinFetchSize, + defaultFetchSize: config.DefaultFetchSize, + maxFetchSize: config.MaxFetchSize, }, nil } @@ -125,6 +137,9 @@ func createKafkaClient(config Config) (sarama.ConsumerGroup, error) { saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval + saramaConfig.Consumer.Fetch.Min = config.MinFetchSize + saramaConfig.Consumer.Fetch.Default = config.DefaultFetchSize + saramaConfig.Consumer.Fetch.Max = config.MaxFetchSize var err error if saramaConfig.Consumer.Offsets.Initial, err = toSaramaInitialOffset(config.InitialOffset); err != nil { @@ -235,6 +250,9 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, telemetryBuilder: telemetryBuilder, + minFetchSize: config.MinFetchSize, + defaultFetchSize: config.DefaultFetchSize, + maxFetchSize: config.MaxFetchSize, }, nil } @@ -329,6 +347,9 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar headerExtraction: config.HeaderExtraction.ExtractHeaders, headers: config.HeaderExtraction.Headers, telemetryBuilder: telemetryBuilder, + minFetchSize: config.MinFetchSize, + defaultFetchSize: config.DefaultFetchSize, + maxFetchSize: config.MaxFetchSize, }, nil }