From dc2d580564cdc2150b6cf4a6099748cf69b50b51 Mon Sep 17 00:00:00 2001 From: prmellor Date: Tue, 2 Jul 2024 09:12:29 +0100 Subject: [PATCH 1/3] docs(tuning): consumer config: updates content on rebalances Signed-off-by: prmellor --- .../con-consumer-config-properties.adoc | 87 +++++++++++++------ 1 file changed, 62 insertions(+), 25 deletions(-) diff --git a/documentation/modules/managing/con-consumer-config-properties.adoc b/documentation/modules/managing/con-consumer-config-properties.adoc index b3e675c320f..f446644bfb8 100644 --- a/documentation/modules/managing/con-consumer-config-properties.adoc +++ b/documentation/modules/managing/con-consumer-config-properties.adoc @@ -250,6 +250,19 @@ A shorter interval between consecutive heartbeats allows for quicker detection o The heartbeat interval must be lower, usually by a third, than the session timeout. Decreasing the heartbeat interval reduces the chance of accidental rebalancing, but more frequent heartbeats increases the overhead on broker resources. +[source,env] +---- +# ... +max.poll.records=500 # <1> +session.timeout.ms=45000 # <2> +heartbeat.interval.ms=3000 # <3> +# ... +---- +<1> Sets the number of processed records returned from the consumer. +<2> Adjust the heartbeat interval lower according to anticipated rebalances. +<3> If no heartbeats are received by the Kafka broker before the timeout duration expires, the consumer is removed from the consumer group and a rebalance is initiated. +If the broker configuration has a `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, the session timeout value must be within that range. + == Managing offset policy Use the `auto.offset.reset` property to control how a consumer behaves when no offsets have been committed, @@ -267,22 +280,18 @@ If a consumer group or standalone consumer is inactive and commits no offsets du [source,env] ---- # ... 
-heartbeat.interval.ms=3000 <1> -session.timeout.ms=45000 <2> -auto.offset.reset=earliest <3> +auto.offset.reset=earliest # <1> # ... ---- -<1> Adjust the heartbeat interval lower according to anticipated rebalances. -<2> If no heartbeats are received by the Kafka broker before the timeout duration expires, the consumer is removed from the consumer group and a rebalance is initiated. -If the broker configuration has a `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, the session timeout value must be within that range. -<3> Set to `earliest` to return to the start of a partition and avoid data loss if offsets were not committed. +<1> Set to `earliest` to return to the start of a partition and avoid data loss if offsets were not committed. If the amount of data returned in a single fetch request is large, a timeout might occur before the consumer has processed it. In this case, you can lower `max.partition.fetch.bytes` or increase `session.timeout.ms`. -== Minimizing the impact of rebalances +== Minimizing the impact of rebalances +Rebalances in Kafka consumer groups can introduce latency and reduce throughput, impacting overall service performance. The rebalancing of a partition between active consumers in a group is the time it takes for the following to take place: * Consumers to commit their offsets @@ -290,26 +299,54 @@ The rebalancing of a partition between active consumers in a group is the time i * The group leader to assign partitions to group members * The consumers in the group to receive their assignments and start fetching -The rebalancing process can increase the downtime of a service, particularly if it happens repeatedly during a rolling restart of a consumer group cluster. +Rebalances are triggered by changes in consumer health, network issues, configuration updates, and scaling events. +This process can increase service downtime, especially if it occurs frequently, such as during rolling restarts of a consumer group cluster. 
+To minimize the impact of rebalances, consider the following strategies and configurations: + +Static membership:: Assign a unique identifier (`group.instance.id`) to each consumer instance. +Static membership introduces persistence so static consumers retain partition assignments across restarts, reducing unnecessary rebalances. -In this situation, you can introduce _static membership_ by assigning a unique identifier (`group.instance.id`) to each consumer instance within the group. -Static membership uses persistence so that a consumer instance is recognized during a restart after a session timeout. -Consequently, the consumer maintains its assignment of topic partitions, reducing unnecessary rebalancing when it rejoins the group after a failure or restart. - -Additionally, adjusting the `max.poll.interval.ms` configuration can prevent rebalances caused by prolonged processing tasks, allowing you to specify the maximum interval between polls for new messages. -Use the `max.poll.records` property to cap the number of records returned from the consumer buffer during each poll. -Reducing the number of records allows the consumer to process fewer messages more efficiently. -In cases where lengthy message processing is unavoidable, consider offloading such tasks to a pool of worker threads. -This parallel processing approach prevents delays and potential rebalances caused by overwhelming the consumer with a large volume of records. +Poll intervals and record limits:: +* Use the `max.poll.interval.ms` property to prevent rebalances caused by prolonged processing tasks by setting the maximum interval between polls. +* Use the `max.poll.records` property to limit the number of records returned during each poll. +Processing fewer messages more efficiently can prevent delays. 
-[source,shell,subs="+quotes"] +Session timeout and heartbeat intervals:: +* Use the `session.timeout.ms` property to set a longer timeout to reduce rebalances caused by temporary network glitches or minor processing delays. +* Adjust the `heartbeat.interval.ms` property to balance failure detection checks with minimizing unnecessary rebalances. + +Adopt partition assignment strategies:: Use appropriate partition assignment strategies to reduce the number of partitions that need to be reassigned during a rebalance, minimizing the impact on active consumers. + +Monitor consumer health:: Instability in consumer applications, such as frequent crashes, can trigger rebalances. +Use Kafka consumer metrics to monitor such things as rebalance rates, session timouts, and failed fetch requests. + +.Example configuration to minimize the impact of rebalances +[source,shell] ---- # ... -group.instance.id=_UNIQUE-ID_ <1> -max.poll.interval.ms=300000 <2> -max.poll.records=500 <3> +group.instance.id= +max.poll.interval.ms=300000 +max.poll.records=500 +session.timeout.ms=45000 +heartbeat.interval.ms=3000 +partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor # ... ---- -<1> The unique instance id ensures that a new consumer instance receives the same assignment of topic partitions. -<2> Set the interval to check the consumer is continuing to process messages. -<3> Sets the number of processed records returned from the consumer. + +.Scaling strategies +To minimize the impact of rebalances during scaling of consumer groups, consider the following approaches: + +Set a rebalance delay:: Use the `group.initial.rebalance.delay.ms` property in the Kafka configuration to delay the time it takes for consumers to join a new consumer group before performing a rebalance. + +Avoid frequent scaling:: +* Keep the number of consumers stable, scaling only when necessary and in controlled increments. 
+* Monitor system performance and adjust your scaling strategy as needed. +* Use the Kafka Exporter to check for consumer lag and determine if scaling is required. + +Implement dynamic scaling policies:: +* If using dynamic or event-driven tools for scaling of consumer applications, set lag thresholds based on the backlog of messages. +* Define maximum and minimum replica counts for consumer groups. +* Set periods between scaling events to prevent rapid scaling. + +NOTE: In cases where lengthy message processing is unavoidable, consider offloading such tasks to a pool of worker threads. +This parallel processing approach prevents delays and potential rebalances caused by overwhelming the consumer with a large volume of records. \ No newline at end of file From 91469b20071b70b270c345c4d54cd21a071e6416 Mon Sep 17 00:00:00 2001 From: prmellor Date: Wed, 3 Jul 2024 09:51:20 +0100 Subject: [PATCH 2/3] docs(review): edits to doc from comments by MM -- 01 Signed-off-by: prmellor --- .../con-consumer-config-properties.adoc | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/documentation/modules/managing/con-consumer-config-properties.adoc b/documentation/modules/managing/con-consumer-config-properties.adoc index f446644bfb8..87534c0b4ae 100644 --- a/documentation/modules/managing/con-consumer-config-properties.adoc +++ b/documentation/modules/managing/con-consumer-config-properties.adoc @@ -253,15 +253,15 @@ Decreasing the heartbeat interval reduces the chance of accidental rebalancing, [source,env] ---- # ... -max.poll.records=500 # <1> -session.timeout.ms=45000 # <2> -heartbeat.interval.ms=3000 # <3> +max.poll.records=100 # <1> +session.timeout.ms=60000 # <2> +heartbeat.interval.ms=10000 # <3> # ... ---- -<1> Sets the number of processed records returned from the consumer. -<2> Adjust the heartbeat interval lower according to anticipated rebalances. 
-<3> If no heartbeats are received by the Kafka broker before the timeout duration expires, the consumer is removed from the consumer group and a rebalance is initiated. +<1> Set the number records returned to the consumer when calling the `poll()` method. +<2> Set the timeout for detecting client failure. If the broker configuration has a `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, the session timeout value must be within that range. +<3> Adjust the heartbeat interval lower according to anticipated rebalances. == Managing offset policy @@ -300,23 +300,32 @@ The rebalancing of a partition between active consumers in a group is the time i * The consumers in the group to receive their assignments and start fetching Rebalances are triggered by changes in consumer health, network issues, configuration updates, and scaling events. -This process can increase service downtime, especially if it occurs frequently, such as during rolling restarts of a consumer group cluster. +This process can increase service downtime, especially if it occurs frequently, such as during rolling restarts of consumers in a group. + To minimize the impact of rebalances, consider the following strategies and configurations: -Static membership:: Assign a unique identifier (`group.instance.id`) to each consumer instance. +Assess throughput and parallelism:: Assess the expected throughput (bytes and records per second) and parallelism (number of partitions) of the input topics against the number of consumers. ++ +If adjustments are needed, start by setting up static membership, adopting a partition assignment strategy, and setting a limit on the number of records returned using the `max.poll.records` property. +Add further configurations for timeouts and intervals, if required and with care, as these can introduce issues related to the handling of failures. + +Use static membership:: Assign a unique identifier (`group.instance.id`) to each consumer instance. 
Static membership introduces persistence so static consumers retain partition assignments across restarts, reducing unnecessary rebalances. -Poll intervals and record limits:: -* Use the `max.poll.interval.ms` property to prevent rebalances caused by prolonged processing tasks by setting the maximum interval between polls. +Adopt partition assignment strategies:: +* Use appropriate partition assignment strategies to reduce the number of partitions that need to be reassigned during a rebalance, minimizing the impact on active consumers. +* The `org.apache.kafka.clients.consumer.CooperativeStickyAssignor` strategy is particularly effective, as it ensures minimal partition movement and better stability during rebalances. + +Adjust record limits and poll intervals:: * Use the `max.poll.records` property to limit the number of records returned during each poll. -Processing fewer messages more efficiently can prevent delays. +Processing fewer messages more efficiently can prevent delays. +* Use the `max.poll.interval.ms` property to prevent rebalances caused by prolonged processing tasks by setting the maximum interval between calls to the `poll()` method. +* Alternatively, consider pausing partitions to retrieve fewer records at a time. -Session timeout and heartbeat intervals:: +Adjust session timeout and heartbeat intervals:: * Use the `session.timeout.ms` property to set a longer timeout to reduce rebalances caused by temporary network glitches or minor processing delays. * Adjust the `heartbeat.interval.ms` property to balance failure detection checks with minimizing unnecessary rebalances. -Adopt partition assignment strategies:: Use appropriate partition assignment strategies to reduce the number of partitions that need to be reassigned during a rebalance, minimizing the impact on active consumers. - Monitor consumer health:: Instability in consumer applications, such as frequent crashes, can trigger rebalances. 
Use Kafka consumer metrics to monitor such things as rebalance rates, session timouts, and failed fetch requests. @@ -337,10 +346,14 @@ partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStick To minimize the impact of rebalances during scaling of consumer groups, consider the following approaches: Set a rebalance delay:: Use the `group.initial.rebalance.delay.ms` property in the Kafka configuration to delay the time it takes for consumers to join a new consumer group before performing a rebalance. +Introducing a delay helps avoid triggering several rebalances when starting multiple consumers near the same time. +The appropriate delay depends on the orchestration used and might not be suitable in some circumstances. Avoid frequent scaling:: * Keep the number of consumers stable, scaling only when necessary and in controlled increments. * Monitor system performance and adjust your scaling strategy as needed. +** Lag per partition should be constant and low. +** Records processed per second by consumers should match the records per second in the input topics. * Use the Kafka Exporter to check for consumer lag and determine if scaling is required. Implement dynamic scaling policies:: @@ -348,5 +361,7 @@ Implement dynamic scaling policies:: * Define maximum and minimum replica counts for consumer groups. * Set periods between scaling events to prevent rapid scaling. -NOTE: In cases where lengthy message processing is unavoidable, consider offloading such tasks to a pool of worker threads. -This parallel processing approach prevents delays and potential rebalances caused by overwhelming the consumer with a large volume of records. \ No newline at end of file +NOTE: In cases where lengthy message processing is unavoidable, consider pausing and resuming partitions as needed. +If you pause all partitions, `poll()` returns no records, allowing you to keep calling it without overwhelming the consumers. 
+Alternatively, you can offload the processing tasks to a pool of worker threads. +This helps prevent delays and potential rebalances. \ No newline at end of file From 65221e775131b2db0e2cbc688aa0d8fc4e437048 Mon Sep 17 00:00:00 2001 From: prmellor Date: Thu, 4 Jul 2024 11:19:55 +0100 Subject: [PATCH 3/3] docs(updates to doc from comments by MM and LC - hearbeat interval and session timeout): Signed-off-by: prmellor --- .../managing/con-consumer-config-properties.adoc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/documentation/modules/managing/con-consumer-config-properties.adoc b/documentation/modules/managing/con-consumer-config-properties.adoc index 87534c0b4ae..9221181ab0e 100644 --- a/documentation/modules/managing/con-consumer-config-properties.adoc +++ b/documentation/modules/managing/con-consumer-config-properties.adoc @@ -254,14 +254,14 @@ Decreasing the heartbeat interval reduces the chance of accidental rebalancing, ---- # ... max.poll.records=100 # <1> -session.timeout.ms=60000 # <2> -heartbeat.interval.ms=10000 # <3> +session.timeout.ms=30000 # <2> +heartbeat.interval.ms=5000 # <3> # ... ---- <1> Set the number records returned to the consumer when calling the `poll()` method. <2> Set the timeout for detecting client failure. If the broker configuration has a `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, the session timeout value must be within that range. -<3> Adjust the heartbeat interval lower according to anticipated rebalances. +<3> Adjust the heartbeat interval according to anticipated rebalances. == Managing offset policy @@ -336,8 +336,8 @@ Use Kafka consumer metrics to monitor such things as rebalance rates, session ti group.instance.id= max.poll.interval.ms=300000 max.poll.records=500 -session.timeout.ms=45000 -heartbeat.interval.ms=3000 +session.timeout.ms=30000 +heartbeat.interval.ms=5000 partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor # ... ----
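
The patched text states two constraints on these values: the heartbeat interval must be lower than the session timeout, "usually by a third", and the session timeout must fall within the broker's `group.min.session.timeout.ms` and `group.max.session.timeout.ms` range. The following is a minimal sketch of a check for those constraints, not part of the patch. The function name `check_consumer_timeouts` is hypothetical, "by a third" is read here as "no more than one third of the session timeout" (an interpretation), and the broker range in the usage line is made up for illustration.

```python
def check_consumer_timeouts(config, broker_min_session_ms=None, broker_max_session_ms=None):
    """Return a list of problems found in a consumer config dict (hypothetical helper)."""
    session_ms = int(config["session.timeout.ms"])
    heartbeat_ms = int(config["heartbeat.interval.ms"])
    problems = []

    # The heartbeat interval must be lower than the session timeout.
    if heartbeat_ms >= session_ms:
        problems.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    # Assumption: "usually by a third" means at most one third of the session timeout.
    elif heartbeat_ms * 3 > session_ms:
        problems.append("heartbeat.interval.ms is more than a third of session.timeout.ms")

    # The session timeout must be within the broker range, when that range is known.
    if broker_min_session_ms is not None and session_ms < broker_min_session_ms:
        problems.append("session.timeout.ms is below group.min.session.timeout.ms")
    if broker_max_session_ms is not None and session_ms > broker_max_session_ms:
        problems.append("session.timeout.ms is above group.max.session.timeout.ms")

    return problems


# Values from the final patch; the broker range (6000, 300000) is illustrative only.
patched_values = {"session.timeout.ms": "30000", "heartbeat.interval.ms": "5000"}
print(check_consumer_timeouts(patched_values, 6000, 300000))
```

Returning a list of messages rather than raising on the first failure lets a review or CI step report every violated constraint at once. The one-third threshold is reported as a problem here, but it could equally be downgraded to a warning, since the text describes it as usual practice rather than a hard rule.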