Skip to content

Commit

Permalink
Changed: config defaults to be aligned with Kafka client defaults (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
kares authored Apr 29, 2020
1 parent 818a3a2 commit a8c923a
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 149 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 10.2.0
- Changed: config defaults to be aligned with Kafka client defaults [#30](https://github.com/logstash-plugins/logstash-integration-kafka/pull/30)

## 10.1.0
- updated kafka client (and its dependencies) to version 2.4.1 ([#16](https://github.com/logstash-plugins/logstash-integration-kafka/pull/16))
- added the input `client_rack` parameter to enable support for follower fetching
Expand Down
2 changes: 1 addition & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ task :default do
end

task :vendor do
exit(1) unless system './gradlew vendor'
exit(1) unless system './gradlew --no-daemon vendor'
end

task :clean do
Expand Down
2 changes: 1 addition & 1 deletion docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ The Kafka Integration Plugin provides integrated plugins for working with the ht
- {logstash-ref}/plugins-inputs-kafka.html[Kafka Input Plugin]
- {logstash-ref}/plugins-outputs-kafka.html[Kafka Output Plugin]

This plugin uses Kafka Client 2.3.0. For broker compatibility, see the official https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix[Kafka compatibility reference]. If the linked compatibility wiki is not up-to-date, please contact Kafka support/community to confirm compatibility.
This plugin uses Kafka Client 2.4. For broker compatibility, see the official https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix[Kafka compatibility reference]. If the linked compatibility wiki is not up-to-date, please contact Kafka support/community to confirm compatibility.

:no_codec!:
133 changes: 70 additions & 63 deletions docs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ the same `group_id`.
Ideally you should have as many threads as the number of partitions for a perfect balance --
more threads than partitions means that some threads will be idle

For more information see http://kafka.apache.org/documentation.html#theconsumer
For more information see https://kafka.apache.org/24/documentation.html#theconsumer

Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs
Kafka consumer configuration: https://kafka.apache.org/24/documentation.html#consumerconfigs

==== Metadata fields

Expand All @@ -71,46 +71,48 @@ inserted into your original event, you'll have to use the `mutate` filter to man

This plugin supports these configuration options plus the <<plugins-{type}s-{plugin}-common-options>> described later.

NOTE: Some of these options map to a Kafka option. See the https://kafka.apache.org/documentation for more details.
NOTE: Some of these options map to a Kafka option. Defaults usually reflect the Kafka default setting,
and might change if Kafka's consumer defaults change.
See the https://kafka.apache.org/24/documentation for more details.

[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-auto_commit_interval_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-auto_commit_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-auto_offset_reset>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-bootstrap_servers>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-check_crcs>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-check_crcs>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-client_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-client_rack>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-connections_max_idle_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-connections_max_idle_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-consumer_threads>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-decorate_events>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-enable_auto_commit>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-enable_auto_commit>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-exclude_internal_topics>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_max_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_max_wait_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_max_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-fetch_max_wait_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-group_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-kerberos_config>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-key_deserializer_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_partition_fetch_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_poll_interval_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_poll_records>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_partition_fetch_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_poll_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_poll_records>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-security_protocol>> |<<string,string>>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No
| <<plugins-{type}s-{plugin}-send_buffer_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-session_timeout_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-send_buffer_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-session_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-ssl_endpoint_identification_algorithm>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-ssl_key_password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-ssl_keystore_location>> |a valid filesystem path|No
Expand All @@ -132,8 +134,8 @@ input plugins.
[id="plugins-{type}s-{plugin}-auto_commit_interval_ms"]
===== `auto_commit_interval_ms`

* Value type is <<string,string>>
* Default value is `"5000"`
* Value type is <<number,number>>
* Default value is `5000`.

The frequency in milliseconds that the consumer offsets are committed to Kafka.

Expand Down Expand Up @@ -165,12 +167,12 @@ case a server is down).
[id="plugins-{type}s-{plugin}-check_crcs"]
===== `check_crcs`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<boolean,boolean>>
* Default value is `true`

Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk
corruption to the messages occurred. This check adds some overhead, so it may be
disabled in cases seeking extreme performance.
Automatically check the CRC32 of the records consumed.
This ensures no on-the-wire or on-disk corruption to the messages occurred.
This check adds some overhead, so it may be disabled in cases seeking extreme performance.

[id="plugins-{type}s-{plugin}-client_id"]
===== `client_id`
Expand Down Expand Up @@ -198,8 +200,8 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+
[id="plugins-{type}s-{plugin}-connections_max_idle_ms"]
===== `connections_max_idle_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `540000` milliseconds (9 minutes).

Close idle connections after the number of milliseconds specified by this config.

Expand Down Expand Up @@ -230,8 +232,8 @@ This will add a field named `kafka` to the logstash event containing the followi
[id="plugins-{type}s-{plugin}-enable_auto_commit"]
===== `enable_auto_commit`

* Value type is <<string,string>>
* Default value is `"true"`
* Value type is <<boolean,boolean>>
* Default value is `true`

This committed offset will be used when the process fails as the position from
which the consumption will begin.
Expand All @@ -252,8 +254,8 @@ If set to true the only way to receive records from an internal topic is subscri
[id="plugins-{type}s-{plugin}-fetch_max_bytes"]
===== `fetch_max_bytes`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `52428800` (50MB)

The maximum amount of data the server should return for a fetch request. This is not an
absolute maximum, if the first message in the first non-empty partition of the fetch is larger
Expand All @@ -262,8 +264,8 @@ than this value, the message will still be returned to ensure that the consumer
[id="plugins-{type}s-{plugin}-fetch_max_wait_ms"]
===== `fetch_max_wait_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `500` milliseconds.

The maximum amount of time the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy `fetch_min_bytes`. This
Expand All @@ -272,7 +274,7 @@ should be less than or equal to the timeout used in `poll_timeout_ms`
[id="plugins-{type}s-{plugin}-fetch_min_bytes"]
===== `fetch_min_bytes`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The minimum amount of data the server should return for a fetch request. If insufficient
Expand All @@ -292,8 +294,8 @@ Logstash instances with the same `group_id`
[id="plugins-{type}s-{plugin}-heartbeat_interval_ms"]
===== `heartbeat_interval_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `3000` milliseconds (3 seconds).

The expected time between heartbeats to the consumer coordinator. Heartbeats are used to ensure
that the consumer's session stays active and to facilitate rebalancing when new
Expand Down Expand Up @@ -343,8 +345,8 @@ Java Class used to deserialize the record's key
[id="plugins-{type}s-{plugin}-max_partition_fetch_bytes"]
===== `max_partition_fetch_bytes`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `1048576` (1MB).

The maximum amount of data per-partition the server will return. The maximum total memory used for a
request will be `#partitions * max.partition.fetch.bytes`. This size must be at least
Expand All @@ -355,28 +357,28 @@ to fetch a large message on a certain partition.
[id="plugins-{type}s-{plugin}-max_poll_interval_ms"]
===== `max_poll_interval_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `300000` milliseconds (5 minutes).

The maximum delay between invocations of poll() when using consumer group management. This places
an upper bound on the amount of time that the consumer can be idle before fetching more records.
If poll() is not called before expiration of this timeout, then the consumer is considered failed and
the group will rebalance in order to reassign the partitions to another member.
The value of the configuration `request_timeout_ms` must always be larger than max_poll_interval_ms
The value of the configuration `request_timeout_ms` must always be larger than `max_poll_interval_ms`. ???

[id="plugins-{type}s-{plugin}-max_poll_records"]
===== `max_poll_records`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `500`.

The maximum number of records returned in a single call to poll().

[id="plugins-{type}s-{plugin}-metadata_max_age_ms"]
===== `metadata_max_age_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `300000` milliseconds (5 minutes).

The period of time in milliseconds after which we force a refresh of metadata even if
we haven't seen any partition leadership changes to proactively discover any new brokers or partitions
Expand All @@ -402,23 +404,28 @@ implementations.
===== `poll_timeout_ms`

* Value type is <<number,number>>
* Default value is `100`
* Default value is `100` milliseconds.

Time kafka consumer will wait to receive new messages from topics
Time Kafka consumer will wait to receive new messages from topics.

After subscribing to a set of topics, the Kafka consumer automatically joins the group when polling.
The plugin poll-ing in a loop ensures consumer liveness.
Underneath the covers, Kafka client sends periodic heartbeats to the server.
The timeout specified the time to block waiting for input on each poll.

[id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
===== `receive_buffer_bytes`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `32768` (32KB).

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.

[id="plugins-{type}s-{plugin}-reconnect_backoff_ms"]
===== `reconnect_backoff_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `50` milliseconds.

The amount of time to wait before attempting to reconnect to a given host.
This avoids repeatedly connecting to a host in a tight loop.
Expand All @@ -427,8 +434,8 @@ This backoff applies to all requests sent by the consumer to the broker.
[id="plugins-{type}s-{plugin}-request_timeout_ms"]
===== `request_timeout_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `40000` milliseconds (40 seconds).

The configuration controls the maximum amount of time the client will wait
for the response of a request. If the response is not received before the timeout
Expand All @@ -438,8 +445,8 @@ retries are exhausted.
[id="plugins-{type}s-{plugin}-retry_backoff_ms"]
===== `retry_backoff_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `100` milliseconds.

The amount of time to wait before attempting to retry a failed fetch request
to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.
Expand Down Expand Up @@ -492,16 +499,16 @@ Security protocol to use, which can be either of PLAINTEXT,SSL,SASL_PLAINTEXT,SA
[id="plugins-{type}s-{plugin}-send_buffer_bytes"]
===== `send_buffer_bytes`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `131072` (128KB).

The size of the TCP send buffer (SO_SNDBUF) to use when sending data

[id="plugins-{type}s-{plugin}-session_timeout_ms"]
===== `session_timeout_ms`

* Value type is <<string,string>>
* There is no default value for this setting.
* Value type is <<number,number>>
* Default value is `10000` milliseconds (10 seconds).

The timeout after which, if the `poll_timeout_ms` is not invoked, the consumer is marked dead
and a rebalance operation is triggered for the group identified by `group_id`
Expand Down Expand Up @@ -561,7 +568,7 @@ The JKS truststore path to validate the Kafka broker's certificate.
* Value type is <<password,password>>
* There is no default value for this setting.

The truststore password
The truststore password.

[id="plugins-{type}s-{plugin}-ssl_truststore_type"]
===== `ssl_truststore_type`
Expand Down
Loading

0 comments on commit a8c923a

Please sign in to comment.