add zstd compression type (#112)
* add zstd compression type

Co-authored-by: Rob Bavey <[email protected]>
Co-authored-by: Karen Metts <[email protected]>
3 people authored Feb 2, 2022
1 parent cc2cbb7 commit e5bf126
Showing 7 changed files with 31 additions and 7 deletions.
CHANGELOG.md (4 additions, 0 deletions)
@@ -1,3 +1,7 @@
+## 10.10.0
+
+- Added config setting to enable 'zstd' compression in the Kafka output [#112](https://github.com/logstash-plugins/logstash-integration-kafka/pull/112)
+
 ## 10.9.0
 - Refactor: leverage codec when using schema registry [#106](https://github.com/logstash-plugins/logstash-integration-kafka/pull/106)
DEVELOPER.md (1 addition, 1 deletion)
@@ -62,7 +62,7 @@ See http://kafka.apache.org/documentation.html#producerconfigs for details about
     kafka {
         topic_id => ... # string (required), The topic to produce the messages to
         broker_list => ... # string (optional), default: "localhost:9092", This is for bootstrapping and the producer will only use it for getting metadata
-        compression_codec => ... # string (optional), one of ["none", "gzip", "snappy"], default: "none"
+        compression_codec => ... # string (optional), one of ["none", "gzip", "snappy", "lz4", "zstd"], default: "none"
         compressed_topics => ... # string (optional), default: "", This parameter allows you to set whether compression should be turned on for particular
         request_required_acks => ... # number (optional), one of [-1, 0, 1], default: 0, This value controls when a produce request is considered completed
         serializer_class => ... # string, (optional) default: "kafka.serializer.StringEncoder", The serializer class for messages. The default encoder takes a byte[] and returns the same byte[]
docs/output-kafka.asciidoc (3 additions, 3 deletions)
@@ -84,7 +84,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more details
 | <<plugins-{type}s-{plugin}-buffer_memory>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-client_dns_lookup>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-client_id>> |<<string,string>>|No
-| <<plugins-{type}s-{plugin}-compression_type>> |<<string,string>>, one of `["none", "gzip", "snappy", "lz4"]`|No
+| <<plugins-{type}s-{plugin}-compression_type>> |<<string,string>>, one of `["none", "gzip", "snappy", "lz4", "zstd"]`|No
 | <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
 | <<plugins-{type}s-{plugin}-kerberos_config>> |a valid filesystem path|No
 | <<plugins-{type}s-{plugin}-key_serializer>> |<<string,string>>|No
@@ -193,11 +193,11 @@ ip/port by allowing a logical application name to be included with the request
 [id="plugins-{type}s-{plugin}-compression_type"]
 ===== `compression_type`
 
-* Value can be any of: `none`, `gzip`, `snappy`, `lz4`
+* Value can be any of: `none`, `gzip`, `snappy`, `lz4`, `zstd`
 * Default value is `"none"`
 
 The compression type for all data generated by the producer.
-The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4.
+The default is none (meaning no compression). Valid values are none, gzip, snappy, lz4, or zstd.
 
 [id="plugins-{type}s-{plugin}-jaas_path"]
 ===== `jaas_path`
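For context, a minimal sketch of a pipeline using the new value (the broker address and topic name below are placeholders, not taken from this commit):

output {
  kafka {
    bootstrap_servers => "localhost:9092"  # placeholder broker address
    topic_id => "my_topic"                 # placeholder topic name
    compression_type => "zstd"             # new value added by this commit
  }
}

As with the other codecs, the setting corresponds to the Kafka producer's `compression.type` property.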
kafka_test_setup.sh (1 addition, 0 deletions)
@@ -46,6 +46,7 @@ build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 -
 build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_gzip_topic --zookeeper localhost:2181
 build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_snappy_topic --zookeeper localhost:2181
 build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_lz4_topic --zookeeper localhost:2181
+build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_zstd_topic --zookeeper localhost:2181
 build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_partitioner_topic --zookeeper localhost:2181
 curl -s -o build/apache_logs.txt https://s3.amazonaws.com/data.elasticsearch.org/apache_logs/apache_logs.txt
 cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_plain --broker-list localhost:9092
lib/logstash/outputs/kafka.rb (2 additions, 2 deletions)
@@ -80,8 +80,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   # The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
   config :buffer_memory, :validate => :number, :default => 33_554_432 # (32M) Kafka default
   # The compression type for all data generated by the producer.
-  # The default is none (i.e. no compression). Valid values are none, gzip, or snappy.
-  config :compression_type, :validate => ["none", "gzip", "snappy", "lz4"], :default => "none"
+  # The default is none (i.e. no compression). Valid values are none, gzip, snappy, lz4 or zstd.
+  config :compression_type, :validate => ["none", "gzip", "snappy", "lz4", "zstd"], :default => "none"
   # How DNS lookups should be done. If set to `use_all_dns_ips`, when the lookup returns multiple
   # IP addresses for a hostname, they will all be attempted to connect to before failing the
   # connection. If the value is `resolve_canonical_bootstrap_servers_only` each entry will be
logstash-integration-kafka.gemspec (1 addition, 1 deletion)
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-integration-kafka'
-  s.version = '10.9.0'
+  s.version = '10.10.0'
   s.licenses = ['Apache-2.0']
   s.summary = "Integration with Kafka - input and output plugins"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
spec/integration/outputs/kafka_spec.rb (19 additions, 0 deletions)
@@ -139,6 +139,25 @@
     # end
   end
 
+  context 'when using zstd compression' do
+    let(:test_topic) { 'logstash_integration_zstd_topic' }
+
+    before :each do
+      config = base_config.merge({"topic_id" => test_topic, "compression_type" => "zstd"})
+      load_kafka_data(config)
+    end
+
+    # NOTE: depends on zstd-ruby gem which is using a C-extension
+    # it 'should have data integrity' do
+    #   messages = fetch_messages(test_topic)
+    #
+    #   expect(messages.size).to eq(num_events)
+    #   messages.each do |m|
+    #     expect(m.value).to eq(event.to_s)
+    #   end
+    # end
+  end
+
   context 'when using multi partition topic' do
     let(:num_events) { 100 } # ~ more than (batch.size) 16,384 bytes
     let(:test_topic) { 'logstash_integration_topic3' }
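The data-integrity check ships commented out; per the NOTE in the diff, running it would require the zstd-ruby gem (a C-extension) on the consumer side. Uncommented for readability, it would read as below, assuming the fetch_messages, num_events, and event helpers defined elsewhere in this spec file:

  # Same assertions as the gzip/snappy/lz4 contexts: every produced event
  # must come back from the zstd-compressed topic byte-for-byte intact.
  it 'should have data integrity' do
    messages = fetch_messages(test_topic)

    expect(messages.size).to eq(num_events)
    messages.each do |m|
      expect(m.value).to eq(event.to_s)
    end
  end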
