Skip to content

Commit

Permalink
Add fine tuned oath/sasl configuration to kafka client (#189)
Browse files Browse the repository at this point in the history
* Add missing oauth config (#180)

* Add missing `sasl.login.callback.handler.class"` config (#180)

* Add unit tests for new oauth and sasl config options

* 11.6.0 release prep

Add changelog entry and increment minor version for new configuration features.

* Align default values with kafka client lib

This commit updates the plugin defaults to align with the default (or lack of
default) values in the underlying kafka client library.

https://github.com/apache/kafka/blob/3.8.1/clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java

* Explicitly document no default setting

Co-authored-by: Mashhur <[email protected]>

* Explicitly document no default setting

Co-authored-by: Mashhur <[email protected]>

* Explicitly document no default setting

Co-authored-by: Mashhur <[email protected]>

* Explicitly document no default setting

Co-authored-by: Mashhur <[email protected]>

---------

Co-authored-by: darby.han <[email protected]>
Co-authored-by: Mashhur <[email protected]>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent 3b5789c commit 6de2996
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.6.0
- Support additional `oauth` and `sasl` configuration options for configuring kafka client [#189](https://github.com/logstash-plugins/logstash-integration-kafka/pull/189)

## 11.5.4
- Update kafka client to 3.8.1 and transitive dependencies [#188](https://github.com/logstash-plugins/logstash-integration-kafka/pull/188)
- Removed Jar Dependencies dependency [#187](https://github.com/logstash-plugins/logstash-integration-kafka/pull/187)
Expand Down
62 changes: 59 additions & 3 deletions docs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_login_read_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
Expand Down Expand Up @@ -556,13 +563,62 @@ retries are exhausted.
The amount of time to wait before attempting to retry a failed fetch request
to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.

[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"]
===== `sasl_client_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL client callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"]
===== `sasl_oauthbearer_token_endpoint_url`
* Value type is <<string,string>>
* There is no default value for this setting.

The URL for the OAuth 2.0 issuer token endpoint.

[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"]
===== `sasl_oauthbearer_scope_claim_name`
* Value type is <<string,string>>
* Default value is `"scope"`

(optional) The override name of the scope claim.

[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"]
===== `sasl_login_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL login callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"]
===== `sasl_login_connect_timeout_ms`
* Value type is <<number,number>>
* There is no default value for this setting.

(optional) The duration, in milliseconds, for HTTPS connect timeout

[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"]
===== `sasl_login_read_timeout_ms`
* Value type is <<number,number>>
* There is no default value for this setting.

(optional) The duration, in milliseconds, for HTTPS read timeout.

[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"]
===== `sasl_login_retry_backoff_ms`
* Value type is <<number,number>>
* Default value is `100` milliseconds.

(optional) The duration, in milliseconds, to wait between HTTPS call attempts.

[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"]
===== `sasl_login_retry_backoff_max_ms`
* Value type is <<number,number>>
* Default value is `10000` milliseconds.

(optional) The maximum duration, in milliseconds, for HTTPS call attempts.

[id="plugins-{type}s-{plugin}-sasl_jaas_config"]
===== `sasl_jaas_config`

Expand Down
62 changes: 59 additions & 3 deletions docs/output-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-retries>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_login_read_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
Expand Down Expand Up @@ -392,13 +399,62 @@ In versions prior to 10.5.0, any exception is retried indefinitely unless the `r

The amount of time to wait before attempting to retry a failed produce request to a given topic partition.

[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"]
===== `sasl_client_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL client callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"]
===== `sasl_oauthbearer_token_endpoint_url`
* Value type is <<string,string>>
* There is no default value for this setting.

The URL for the OAuth 2.0 issuer token endpoint.

[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"]
===== `sasl_oauthbearer_scope_claim_name`
* Value type is <<string,string>>
* Default value is `"scope"`

(optional) The override name of the scope claim.

[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"]
===== `sasl_login_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL login callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"]
===== `sasl_login_connect_timeout_ms`
* Value type is <<number,number>>
* There is no default value for this setting.

(optional) The duration, in milliseconds, for HTTPS connect timeout

[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"]
===== `sasl_login_read_timeout_ms`
* Value type is <<number,number>>
* There is no default value for this setting.

(optional) The duration, in milliseconds, for HTTPS read timeout.

[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"]
===== `sasl_login_retry_backoff_ms`
* Value type is <<number,number>>
* Default value is `100` milliseconds.

(optional) The duration, in milliseconds, to wait between HTTPS call attempts.

[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"]
===== `sasl_login_retry_backoff_max_ms`
* Value type is <<number,number>>
* Default value is `10000` milliseconds.

(optional) The maximum duration, in milliseconds, for HTTPS call attempts.

[id="plugins-{type}s-{plugin}-sasl_jaas_config"]
===== `sasl_jaas_config`

Expand Down
14 changes: 14 additions & 0 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,20 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
# SASL client callback handler class
config :sasl_client_callback_handler_class, :validate => :string
# The URL for the OAuth 2.0 issuer token endpoint.
config :sasl_oauthbearer_token_endpoint_url, :validate => :string
# (optional) The override name of the scope claim.
config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope' # Kafka default
# SASL login callback handler class
config :sasl_login_callback_handler_class, :validate => :string
# (optional) The duration, in milliseconds, for HTTPS connect timeout
config :sasl_login_connect_timeout_ms, :validate => :number
# (optional) The duration, in milliseconds, for HTTPS read timeout.
config :sasl_login_read_timeout_ms, :validate => :number
# (optional) The duration, in milliseconds, to wait between HTTPS call attempts.
config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100 # Kafka default
# (optional) The maximum duration, in milliseconds, for HTTPS call attempts.
config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000 # Kafka default
# http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
# This may be any mechanism for which a security provider is available.
# GSSAPI is the default mechanism.
Expand Down
14 changes: 14 additions & 0 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,20 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
# SASL client callback handler class
config :sasl_client_callback_handler_class, :validate => :string
# The URL for the OAuth 2.0 issuer token endpoint.
config :sasl_oauthbearer_token_endpoint_url, :validate => :string
# (optional) The override name of the scope claim.
config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope' # Kafka default
# SASL login callback handler class
config :sasl_login_callback_handler_class, :validate => :string
# (optional) The duration, in milliseconds, for HTTPS connect timeout
config :sasl_login_connect_timeout_ms, :validate => :number
# (optional) The duration, in milliseconds, for HTTPS read timeout.
config :sasl_login_read_timeout_ms, :validate => :number
# (optional) The duration, in milliseconds, to wait between HTTPS call attempts.
config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100 # Kafka default
# (optional) The maximum duration, in milliseconds, for HTTPS call attempts.
config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000 # Kafka default
# http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
# This may be any mechanism for which a security provider is available.
# GSSAPI is the default mechanism.
Expand Down
7 changes: 7 additions & 0 deletions lib/logstash/plugin_mixins/kafka/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ def set_sasl_config(props)
props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil?
props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil?
props.put("sasl.oauthbearer.scope.claim.name", sasl_oauthbearer_scope_claim_name) unless sasl_oauthbearer_scope_claim_name.nil?
props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil?
props.put("sasl.login.connect.timeout.ms", sasl_login_connect_timeout_ms.to_s) unless sasl_login_connect_timeout_ms.nil?
props.put("sasl.login.read.timeout.ms", sasl_login_read_timeout_ms.to_s) unless sasl_login_read_timeout_ms.nil?
props.put("sasl.login.retry.backoff.ms", sasl_login_retry_backoff_ms.to_s) unless sasl_login_retry_backoff_ms.nil?
props.put("sasl.login.retry.backoff.max.ms", sasl_login_retry_backoff_max_ms.to_s) unless sasl_login_retry_backoff_max_ms.nil?
end

def reassign_dns_lookup
Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '11.5.4'
s.version = '11.6.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
Expand Down
48 changes: 48 additions & 0 deletions spec/unit/inputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,54 @@

end

context 'when oauth is configured' do
let(:config) { super().merge(
'security_protocol' => 'SASL_PLAINTEXT',
'sasl_mechanism' => 'OAUTHBEARER',
'sasl_oauthbearer_token_endpoint_url' => 'https://auth.example.com/token',
'sasl_oauthbearer_scope_claim_name' => 'custom_scope'
)}

it "sets oauth properties" do
expect(org.apache.kafka.clients.consumer.KafkaConsumer).
to receive(:new).with(hash_including(
'security.protocol' => 'SASL_PLAINTEXT',
'sasl.mechanism' => 'OAUTHBEARER',
'sasl.oauthbearer.token.endpoint.url' => 'https://auth.example.com/token',
'sasl.oauthbearer.scope.claim.name' => 'custom_scope'
)).and_return(kafka_client = double('kafka-consumer'))

expect(subject.send(:create_consumer, 'test-client-1', 'group_instance_id')).to be kafka_client
end
end

context 'when sasl is configured' do
let(:config) { super().merge(
'security_protocol' => 'SASL_PLAINTEXT',
'sasl_mechanism' => 'OAUTHBEARER',
'sasl_login_connect_timeout_ms' => 15000,
'sasl_login_read_timeout_ms' => 5000,
'sasl_login_retry_backoff_ms' => 200,
'sasl_login_retry_backoff_max_ms' => 15000,
'sasl_login_callback_handler_class' => 'org.example.CustomLoginHandler'
)}

it "sets sasl login properties" do
expect(org.apache.kafka.clients.consumer.KafkaConsumer).
to receive(:new).with(hash_including(
'security.protocol' => 'SASL_PLAINTEXT',
'sasl.mechanism' => 'OAUTHBEARER',
'sasl.login.connect.timeout.ms' => '15000',
'sasl.login.read.timeout.ms' => '5000',
'sasl.login.retry.backoff.ms' => '200',
'sasl.login.retry.backoff.max.ms' => '15000',
'sasl.login.callback.handler.class' => 'org.example.CustomLoginHandler'
)).and_return(kafka_client = double('kafka-consumer'))

expect(subject.send(:create_consumer, 'test-client-2', 'group_instance_id')).to be kafka_client
end
end

describe "schema registry" do
let(:base_config) do {
'schema_registry_url' => 'http://localhost:8081',
Expand Down
52 changes: 50 additions & 2 deletions spec/unit/outputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
'@timestamp' => LogStash::Timestamp.now}) }

let(:future) { double('kafka producer future') }
subject { LogStash::Outputs::Kafka.new(config) }

context 'when initializing' do
it "should register" do
Expand Down Expand Up @@ -267,8 +268,6 @@
File.join(File.dirname(__FILE__), '../../fixtures/trust-store_stub.jks')
end

subject { LogStash::Outputs::Kafka.new(config) }

it 'sets empty ssl.endpoint.identification.algorithm' do
expect(org.apache.kafka.clients.producer.KafkaProducer).
to receive(:new).with(hash_including('ssl.endpoint.identification.algorithm' => ''))
Expand All @@ -283,4 +282,53 @@

end

context 'when oauth is configured' do
let(:config) {
simple_kafka_config.merge(
'security_protocol' => 'SASL_PLAINTEXT',
'sasl_mechanism' => 'OAUTHBEARER',
'sasl_oauthbearer_token_endpoint_url' => 'https://auth.example.com/token',
'sasl_oauthbearer_scope_claim_name' => 'custom_scope'
)
}

it "sets oauth properties" do
expect(org.apache.kafka.clients.producer.KafkaProducer).
to receive(:new).with(hash_including(
'security.protocol' => 'SASL_PLAINTEXT',
'sasl.mechanism' => 'OAUTHBEARER',
'sasl.oauthbearer.token.endpoint.url' => 'https://auth.example.com/token',
'sasl.oauthbearer.scope.claim.name' => 'custom_scope'
))
subject.register
end
end

context 'when sasl is configured' do
let(:config) {
simple_kafka_config.merge(
'security_protocol' => 'SASL_PLAINTEXT',
'sasl_mechanism' => 'OAUTHBEARER',
'sasl_login_connect_timeout_ms' => 15000,
'sasl_login_read_timeout_ms' => 5000,
'sasl_login_retry_backoff_ms' => 200,
'sasl_login_retry_backoff_max_ms' => 15000,
'sasl_login_callback_handler_class' => 'org.example.CustomLoginHandler'
)
}

it "sets sasl login properties" do
expect(org.apache.kafka.clients.producer.KafkaProducer).
to receive(:new).with(hash_including(
'security.protocol' => 'SASL_PLAINTEXT',
'sasl.mechanism' => 'OAUTHBEARER',
'sasl.login.connect.timeout.ms' => '15000',
'sasl.login.read.timeout.ms' => '5000',
'sasl.login.retry.backoff.ms' => '200',
'sasl.login.retry.backoff.max.ms' => '15000',
'sasl.login.callback.handler.class' => 'org.example.CustomLoginHandler'
))
subject.register
end
end
end

0 comments on commit 6de2996

Please sign in to comment.