Fix support for basic auth with the schema registry (#94)
* Fix support for basic auth with the schema registry

This fixes an issue with supporting basic auth when using the schema registry.

The plugin currently expects `schema_registry_secret` and `schema_registry_key` to
be set to authenticate with the schema registry. However, while this works during the
'register' phase, the authentication type `basic.auth.credentials.source` is not set
when creating the Kafka consumer, leading to authentication failures when attempting
to deserialize the Kafka payload.
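
For illustration, a minimal JRuby sketch of what the consumer properties were missing
(names mirror the plugin's `create_consumer`; this is a sketch of the pre-fix
behaviour, not the verbatim source):

    serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
    # Only the registry URL was applied to the consumer properties...
    props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.to_s)
    # ...so basic.auth.credentials.source was never set, the configured key/secret
    # were never passed to the Avro deserializer, and requests to the registry
    # failed to authenticate.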

This commit correctly sets `basic.auth.credentials.source` to `USER_INFO` when
`schema_registry_key` and `schema_registry_secret` are set, and also enables the use
of basic auth credentials embedded in the `schema_registry_url` via `username:password@url`.
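
For example, either of these plugin configurations should now authenticate (a sketch
using the option hashes from the integration specs; the host, port, and
barney/changeme credentials are test fixtures, not defaults):

    # Option 1: explicit key/secret alongside the registry URL
    config = {
      'schema_registry_url'    => 'http://localhost:8081',
      'schema_registry_key'    => 'barney',
      'schema_registry_secret' => 'changeme'
    }

    # Option 2: the same credentials embedded in the URL itself
    config = {
      'schema_registry_url' => 'http://barney:changeme@localhost:8081'
    }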

This commit also adds integration tests covering an authenticated schema registry.
robbavey authored Jul 6, 2021
1 parent 0465820 commit 65f24c3
Showing 12 changed files with 158 additions and 35 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 10.7.7
- Fix: Correct the settings to allow basic auth to work properly, either by setting `schema_registry_key/secret` or embedding username/password in the URL [#94](https://github.com/logstash-plugins/logstash-integration-kafka/pull/94)

## 10.7.6
- Test: specify development dependency version [#91](https://github.com/logstash-plugins/logstash-integration-kafka/pull/91)

10 changes: 6 additions & 4 deletions kafka_test_setup.sh
@@ -27,6 +27,12 @@ sleep 10
echo "Downloading Confluent Platform"
curl -s -o build/confluent_platform.tar.gz http://packages.confluent.io/archive/5.5/confluent-community-5.5.1-2.12.tar.gz
mkdir build/confluent_platform && tar xzf build/confluent_platform.tar.gz -C build/confluent_platform --strip-components 1
cp build/confluent_platform/etc/schema-registry/schema-registry.properties build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.method=BASIC" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.roles=admin,developer,user,sr-user" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.realm=SchemaRegistry-Props" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
cp spec/fixtures/jaas.config build/confluent_platform/etc/schema-registry
cp spec/fixtures/pwd build/confluent_platform/etc/schema-registry

echo "Setting up test topics with test data"
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain --zookeeper localhost:2181
@@ -46,8 +52,4 @@ cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic lo
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --broker-list localhost:9092 --compression-codec snappy
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_lz4 --broker-list localhost:9092 --compression-codec lz4

echo "Starting SchemaRegistry"
build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &
sleep 10

echo "Setup complete, running specs"
3 changes: 0 additions & 3 deletions kafka_test_teardown.sh
@@ -2,9 +2,6 @@
# Tear down Kafka test topics
set -ex

echo "Stoppping SchemaRegistry"
build/confluent_platform/bin/schema-registry-stop

echo "Unregistering test topics"
build/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'logstash_integration_.*'
build/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'topic_avro.*'
5 changes: 4 additions & 1 deletion lib/logstash/inputs/kafka.rb
@@ -433,13 +433,16 @@ def create_consumer(client_id)
if schema_registry_url
props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroDeserializer.java_class)
serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.to_s)
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.uri.to_s)
if schema_registry_proxy && !schema_registry_proxy.empty?
props.put(serdes_config::PROXY_HOST, @schema_registry_proxy_host)
props.put(serdes_config::PROXY_PORT, @schema_registry_proxy_port)
end
if schema_registry_key && !schema_registry_key.empty?
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO')
props.put(serdes_config::USER_INFO_CONFIG, schema_registry_key + ":" + schema_registry_secret.value)
else
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'URL')
end
end
if security_protocol == "SSL"
3 changes: 1 addition & 2 deletions lib/logstash/plugin_mixins/common.rb
@@ -53,9 +53,8 @@ def check_for_schema_registry_connectivity_and_subjects
options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value}
end
client = Manticore::Client.new(options)

begin
response = client.get(@schema_registry_url.to_s + '/subjects').body
response = client.get(@schema_registry_url.uri.to_s + '/subjects').body
rescue Manticore::ManticoreException => e
raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}")
end
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '10.7.6'
s.version = '10.7.7'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
5 changes: 5 additions & 0 deletions spec/fixtures/jaas.config
@@ -0,0 +1,5 @@
SchemaRegistry-Props {
org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
file="build/confluent_platform/etc/schema-registry/pwd"
debug="true";
};
5 changes: 5 additions & 0 deletions spec/fixtures/pwd
@@ -0,0 +1,5 @@
fred: OBF:1w8t1tvf1w261w8v1w1c1tvn1w8x,user,admin
barney: changeme,user,developer
admin:admin,admin
betty: MD5:164c88b302622e17050af52c89945d44,user
wilma: CRYPT:adpexzg3FUZAk,admin,sr-user
137 changes: 113 additions & 24 deletions spec/integration/inputs/kafka_spec.rb
@@ -206,6 +206,16 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)


describe "schema registry connection options" do
schema_registry = Manticore::Client.new
before(:all) do
shutdown_schema_registry
startup_schema_registry(schema_registry)
end

after(:all) do
shutdown_schema_registry
end

context "remote endpoint validation" do
it "should fail if not reachable" do
config = {'schema_registry_url' => 'http://localnothost:8081'}
@@ -232,8 +242,7 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
end

after(:each) do
schema_registry_client = Manticore::Client.new
delete_remote_schema(schema_registry_client, SUBJECT_NAME)
delete_remote_schema(schema_registry, SUBJECT_NAME)
end

it "should correctly complete registration phase" do
@@ -264,9 +273,25 @@ def delete_remote_schema(schema_registry_client, subject_name)

# AdminClientConfig = org.apache.kafka.clients.admin.AdminClientConfig

def startup_schema_registry(schema_registry, auth=false)
system('./stop_schema_registry.sh')
auth ? system('./start_auth_schema_registry.sh') : system('./start_schema_registry.sh')
url = auth ? "http://barney:changeme@localhost:8081" : "http://localhost:8081"
Stud.try(20.times, [Manticore::SocketException, StandardError, RSpec::Expectations::ExpectationNotMetError]) do
expect(schema_registry.get(url).code).to eq(200)
end
end

describe "Schema registry API", :integration => true do
schema_registry = Manticore::Client.new

before(:all) do
startup_schema_registry(schema_registry)
end

let(:schema_registry) { Manticore::Client.new }
after(:all) do
shutdown_schema_registry
end

context 'listing subject on clean instance' do
it "should return an empty set" do
@@ -292,37 +317,58 @@ def delete_remote_schema(schema_registry_client, subject_name)
expect( subjects ).to be_empty
end
end
end

def shutdown_schema_registry
system('./stop_schema_registry.sh')
end

describe "Deserializing with the schema registry", :integration => true do
schema_registry = Manticore::Client.new

shared_examples 'it reads from a topic using a schema registry' do |with_auth|

before(:all) do
shutdown_schema_registry
startup_schema_registry(schema_registry, with_auth)
end

after(:all) do
shutdown_schema_registry
end

context 'use the schema to serialize' do
after(:each) do
expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value').code ).to be(200)
expect( schema_registry.delete("#{subject_url}/#{avro_topic_name}-value").code ).to be(200)
sleep 1
expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value?permanent=true').code ).to be(200)
expect( schema_registry.delete("#{subject_url}/#{avro_topic_name}-value?permanent=true").code ).to be(200)

Stud.try(3.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(10).for do
subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
subjects = JSON.parse schema_registry.get(subject_url).body
subjects.empty?
end.to be_truthy
end
end

let(:group_id_1) {rand(36**8).to_s(36)}

let(:avro_topic_name) { "topic_avro" }

let(:plain_config) do
{ 'schema_registry_url' => 'http://localhost:8081',
'topics' => [avro_topic_name],
'codec' => 'plain',
'group_id' => group_id_1,
'auto_offset_reset' => 'earliest' }
let(:base_config) do
{
'topics' => [avro_topic_name],
'codec' => 'plain',
'group_id' => group_id_1,
'auto_offset_reset' => 'earliest'
}
end

def delete_topic_if_exists(topic_name)
let(:group_id_1) {rand(36**8).to_s(36)}

def delete_topic_if_exists(topic_name, user = nil, password = nil)
props = java.util.Properties.new
props.put(Java::org.apache.kafka.clients.admin.AdminClientConfig::BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
unless user.nil?
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO')
props.put(serdes_config::USER_INFO_CONFIG, "#{user}:#{password}")
end
admin_client = org.apache.kafka.clients.admin.AdminClient.create(props)
topics_list = admin_client.listTopics().names().get()
if topics_list.contains(topic_name)
@@ -331,14 +377,18 @@ def delete_topic_if_exists(topic_name)
end
end

def write_some_data_to(topic_name)
def write_some_data_to(topic_name, user = nil, password = nil)
props = java.util.Properties.new
config = org.apache.kafka.clients.producer.ProducerConfig

serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")

props.put(config::BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
unless user.nil?
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO')
props.put(serdes_config::USER_INFO_CONFIG, "#{user}:#{password}")
end
props.put(config::KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.java_class)
props.put(config::VALUE_SERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroSerializer.java_class)

@@ -360,11 +410,11 @@ def write_some_data_to(topic_name)
end

it "stored a new schema using Avro Kafka serdes" do
delete_topic_if_exists avro_topic_name
write_some_data_to avro_topic_name
auth ? delete_topic_if_exists(avro_topic_name, user, password) : delete_topic_if_exists(avro_topic_name)
auth ? write_some_data_to(avro_topic_name, user, password) : write_some_data_to(avro_topic_name)

subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
expect( subjects ).to contain_exactly("topic_avro-value")
subjects = JSON.parse schema_registry.get(subject_url).body
expect( subjects ).to contain_exactly("#{avro_topic_name}-value")

num_events = 1
queue = consume_messages(plain_config, timeout: 30, event_count: num_events)
@@ -375,4 +425,43 @@ def write_some_data_to(topic_name)
expect( elem.get("map_field")["inner_field"] ).to eq("inner value")
end
end

context 'with an unauthed schema registry' do
let(:auth) { false }
let(:avro_topic_name) { "topic_avro" }
let(:subject_url) { "http://localhost:8081/subjects" }
let(:plain_config) { base_config.merge!({'schema_registry_url' => "http://localhost:8081"}) }

it_behaves_like 'it reads from a topic using a schema registry', false
end

context 'with an authed schema registry' do
let(:auth) { true }
let(:user) { "barney" }
let(:password) { "changeme" }
let(:avro_topic_name) { "topic_avro_auth" }
let(:subject_url) { "http://#{user}:#{password}@localhost:8081/subjects" }

context 'using schema_registry_key' do
let(:plain_config) do
base_config.merge!({
'schema_registry_url' => "http://localhost:8081",
'schema_registry_key' => user,
'schema_registry_secret' => password
})
end

it_behaves_like 'it reads from a topic using a schema registry', true
end

context 'using schema_registry_url' do
let(:plain_config) do
base_config.merge!({
'schema_registry_url' => "http://#{user}:#{password}@localhost:8081"
})
end

it_behaves_like 'it reads from a topic using a schema registry', true
end
end
end
6 changes: 6 additions & 0 deletions start_auth_schema_registry.sh
@@ -0,0 +1,6 @@
#!/bin/bash
# Start the Schema Registry with basic auth enabled
set -ex

echo "Starting authed SchemaRegistry"
SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=build/confluent_platform/etc/schema-registry/jaas.config build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/authed-schema-registry.properties > /dev/null 2>&1 &
6 changes: 6 additions & 0 deletions start_schema_registry.sh
@@ -0,0 +1,6 @@
#!/bin/bash
# Start the Schema Registry
set -ex

echo "Starting SchemaRegistry"
build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &
7 changes: 7 additions & 0 deletions stop_schema_registry.sh
@@ -0,0 +1,7 @@
#!/bin/bash
# Stop the Schema Registry
set -ex

echo "Stoppping SchemaRegistry"
build/confluent_platform/bin/schema-registry-stop
sleep 5
