Skip to content

Commit

Permalink
WIP write pseudo logic on how the scaling evenely based on parition c…
Browse files Browse the repository at this point in the history
…ount would work
  • Loading branch information
patelvp committed Nov 18, 2024
1 parent 1577600 commit 210c5f4
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,14 @@ func (s *kafkaScaler) getTotalLag() (int64, int64, error) {
s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, len(topicPartitions), s.metadata.lagThreshold))

if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag {
// don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings
upperBound := totalTopicPartitions
// Ensure that the number of partitions is evenly distributed across the number of consumers
if s.metadata.ensureEvenDistributionOfPartitions {
// How do we find the current number of consumers?
nextFactor := GetNextFactor(1, totalTopicPartitions)
totalLag = nextFactor * s.metadata.lagThreshold
}
// don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings
if s.metadata.limitToPartitionsWithLag {
upperBound = partitionsWithLag
}
Expand Down

0 comments on commit 210c5f4

Please sign in to comment.