Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sarama (kafka client) to 1.17 #7665

Merged
merged 9 commits into from
Jul 24, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2030,12 +2030,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
Dependency: github.com/Shopify/sarama
Version: v1.16.0/enh/offset-replica-id
Revision: 32b4ad5c9537ed14e471779b76713ff65420db39
Version: v1.17.0/enh/offset-replica-id
Revision: d1575e4abe04acbbe8ac766320585cdf271dd189
License type (autodetected): MIT
./vendor/github.com/Shopify/sarama/LICENSE:
--------------------------------------------------------------------
Copyright (c) 2013 Evan Huus
Copyright (c) 2013 Shopify

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
Expand Down
4 changes: 4 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,10 @@ output.elasticsearch:
# default is gzip.
#compression: gzip

# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4

# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,10 @@ output.elasticsearch:
# default is gzip.
#compression: gzip

# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4

# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,10 @@ output.elasticsearch:
# default is gzip.
#compression: gzip

# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4

# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
Expand Down
4 changes: 4 additions & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ output.elasticsearch:
# default is gzip.
#compression: gzip

# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4

# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
Expand Down
14 changes: 12 additions & 2 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be

==== Compatibility

This output works with Kafka 0.8, 0.9, and 0.10.
This output works with all Kafka in between 0.11 and 1.1.1. Older versions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a breaking change? Should we add a changelog entry?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated changelog. We updated the client to 0.16 within the 6.4 dev-cycle, which did already mark the client update as breacking change.

might work as well, but are not supported.

==== Configuration options

Expand All @@ -691,7 +692,7 @@ Kafka version ${beatname_lc} is assumed to run against. Defaults to 1.0.0.

Event timestamps will be added, if version 0.10.0.0+ is enabled.

Valid values are all kafka releases in between `0.8.2.0` and `1.1.0`.
Valid values are all kafka releases in between `0.8.2.0` and `1.1.1`.

===== `username`

Expand Down Expand Up @@ -833,6 +834,15 @@ The keep-alive period for an active network connection. If 0s, keep-alives are d

Sets the output compression codec. Must be one of `none`, `snappy`, `lz4` and `gzip`. The default is `gzip`.

===== `compression_level`

Sets the compression level used by gzip. Setting this value to 0 disables compression.
The compression level must be in the range of 1 (best speed) to 9 (best compression).

Increasing the compression level will reduce the network usage but will increase the cpu usage.

The default value is 4.

[[kafka-max_message_bytes]]
===== `max_message_bytes`

Expand Down
70 changes: 40 additions & 30 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,26 @@ import (
)

type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
Key *fmtstr.EventFormatString `config:"key"`
Partition map[string]*common.Config `config:"partition"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
Version string `config:"version"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
Key *fmtstr.EventFormatString `config:"key"`
Partition map[string]*common.Config `config:"partition"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
CompressionLevel int `config:"compression_level"`
Version string `config:"version"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
}

type metaConfig struct {
Expand Down Expand Up @@ -89,17 +90,18 @@ func defaultConfig() kafkaConfig {
},
RefreshFreq: 10 * time.Minute,
},
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
Version: "1.0.0",
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
CompressionLevel: 4,
Version: "1.0.0",
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
}
}

Expand All @@ -120,6 +122,13 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("password must be set when username is configured")
}

if c.Compression == "gzip" {
lvl := c.CompressionLevel
if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) {
return fmt.Errorf("compression_level must be between 0 and 9")
}
}

return nil
}

Expand All @@ -138,6 +147,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Net.WriteTimeout = timeout
k.Net.KeepAlive = config.KeepAlive
k.Producer.Timeout = config.BrokerTimeout
k.Producer.CompressionLevel = config.CompressionLevel

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions libbeat/outputs/kafka/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (
v0_11_0_1 = parseKafkaVersion("0.11.0.1")
v0_11_0_2 = parseKafkaVersion("0.11.0.2")
v1_0_1 = parseKafkaVersion("1.0.1")
v1_1_0 = parseKafkaVersion("1.1.0")
v1_0_2 = parseKafkaVersion("1.0.2")
v1_1_1 = parseKafkaVersion("1.1.1")

kafkaVersions = map[string]sarama.KafkaVersion{
"": sarama.V1_0_0_0,
Expand Down Expand Up @@ -61,10 +62,12 @@ var (

"1.0.0": sarama.V1_0_0_0,
"1.0.1": v1_0_1,
"1.0": v1_0_1,
"1.1.0": v1_1_0,
"1.1": v1_1_0,
"1": v1_1_0,
"1.0.2": v1_0_2,
"1.0": v1_0_2,
"1.1.0": sarama.V1_1_0_0,
"1.1.1": v1_1_1,
"1.1": v1_1_1,
"1": v1_1_1,
}
)

Expand Down
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,10 @@ output.elasticsearch:
# default is gzip.
#compression: gzip

# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4

# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
Expand Down
4 changes: 4 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,10 @@ output.elasticsearch:
# default is gzip.
#compression: gzip

# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4

# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
Expand Down
38 changes: 38 additions & 0 deletions vendor/github.com/Shopify/sarama/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/Shopify/sarama/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions vendor/github.com/Shopify/sarama/Makefile

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/Shopify/sarama/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading