Skip to content

Commit

Permalink
Adds schema registry's truststore and keystore settings (#137)
Browse files Browse the repository at this point in the history
This commit mainly exposes location password and type settings for schema registry's secret and key stores.
It brings those configuration options, if available, and directly forward down to the Kafka's SerDes library and Manticore client.
Introduces a script named setup_keystore_and_truststore.sh to setup keystore and truststore used in integration tests.
Furthermore it reworks a little bit the bash scripts to setup Kafka and Schema Registry in integration test to avoid download of artifacts if they are already locally downloaded.

Co-authored-by: João Duarte <[email protected]>
Co-authored-by: Mashhur <[email protected]>
  • Loading branch information
3 people authored Feb 14, 2023
1 parent 0462434 commit 91bc9ce
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ build/
.idea/
vendor
*.jar
tls_repository/
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.2.0
- Added TLS truststore and keystore settings specifically to access the schema registry [#137](https://github.com/logstash-plugins/logstash-integration-kafka/pull/137)

## 11.1.0
- Added config `group_instance_id` to use the Kafka's consumer static membership feature [#135](https://github.com/logstash-plugins/logstash-integration-kafka/pull/135)

Expand Down
54 changes: 54 additions & 0 deletions docs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-schema_registry_key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schema_registry_proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-schema_registry_secret>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schema_registry_ssl_keystore_location>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-schema_registry_ssl_keystore_password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-schema_registry_ssl_keystore_type>> |<<string,string>>, one of `["jks", "PKCS12"]`|No
| <<plugins-{type}s-{plugin}-schema_registry_ssl_truststore_location>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-schema_registry_ssl_truststore_password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-schema_registry_ssl_truststore_type>> |<<string,string>>, one of `["jks", "PKCS12"]`|No
| <<plugins-{type}s-{plugin}-schema_registry_url>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-schema_registry_validation>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-security_protocol>> |<<string,string>>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No
Expand Down Expand Up @@ -598,6 +604,54 @@ Set the address of a forward HTTP proxy. An empty string is treated as if proxy

Set the password for basic authorization to access remote Schema Registry.

[id="plugins-{type}s-{plugin}-schema_registry_ssl_keystore_location"]
===== `schema_registry_ssl_keystore_location`

* Value type is <<path,path>>
* There is no default value for this setting.

If schema registry client authentication is required, this setting stores the keystore path.

[id="plugins-{type}s-{plugin}-schema_registry_ssl_keystore_password"]
===== `schema_registry_ssl_keystore_password`

* Value type is <<password,password>>
* There is no default value for this setting.

If schema registry authentication is required, this setting stores the keystore password.

[id="plugins-{type}s-{plugin}-schema_registry_ssl_keystore_type"]
===== `schema_registry_ssl_keystore_type`

* Value type is <<string,string>>
* There is no default value for this setting.

The format of the keystore file. It must be either `jks` or `PKCS12`.

[id="plugins-{type}s-{plugin}-schema_registry_ssl_truststore_location"]
===== `schema_registry_ssl_truststore_location`

* Value type is <<path,path>>
* There is no default value for this setting.

The truststore path to validate the schema registry's certificate.

[id="plugins-{type}s-{plugin}-schema_registry_ssl_truststore_password"]
===== `schema_registry_ssl_truststore_password`

* Value type is <<password,password>>
* There is no default value for this setting.

The schema registry truststore password.

[id="plugins-{type}s-{plugin}-schema_registry_ssl_truststore_type"]
===== `schema_registry_ssl_truststore_type`

* Value type is <<string,string>>
* There is no default value for this setting.

The format of the schema registry's truststore file. It must be either `jks` or `PKCS12`.

[id="plugins-{type}s-{plugin}-schema_registry_url"]
===== `schema_registry_url`

Expand Down
40 changes: 36 additions & 4 deletions kafka_test_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Setup Kafka and create test topics

set -ex
# check if KAFKA_VERSION env var is set
if [ -n "${KAFKA_VERSION+1}" ]; then
echo "KAFKA_VERSION is $KAFKA_VERSION"
else
Expand All @@ -13,8 +14,12 @@ export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"
rm -rf build
mkdir build

echo "Downloading Kafka version $KAFKA_VERSION"
curl -s -o build/kafka.tgz "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.12-$KAFKA_VERSION.tgz"
echo "Setup Kafka version $KAFKA_VERSION"
if [ ! -e "kafka_2.12-$KAFKA_VERSION.tgz" ]; then
echo "Kafka not present locally, downloading"
curl -s -o "kafka_2.12-$KAFKA_VERSION.tgz" "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.12-$KAFKA_VERSION.tgz"
fi
cp kafka_2.12-$KAFKA_VERSION.tgz build/kafka.tgz
mkdir build/kafka && tar xzf build/kafka.tgz -C build/kafka --strip-components 1

echo "Starting ZooKeeper"
Expand All @@ -24,9 +29,36 @@ echo "Starting Kafka broker"
build/kafka/bin/kafka-server-start.sh -daemon build/kafka/config/server.properties --override advertised.host.name=127.0.0.1 --override log.dirs="${PWD}/build/kafka-logs"
sleep 10

echo "Downloading Confluent Platform"
curl -s -o build/confluent_platform.tar.gz http://packages.confluent.io/archive/5.5/confluent-community-5.5.1-2.12.tar.gz
echo "Setup Confluent Platform"
# check if CONFLUENT_VERSION env var is set
if [ -n "${CONFLUENT_VERSION+1}" ]; then
echo "CONFLUENT_VERSION is $CONFLUENT_VERSION"
else
CONFLUENT_VERSION=5.5.1
fi
if [ ! -e confluent-community-$CONFLUENT_VERSION-2.12.tar.gz ]; then
echo "Confluent Platform not present locally, downloading"
CONFLUENT_MINOR=$(echo "$CONFLUENT_VERSION" | sed -n 's/^\([[:digit:]]*\.[[:digit:]]*\)\.[[:digit:]]*$/\1/p')
echo "CONFLUENT_MINOR is $CONFLUENT_MINOR"
curl -s -o confluent-community-$CONFLUENT_VERSION-2.12.tar.gz http://packages.confluent.io/archive/$CONFLUENT_MINOR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz
fi
cp confluent-community-$CONFLUENT_VERSION-2.12.tar.gz build/confluent_platform.tar.gz
mkdir build/confluent_platform && tar xzf build/confluent_platform.tar.gz -C build/confluent_platform --strip-components 1

echo "Configuring TLS on Schema registry"
rm -Rf tls_repository
mkdir tls_repository
./setup_keystore_and_truststore.sh
# configure schema-registry to handle https on 8083 port
if [[ "$OSTYPE" == "darwin"* ]]; then
sed -i '' 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
else
sed -i 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
fi
echo "ssl.keystore.location=`pwd`/tls_repository/schema_reg.jks" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
echo "ssl.keystore.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
echo "ssl.key.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties

cp build/confluent_platform/etc/schema-registry/schema-registry.properties build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.method=BASIC" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.roles=admin,developer,user,sr-user" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
Expand Down
3 changes: 3 additions & 0 deletions kafka_test_teardown.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ echo "Stopping Kafka broker"
build/kafka/bin/kafka-server-stop.sh
echo "Stopping zookeeper"
build/kafka/bin/zookeeper-server-stop.sh

echo "Clean TLS folder"
rm -Rf tls_repository
11 changes: 11 additions & 0 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,17 @@ def create_consumer(client_id, group_instance_id)
set_trustore_keystore_config(props)
set_sasl_config(props)
end
if schema_registry_ssl_truststore_location
props.put('schema.registry.ssl.truststore.location', schema_registry_ssl_truststore_location)
props.put('schema.registry.ssl.truststore.password', schema_registry_ssl_truststore_password.value)
props.put('schema.registry.ssl.truststore.type', schema_registry_ssl_truststore_type)
end

if schema_registry_ssl_keystore_location
props.put('schema.registry.ssl.keystore.location', schema_registry_ssl_keystore_location)
props.put('schema.registry.ssl.keystore.password', schema_registry_ssl_keystore_password.value)
props.put('schema.registry.ssl.keystore.type', schema_registry_ssl_keystore_type)
end

org.apache.kafka.clients.consumer.KafkaConsumer.new(props)
rescue => e
Expand Down
31 changes: 31 additions & 0 deletions lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ def setup_schema_registry_config
# This option permits to define a proxy to be used to reach the schema registry service instance.
config :schema_registry_proxy, :validate => :uri

# If schema registry client authentication is required, this setting stores the keystore path.
config :schema_registry_ssl_keystore_location, :validate => :string

# The keystore password.
config :schema_registry_ssl_keystore_password, :validate => :password

# The keystore type
config :schema_registry_ssl_keystore_type, :validate => ['jks', 'PKCS12'], :default => "jks"

# The JKS truststore path to validate the Schema Registry's certificate.
config :schema_registry_ssl_truststore_location, :validate => :string

# The truststore password.
config :schema_registry_ssl_truststore_password, :validate => :password

# The truststore type
config :schema_registry_ssl_truststore_type, :validate => ['jks', 'PKCS12'], :default => "jks"

# Option to skip validating the schema registry during registration. This can be useful when using
# certificate based auth
config :schema_registry_validation, :validate => ['auto', 'skip'], :default => 'auto'
Expand Down Expand Up @@ -68,6 +86,19 @@ def check_for_schema_registry_connectivity_and_subjects
if schema_registry_key and !schema_registry_key.empty?
options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value}
end
if schema_registry_ssl_truststore_location and !schema_registry_ssl_truststore_location.empty?
options[:ssl] = {} unless options.key?(:ssl)
options[:ssl][:truststore] = schema_registry_ssl_truststore_location unless schema_registry_ssl_truststore_location.nil?
options[:ssl][:truststore_password] = schema_registry_ssl_truststore_password.value unless schema_registry_ssl_truststore_password.nil?
options[:ssl][:truststore_type] = schema_registry_ssl_truststore_type unless schema_registry_ssl_truststore_type.nil?
end
if schema_registry_ssl_keystore_location and !schema_registry_ssl_keystore_location.empty?
options[:ssl] = {} unless options.key? :ssl
options[:ssl][:keystore] = schema_registry_ssl_keystore_location unless schema_registry_ssl_keystore_location.nil?
options[:ssl][:keystore_password] = schema_registry_ssl_keystore_password.value unless schema_registry_ssl_keystore_password.nil?
options[:ssl][:keystore_type] = schema_registry_ssl_keystore_type unless schema_registry_ssl_keystore_type.nil?
end

client = Manticore::Client.new(options)
begin
response = client.get(@schema_registry_url.uri.to_s + '/subjects').body
Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '11.1.0'
s.version = '11.2.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
Expand Down
12 changes: 12 additions & 0 deletions setup_keystore_and_truststore.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Setup Schema Registry keystore and Kafka's schema registry client's truststore
set -ex

echo "Generating schema registry key store"
keytool -genkey -alias schema_reg -keyalg RSA -keystore tls_repository/schema_reg.jks -keypass changeit -storepass changeit -validity 365 -keysize 2048 -dname "CN=localhost, OU=John Doe, O=Acme Inc, L=Unknown, ST=Unknown, C=IT"

echo "Exporting schema registry certificate"
keytool -exportcert -rfc -keystore tls_repository/schema_reg.jks -storepass changeit -alias schema_reg -file tls_repository/schema_reg_certificate.pem

echo "Creating client's truststore and importing schema registry's certificate"
keytool -import -trustcacerts -file tls_repository/schema_reg_certificate.pem -keypass changeit -storepass changeit -keystore tls_repository/clienttruststore.jks -noprompt
Loading

0 comments on commit 91bc9ce

Please sign in to comment.