From 210c5f49083963b2c44fa5a49af603d5c64bff74 Mon Sep 17 00:00:00 2001
From: Vishal Patel <38330754+patelvp@users.noreply.github.com>
Date: Mon, 18 Nov 2024 13:07:16 -0500
Subject: [PATCH] WIP write pseudo logic on how scaling evenly based on
 partition count would work

---
 pkg/scalers/kafka_scaler.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go
index f8e4c978e64..7258ac312db 100644
--- a/pkg/scalers/kafka_scaler.go
+++ b/pkg/scalers/kafka_scaler.go
@@ -977,8 +977,14 @@ func (s *kafkaScaler) getTotalLag() (int64, int64, error) {
 	s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, len(topicPartitions), s.metadata.lagThreshold))
 
 	if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag {
-		// don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings
 		upperBound := totalTopicPartitions
+		// Ensure that the number of partitions is evenly distributed across the number of consumers
+		if s.metadata.ensureEvenDistributionOfPartitions {
+			// How do we find the current number of consumers?
+			nextFactor := GetNextFactor(1, totalTopicPartitions)
+			totalLag = nextFactor * s.metadata.lagThreshold
+		}
+		// don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings
 		if s.metadata.limitToPartitionsWithLag {
 			upperBound = partitionsWithLag
 		}
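
For context on the pseudo logic above: GetNextFactor is not defined anywhere in this patch, and ensureEvenDistributionOfPartitions is a proposed metadata field, so the sketch below is only one reading of the intent, not the actual implementation. It assumes the goal is to round the replica count up to the smallest divisor of the topic's partition count, so that every consumer in the group is assigned exactly the same number of partitions:

package scalers

// GetNextFactor returns the smallest divisor of totalPartitions that is
// greater than or equal to desiredReplicas. Scaling to a divisor of the
// partition count means each consumer owns exactly
// totalPartitions/replicas partitions, i.e. the distribution is even.
// (Hypothetical helper -- not part of this patch or of KEDA.)
func GetNextFactor(desiredReplicas, totalPartitions int64) int64 {
	if desiredReplicas <= 1 {
		// 1 divides every partition count.
		return 1
	}
	if desiredReplicas >= totalPartitions {
		// A consumer group cannot usefully exceed the partition count.
		return totalPartitions
	}
	for n := desiredReplicas; n < totalPartitions; n++ {
		if totalPartitions%n == 0 {
			return n
		}
	}
	return totalPartitions
}

One way to fill in the hard-coded 1 (and the "How do we find the current number of consumers?" question in the hunk) would be to derive the target from the lag metric itself rather than querying the consumer group:

	// ceil(totalLag / lagThreshold): the replica count the lag alone would ask for
	desiredReplicas := (totalLag + s.metadata.lagThreshold - 1) / s.metadata.lagThreshold
	nextFactor := GetNextFactor(desiredReplicas, totalTopicPartitions)
	totalLag = nextFactor * s.metadata.lagThreshold

Reporting nextFactor * lagThreshold as the lag makes the downstream replica calculation (lag / threshold) land exactly on nextFactor, and the existing upperBound clamp below the new block would still cap the result at the partition count (or at partitionsWithLag, depending on settings).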