Skip to content

Commit

Permalink
Kafka scaler: concurrent offset fetches (#2405)
Browse files Browse the repository at this point in the history
Signed-off-by: VerstraeteBert <[email protected]>
  • Loading branch information
VerstraeteBert authored and zroubalik committed Jan 4, 2022
1 parent 60d1e8f commit 4828a98
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 28 deletions.
88 changes: 66 additions & 22 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/Shopify/sarama"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -214,18 +215,13 @@ func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
return false, err
}

offsets, err := s.getOffsets(partitions)
if err != nil {
return false, err
}

topicOffsets, err := s.getTopicOffsets(partitions)
consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions)
if err != nil {
return false, err
}

for _, partition := range partitions {
lag, err := s.getLagForPartition(partition, offsets, topicOffsets)
lag, err := s.getLagForPartition(partition, consumerOffsets, producerOffsets)
if err != nil && lag == invalidOffset {
return true, nil
}
Expand Down Expand Up @@ -307,7 +303,7 @@ func (s *kafkaScaler) getPartitions() ([]int32, error) {
return partitions, nil
}

func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) {
func (s *kafkaScaler) getConsumerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, map[string][]int32{
s.metadata.topic: partitions,
})
Expand Down Expand Up @@ -364,26 +360,57 @@ func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricS
return []v2beta2.MetricSpec{metricSpec}
}

type consumerOffsetResult struct {
consumerOffsets *sarama.OffsetFetchResponse
err error
}

type producerOffsetResult struct {
producerOffsets map[int32]int64
err error
}

func (s *kafkaScaler) getConsumerAndProducerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, map[int32]int64, error) {
consumerChan := make(chan consumerOffsetResult, 1)
go func() {
consumerOffsets, err := s.getConsumerOffsets(partitions)
consumerChan <- consumerOffsetResult{consumerOffsets, err}
}()

producerChan := make(chan producerOffsetResult, 1)
go func() {
producerOffsets, err := s.getProducerOffsets(partitions)
producerChan <- producerOffsetResult{producerOffsets, err}
}()

consumerRes := <-consumerChan
if consumerRes.err != nil {
return nil, nil, consumerRes.err
}

producerRes := <-producerChan
if producerRes.err != nil {
return nil, nil, producerRes.err
}

return consumerRes.consumerOffsets, producerRes.producerOffsets, nil
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
partitions, err := s.getPartitions()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

offsets, err := s.getOffsets(partitions)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

topicOffsets, err := s.getTopicOffsets(partitions)
consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

totalLag := int64(0)
for _, partition := range partitions {
lag, _ := s.getLagForPartition(partition, offsets, topicOffsets)
lag, _ := s.getLagForPartition(partition, consumerOffsets, producerOffsets)

totalLag += lag
}
Expand All @@ -406,7 +433,12 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) {
type brokerOffsetResult struct {
offsetResp *sarama.OffsetResponse
err error
}

func (s *kafkaScaler) getProducerOffsets(partitions []int32) (map[int32]int64, error) {
version := int16(0)
if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) {
version = 1
Expand All @@ -430,17 +462,29 @@ func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, erro
request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1)
}

offsets := make(map[int32]int64)

// Step 2: send requests, one per broker, and collect offsets
resultCh := make(chan brokerOffsetResult, len(requests))
var wg sync.WaitGroup
wg.Add(len(requests))
for broker, request := range requests {
response, err := broker.GetAvailableOffsets(request)
go func(brCopy *sarama.Broker, reqCopy *sarama.OffsetRequest) {
defer wg.Done()
response, err := brCopy.GetAvailableOffsets(reqCopy)
resultCh <- brokerOffsetResult{response, err}
}(broker, request)
}

if err != nil {
return nil, err
wg.Wait()
close(resultCh)

offsets := make(map[int32]int64)

for brokerOffsetRes := range resultCh {
if brokerOffsetRes.err != nil {
return nil, brokerOffsetRes.err
}

for _, blocks := range response.Blocks {
for _, blocks := range brokerOffsetRes.offsetResp.Blocks {
for partitionID, block := range blocks {
if block.Err != sarama.ErrNoError {
return nil, block.Err
Expand Down
2 changes: 1 addition & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## Prerequisits
## Prerequisites

- [node](https://nodejs.org/en/)
- `kubectl` logged into a Kubernetes cluster.
Expand Down
10 changes: 5 additions & 5 deletions tests/scalers/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const defaultKafkaClient = 'kafka-client'
const strimziOperatorVersion = '0.18.0'
const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`

const strimziOperatroYamlFile = tmp.fileSync()
const strimziOperatorYamlFile = tmp.fileSync()
const kafkaClusterYamlFile = tmp.fileSync()
const kafkaTopicYamlFile = tmp.fileSync()
const kafkaClientYamlFile = tmp.fileSync()
Expand All @@ -25,10 +25,10 @@ test.before('Set up, create necessary resources.', t => {
sh.exec(`kubectl create namespace ${defaultNamespace}`)

const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout
fs.writeFileSync(strimziOperatroYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`))
fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`))
t.is(
0,
sh.exec(`kubectl apply -f ${strimziOperatroYamlFile.name} --namespace ${defaultNamespace}`).code,
sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Strimzi operator should work.'
)

Expand Down Expand Up @@ -195,7 +195,7 @@ test.after.always('Clean up, delete created resources.', t => {
`${kafkaClientYamlFile.name}`,
`${kafkaTopicYamlFile.name}`,
`${kafkaClusterYamlFile.name}`,
`${strimziOperatroYamlFile}`
`${strimziOperatorYamlFile}`
]

for (const resource of resources) {
Expand All @@ -212,7 +212,7 @@ metadata:
spec:
kafka:
version: 2.5.0
replicas: 1
replicas: 3
listeners:
plain: {}
tls: {}
Expand Down

0 comments on commit 4828a98

Please sign in to comment.