Updates Kafka client from 2.8.1 to 3.3.1 (#130)
Kafka 3.3.1 corresponds to Confluent Platform 7.3.0.
This PR updates the client library and, due to the removal of the ZooKeeper flag when launching the Kafka instance used in integration tests, also updates the bash scripts used in integration testing.
The Schema Registry update forces a dependency on Logstash >= 8.3, the first release to ship Jackson 2.13.3, which is strictly required when instantiating the client.

Co-authored-by: Karen Metts <[email protected]>
andsel and karenzone authored Dec 16, 2022
1 parent 1322ae5 commit a388a3f
Showing 16 changed files with 104 additions and 47 deletions.
6 changes: 3 additions & 3 deletions .ci/run.sh
Original file line number Diff line number Diff line change
@@ -5,10 +5,10 @@ env

set -ex

export KAFKA_VERSION=2.8.1
export KAFKA_VERSION=3.3.1
./kafka_test_setup.sh

jruby -rbundler/setup -S rspec -fd
jruby -rbundler/setup -S rspec -fd --tag integration
bundle exec rspec -fd
bundle exec rspec -fd --tag integration

./kafka_test_teardown.sh
9 changes: 8 additions & 1 deletion .travis.yml
@@ -1,2 +1,9 @@
import:
- logstash-plugins/.ci:travis/[email protected]
- logstash-plugins/.ci:travis/[email protected]
- logstash-plugins/.ci:travis/[email protected]

env:
jobs:
# lock on version 8.x because of the use of Jackson 2.13.3, available since Logstash 8.3.0
- ELASTIC_STACK_VERSION=8.x DOCKER_ENV=dockerjdk17.env
- SNAPSHOT=true ELASTIC_STACK_VERSION=8.x DOCKER_ENV=dockerjdk17.env
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## Unreleased
- Changed Kafka client to 3.3.1, requires Logstash >= 8.3.0.
- Deprecated the `default` value for the `client_dns_lookup` setting, forcing it to `use_all_dns_ips` when explicitly used.
[#130](https://github.com/logstash-plugins/logstash-integration-kafka/pull/130)

## 10.12.0
- bump kafka client to 2.8.1 [#115](https://github.com/logstash-plugins/logstash-integration-kafka/pull/115)

23 changes: 13 additions & 10 deletions build.gradle
@@ -27,6 +27,11 @@ group "org.logstash.integrations"

sourceCompatibility = JavaVersion.VERSION_1_8

// given https://docs.confluent.io/current/installation/versions-interoperability.html matrix
// Confluent Platform 7.3.x is Apache Kafka 3.3.x
String confluentKafkaVersion = '7.3.0'
String apacheKafkaVersion = '3.3.1'

buildscript {
repositories {
mavenCentral()
@@ -42,31 +42,29 @@ repositories {
}

dependencies {
implementation('io.confluent:kafka-avro-serializer:6.2.2') {
implementation("io.confluent:kafka-avro-serializer:${confluentKafkaVersion}") {
exclude group: 'org.apache.kafka', module:'kafka-clients'
}
implementation('io.confluent:kafka-schema-serializer:6.2.2') {
implementation("io.confluent:kafka-schema-serializer:${confluentKafkaVersion}") {
exclude group: 'org.apache.kafka', module:'kafka-clients'
}
implementation 'io.confluent:common-config:6.2.2'
implementation "io.confluent:common-config:${confluentKafkaVersion}"
implementation 'org.apache.avro:avro:1.11.0'
implementation('io.confluent:kafka-schema-registry-client:6.2.2') {
implementation("io.confluent:kafka-schema-registry-client:${confluentKafkaVersion}") {
exclude group: 'org.apache.kafka', module:'kafka-clients'
}
implementation('org.apache.kafka:kafka_2.12:2.8.1') {
implementation("org.apache.kafka:kafka_2.12:${apacheKafkaVersion}") {
// contains kafka.utils.VerifiableProperties used by kafka-schema-registry-client, explicitly exclude
// transitive dependencies
transitive = false
}
implementation 'io.confluent:common-utils:6.2.2'
implementation "io.confluent:common-utils:${confluentKafkaVersion}"
implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1'
implementation 'org.glassfish.jersey.core:jersey-common:2.33'
// given https://docs.confluent.io/current/installation/versions-interoperability.html matrix
// Confluent Platform 6.2.x is Apache Kafka 2.8.x
implementation 'org.apache.kafka:kafka-clients:2.8.1'
implementation "org.apache.kafka:kafka-clients:${apacheKafkaVersion}"
implementation 'com.github.luben:zstd-jni:1.5.2-2'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'org.lz4:lz4-java:1.7.1'
implementation 'org.lz4:lz4-java:1.8.0'
implementation 'org.xerial.snappy:snappy-java:1.1.8.4'
}
task generateGemJarRequiresFile {
2 changes: 1 addition & 1 deletion docs/index.asciidoc
@@ -1,7 +1,7 @@
:plugin: kafka
:type: integration
:no_codec:
:kafka_client: 2.8.1
:kafka_client: 3.3.1

///////////////////////////////////////////
START - GENERATED VARIABLES, DO NOT EDIT!
10 changes: 8 additions & 2 deletions docs/input-kafka.asciidoc
@@ -2,8 +2,8 @@
:plugin: kafka
:type: input
:default_codec: plain
:kafka_client: 2.8
:kafka_client_doc: 25
:kafka_client: 3.3
:kafka_client_doc: 33

///////////////////////////////////////////
START - GENERATED VARIABLES, DO NOT EDIT!
@@ -211,6 +211,12 @@ IP addresses for a hostname, they will all be attempted to connect to before failing the
connection. If the value is `resolve_canonical_bootstrap_servers_only` each entry will be
resolved and expanded into a list of canonical names.

[NOTE]
====
Starting from Kafka 3, the `default` value for `client.dns.lookup` has been removed.
If explicitly configured, it falls back to `use_all_dns_ips`.
====
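Purely as an illustration, a minimal input configuration that would trigger this fallback might look like the following (the broker address and topic name here are hypothetical):

```
input {
  kafka {
    bootstrap_servers => "localhost:9092"   # hypothetical broker
    topics => ["example_topic"]             # hypothetical topic
    client_dns_lookup => "default"          # deprecated: coerced to use_all_dns_ips at register time
  }
}
```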

[id="plugins-{type}s-{plugin}-client_id"]
===== `client_id`

10 changes: 8 additions & 2 deletions docs/output-kafka.asciidoc
@@ -2,8 +2,8 @@
:plugin: kafka
:type: output
:default_codec: plain
:kafka_client: 2.8
:kafka_client_doc: 25
:kafka_client: 3.3
:kafka_client_doc: 33

///////////////////////////////////////////
START - GENERATED VARIABLES, DO NOT EDIT!
@@ -181,6 +181,12 @@ all IP addresses returned for a hostname before failing the connection.
If set to `resolve_canonical_bootstrap_servers_only`, each entry will be
resolved and expanded into a list of canonical names.

[NOTE]
====
Starting from Kafka 3, the `default` value for `client.dns.lookup` has been removed.
If explicitly configured, it falls back to `use_all_dns_ips`.
====

[id="plugins-{type}s-{plugin}-client_id"]
===== `client_id`

28 changes: 14 additions & 14 deletions kafka_test_setup.sh
@@ -5,7 +5,7 @@ set -ex
if [ -n "${KAFKA_VERSION+1}" ]; then
echo "KAFKA_VERSION is $KAFKA_VERSION"
else
KAFKA_VERSION=2.8.1
KAFKA_VERSION=3.3.1
fi

export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"
@@ -35,19 +35,19 @@ cp spec/fixtures/jaas.config build/confluent_platform/etc/schema-registry
cp spec/fixtures/pwd build/confluent_platform/etc/schema-registry

echo "Setting up test topics with test data"
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain_with_headers --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain_with_headers_badly --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_snappy --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_lz4 --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_topic1 --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic logstash_integration_topic2 --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic3 --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_gzip_topic --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_snappy_topic --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_lz4_topic --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_zstd_topic --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_partitioner_topic --zookeeper localhost:2181
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain_with_headers --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain_with_headers_badly --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_snappy --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_lz4 --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_topic1 --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic logstash_integration_topic2 --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic3 --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_gzip_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_snappy_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_lz4_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic logstash_integration_zstd_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_partitioner_topic --bootstrap-server localhost:9092
curl -s -o build/apache_logs.txt https://s3.amazonaws.com/data.elasticsearch.org/apache_logs/apache_logs.txt
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_plain --broker-list localhost:9092
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --broker-list localhost:9092 --compression-codec snappy
4 changes: 2 additions & 2 deletions kafka_test_teardown.sh
@@ -3,8 +3,8 @@
set -ex

echo "Unregistering test topics"
build/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'logstash_integration_.*'
build/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'topic_avro.*'
build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'logstash_integration_.*'
build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'topic_avro.*'

echo "Stopping Kafka broker"
build/kafka/bin/kafka-server-stop.sh
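The same `--zookeeper` → `--bootstrap-server` migration applies to any remaining helper scripts. As a rough sketch (the address mapping is an assumption specific to these test scripts, where ZooKeeper was on `localhost:2181` and the broker listens on `localhost:9092`), a legacy invocation can be rewritten mechanically:

```ruby
# Hedged sketch: rewrite a legacy kafka-topics.sh call from the removed
# --zookeeper flag to --bootstrap-server, assuming the localhost ports
# used throughout these integration scripts.
def migrate_kafka_topics_cmd(cmd)
  cmd.gsub("--zookeeper localhost:2181", "--bootstrap-server localhost:9092")
end

puts migrate_kafka_topics_cmd(
  "build/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'topic_avro.*'"
)
# => build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'topic_avro.*'
```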
16 changes: 8 additions & 8 deletions lib/logstash-integration-kafka_jars.rb
@@ -1,17 +1,17 @@
# AUTOGENERATED BY THE GRADLE SCRIPT. DO NOT EDIT.

require 'jar_dependencies'
require_jar('io.confluent', 'kafka-avro-serializer', '6.2.2')
require_jar('io.confluent', 'kafka-schema-serializer', '6.2.2')
require_jar('io.confluent', 'common-config', '6.2.2')
require_jar('io.confluent', 'kafka-avro-serializer', '7.3.0')
require_jar('io.confluent', 'kafka-schema-serializer', '7.3.0')
require_jar('io.confluent', 'common-config', '7.3.0')
require_jar('org.apache.avro', 'avro', '1.11.0')
require_jar('io.confluent', 'kafka-schema-registry-client', '6.2.2')
require_jar('org.apache.kafka', 'kafka_2.12', '2.8.1')
require_jar('io.confluent', 'common-utils', '6.2.2')
require_jar('io.confluent', 'kafka-schema-registry-client', '7.3.0')
require_jar('org.apache.kafka', 'kafka_2.12', '3.3.1')
require_jar('io.confluent', 'common-utils', '7.3.0')
require_jar('javax.ws.rs', 'javax.ws.rs-api', '2.1.1')
require_jar('org.glassfish.jersey.core', 'jersey-common', '2.33')
require_jar('org.apache.kafka', 'kafka-clients', '2.8.1')
require_jar('org.apache.kafka', 'kafka-clients', '3.3.1')
require_jar('com.github.luben', 'zstd-jni', '1.5.2-2')
require_jar('org.slf4j', 'slf4j-api', '1.7.36')
require_jar('org.lz4', 'lz4-java', '1.7.1')
require_jar('org.lz4', 'lz4-java', '1.8.0')
require_jar('org.xerial.snappy', 'snappy-java', '1.1.8.4')
4 changes: 3 additions & 1 deletion lib/logstash/inputs/kafka.rb
@@ -92,7 +92,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# IP addresses for a hostname, they will all be attempted to connect to before failing the
# connection. If the value is `resolve_canonical_bootstrap_servers_only` each entry will be
# resolved and expanded into a list of canonical names.
config :client_dns_lookup, :validate => ["default", "use_all_dns_ips", "resolve_canonical_bootstrap_servers_only"], :default => "default"
# Starting from Kafka 3, the `default` value for `client.dns.lookup` has been removed. If explicitly configured, it falls back to `use_all_dns_ips`.
config :client_dns_lookup, :validate => ["default", "use_all_dns_ips", "resolve_canonical_bootstrap_servers_only"], :default => "use_all_dns_ips"
# The id string to pass to the server when making requests. The purpose of this
# is to be able to track the source of requests beyond just ip/port by allowing
# a logical application name to be included.
@@ -257,6 +258,7 @@ def initialize(params = {})
def register
@runner_threads = []
@metadata_mode = extract_metadata_level(@decorate_events)
reassign_dns_lookup
@pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
check_schema_registry_parameters
end
4 changes: 3 additions & 1 deletion lib/logstash/outputs/kafka.rb
@@ -86,7 +86,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
# IP addresses for a hostname, they will all be attempted to connect to before failing the
# connection. If the value is `resolve_canonical_bootstrap_servers_only` each entry will be
# resolved and expanded into a list of canonical names.
config :client_dns_lookup, :validate => ["default", "use_all_dns_ips", "resolve_canonical_bootstrap_servers_only"], :default => "default"
# Starting from Kafka 3, the `default` value for `client.dns.lookup` has been removed. If explicitly configured, it falls back to `use_all_dns_ips`.
config :client_dns_lookup, :validate => ["default", "use_all_dns_ips", "resolve_canonical_bootstrap_servers_only"], :default => "use_all_dns_ips"
# The id string to pass to the server when making requests.
# The purpose of this is to be able to track the source of requests beyond just
# ip/port by allowing a logical application name to be included with the request
@@ -190,6 +191,7 @@ def register
logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
end

reassign_dns_lookup

@producer = create_producer
if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
8 changes: 8 additions & 0 deletions lib/logstash/plugin_mixins/kafka/common.rb
@@ -43,5 +43,13 @@ def set_sasl_config(props)
props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
end

def reassign_dns_lookup
if @client_dns_lookup == "default"
@client_dns_lookup = "use_all_dns_ips"
logger.warn("client_dns_lookup setting 'default' value is deprecated, forced to 'use_all_dns_ips', please update your configuration")
deprecation_logger.deprecated("Deprecated value `default` for `client_dns_lookup` option; use `use_all_dns_ips` instead.")
end
end

end
end end end
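Extracted from its plugin context, the fallback in `reassign_dns_lookup` above can be sketched as a standalone snippet (the class name is a stand-in for the plugin, and plain `warn` replaces the mixin's `logger`/`deprecation_logger`):

```ruby
# Minimal sketch of the deprecation fallback, assuming only plain Ruby:
# a `default` value is silently rewritten to `use_all_dns_ips`, any other
# accepted value passes through unchanged.
class DnsLookupConfig
  attr_reader :client_dns_lookup

  def initialize(client_dns_lookup)
    @client_dns_lookup = client_dns_lookup
  end

  def reassign_dns_lookup
    if @client_dns_lookup == "default"
      @client_dns_lookup = "use_all_dns_ips"
      warn("client_dns_lookup setting 'default' value is deprecated, forced to 'use_all_dns_ips'")
    end
  end
end

deprecated = DnsLookupConfig.new("default")
deprecated.reassign_dns_lookup
puts deprecated.client_dns_lookup # => use_all_dns_ips

explicit = DnsLookupConfig.new("resolve_canonical_bootstrap_servers_only")
explicit.reassign_dns_lookup
puts explicit.client_dns_lookup # => resolve_canonical_bootstrap_servers_only
```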
4 changes: 2 additions & 2 deletions logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '10.12.0'
s.version = '11.0.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
@@ -41,7 +41,7 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency "logstash-core", ">= 6.5.0"
s.add_runtime_dependency "logstash-core", ">= 8.3.0"

s.add_runtime_dependency 'logstash-codec-json'
s.add_runtime_dependency 'logstash-codec-plain'
10 changes: 10 additions & 0 deletions spec/unit/inputs/kafka_spec.rb
@@ -83,6 +83,16 @@
it "should register" do
expect { subject.register }.to_not raise_error
end

context "when the deprecated `default` is specified" do
let(:config) { common_config.merge('client_dns_lookup' => 'default') }

it 'should fallback `client_dns_lookup` to `use_all_dns_ips`' do
subject.register

expect(subject.client_dns_lookup).to eq('use_all_dns_ips')
end
end
end

describe '#running' do
8 changes: 8 additions & 0 deletions spec/unit/outputs/kafka_spec.rb
@@ -22,6 +22,14 @@
expect(kafka.topic_id).to eql 'test'
expect(kafka.key_serializer).to eql 'org.apache.kafka.common.serialization.StringSerializer'
end

it 'should fallback `client_dns_lookup` to `use_all_dns_ips` when the deprecated `default` is specified' do
simple_kafka_config["client_dns_lookup"] = 'default'
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
kafka.register

expect(kafka.client_dns_lookup).to eq('use_all_dns_ips')
end
end

context 'when outputting messages' do