
Commit

Feat: added input isolation_level for fine control of transactional messages (#44)

Co-authored-by: Karol Bucek <[email protected]>
timtebeek and kares authored Jul 3, 2020
1 parent 05ffbed commit a2b84cb
Showing 5 changed files with 23 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 10.4.0
- added the input `isolation_level` to allow fine control of whether to return transactional messages [#44](https://github.com/logstash-plugins/logstash-integration-kafka/pull/44)

## 10.3.0
- added the input and output `client_dns_lookup` parameter to allow control of how DNS requests are made

1 change: 1 addition & 0 deletions CONTRIBUTORS
@@ -12,6 +12,7 @@ Contributors:
* Kurt Hurtado (kurtado)
* Ry Biesemeyer (yaauie)
* Rob Cowart (robcowart)
* Tim te Beek (timtebeek)

Note: If you've sent us patches, bug reports, or otherwise contributed to
Logstash, and you aren't on the list above and want to be, please let us know
12 changes: 12 additions & 0 deletions docs/input-kafka.asciidoc
@@ -95,6 +95,7 @@ See the https://kafka.apache.org/24/documentation for more details.
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-group_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-isolation_level>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-kerberos_config>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-key_deserializer_class>> |<<string,string>>|No
@@ -315,6 +316,17 @@ consumers join or leave the group. The value must be set lower than
`session.timeout.ms`, but typically should be set no higher than 1/3 of that value.
It can be adjusted even lower to control the expected time for normal rebalances.
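
For example, a minimal kafka input sketch (broker address and topic name are hypothetical) that keeps `heartbeat_interval_ms` at roughly one third of `session_timeout_ms`:

[source,ruby]
----
input {
  kafka {
    bootstrap_servers     => "localhost:9092"  # hypothetical broker
    topics                => ["example-topic"] # hypothetical topic
    session_timeout_ms    => 10000             # consumer session timeout (10s)
    heartbeat_interval_ms => 3000              # no higher than 1/3 of session_timeout_ms
  }
}
----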

[id="plugins-{type}s-{plugin}-isolation_level"]
===== `isolation_level`

* Value type is <<string,string>>
* Default value is `"read_uncommitted"`

Controls how to read messages written transactionally. If set to `read_committed`, polling will only return
transactional messages which have been committed. If set to `read_uncommitted` (the default), polling will
return all messages, even transactional messages which have been aborted. Non-transactional messages are
returned unconditionally in either mode.
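
For illustration, a minimal kafka input sketch (broker address and topic name are hypothetical) that only consumes committed transactional messages:

[source,ruby]
----
input {
  kafka {
    bootstrap_servers => "localhost:9092"       # hypothetical broker
    topics            => ["transactions-topic"] # hypothetical topic
    group_id          => "logstash"
    isolation_level   => "read_committed"       # skip messages from aborted transactions
  }
}
----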

[id="plugins-{type}s-{plugin}-jaas_path"]
===== `jaas_path`

6 changes: 6 additions & 0 deletions lib/logstash/inputs/kafka.rb
@@ -114,6 +114,11 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# `session.timeout.ms`, but typically should be set no higher than 1/3 of that value.
# It can be adjusted even lower to control the expected time for normal rebalances.
config :heartbeat_interval_ms, :validate => :number, :default => 3000 # Kafka default
# Controls how to read messages written transactionally. If set to read_committed, consumer.poll()
# will only return transactional messages which have been committed. If set to read_uncommitted
# (the default), consumer.poll() will return all messages, even transactional messages which have
# been aborted. Non-transactional messages will be returned unconditionally in either mode.
config :isolation_level, :validate => ["read_uncommitted", "read_committed"], :default => "read_uncommitted" # Kafka default
# Java Class used to deserialize the record's key
config :key_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer"
# The maximum delay between invocations of poll() when using consumer group management. This places
@@ -311,6 +316,7 @@ def create_consumer(client_id)
props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes.to_s) unless fetch_min_bytes.nil?
props.put(kafka::GROUP_ID_CONFIG, group_id)
props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms.to_s) unless heartbeat_interval_ms.nil?
props.put(kafka::ISOLATION_LEVEL_CONFIG, isolation_level)
props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes.to_s) unless max_partition_fetch_bytes.nil?
props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records.to_s) unless max_poll_records.nil?
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '10.3.0'
s.version = '10.4.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
