Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka: Add support to configure heartbeat interval and session timeout to kafka's consumer #3375

Merged
12 changes: 12 additions & 0 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ metadata:
Disables consumer retry by setting this to "false".
example: '"true"'
default: '"false"'
- name: heartbeatInterval
type: duration
description: |
The interval between heartbeats to the consumer coordinator.
example: '"5s"'
default: '"3s"'
- name: sessionTimeout
type: duration
description: |
The maximum time between heartbeats before the consumer is considered inactive and will timeout.
example: '"20s"'
default: '"10s"'
- name: version
type: string
description: |
Expand Down
2 changes: 2 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
config.Consumer.Offsets.Initial = k.initialOffset
config.Consumer.Fetch.Min = meta.consumerFetchMin
config.Consumer.Fetch.Default = meta.consumerFetchDefault
config.Consumer.Group.Heartbeat.Interval = meta.HeartbeatInterval
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
config.ChannelBufferSize = meta.channelBufferSize

config.Net.KeepAlive = meta.ClientConnectionKeepAliveInterval
Expand Down
4 changes: 4 additions & 0 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type KafkaMetadata struct {
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
Version string `mapstructure:"version"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`
Expand Down Expand Up @@ -158,6 +160,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
consumerFetchDefault: 1024 * 1024,
ClientConnectionTopicMetadataRefreshInterval: defaultClientConnectionTopicMetadataRefreshInterval,
ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval,
HeartbeatInterval: 3 * time.Second,
SessionTimeout: 10 * time.Second,
}

err := metadata.DecodeMetadata(meta, &m)
Expand Down
62 changes: 62 additions & 0 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func getCompleteMetadata() map[string]string {
consumerFetchDefault: "1048576",
consumerFetchMin: "1",
channelBufferSize: "256",
"heartbeatInterval": "2s",
"sessionTimeout": "30s",
}
}

Expand Down Expand Up @@ -132,6 +134,8 @@ func assertMetadata(t *testing.T, meta *KafkaMetadata) {
require.Equal(t, int32(1024*1024), meta.consumerFetchDefault)
require.Equal(t, int32(1), meta.consumerFetchMin)
require.Equal(t, 256, meta.channelBufferSize)
require.Equal(t, 2*time.Second, meta.HeartbeatInterval)
require.Equal(t, 30*time.Second, meta.SessionTimeout)
require.Equal(t, 8*time.Minute, defaultClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, 0*time.Minute, defaultClientConnectionKeepAliveInterval)
}
Expand Down Expand Up @@ -443,6 +447,64 @@ func TestMetadataChannelBufferSize(t *testing.T) {
require.Equal(t, 128, meta.channelBufferSize)
}

func TestMetadataHeartbeartInterval(t *testing.T) {
k := getKafka()

t.Run("default heartbeat interval", func(t *testing.T) {
// arrange
m := getBaseMetadata()

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 3*time.Second, meta.HeartbeatInterval)
})

t.Run("with heartbeat interval set", func(t *testing.T) {
// arrange
m := getBaseMetadata()
m["heartbeatInterval"] = "1s"

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 1*time.Second, meta.HeartbeatInterval)
})
}

func TestMetadataSessionTimeout(t *testing.T) {
k := getKafka()

t.Run("default session timeout", func(t *testing.T) {
// arrange
m := getBaseMetadata()

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 10*time.Second, meta.SessionTimeout)
})

t.Run("with session timeout set", func(t *testing.T) {
// arrange
m := getBaseMetadata()
m["sessionTimeout"] = "20s"

// act
meta, err := k.getKafkaMetadata(m)

// assert
require.NoError(t, err)
require.Equal(t, 20*time.Second, meta.SessionTimeout)
})
}

func TestGetEventMetadata(t *testing.T) {
ts := time.Now()

Expand Down
12 changes: 12 additions & 0 deletions pubsub/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ metadata:
Disables consumer retry by setting this to "false".
example: '"true"'
default: '"false"'
- name: heartbeatInterval
type: duration
description: |
The interval between heartbeats to the consumer coordinator.
example: '"5s"'
default: '"3s"'
- name: sessionTimeout
type: duration
description: |
The maximum time between heartbeats before the consumer is considered inactive and will timeout.
example: '"20s"'
default: '"10s"'
- name: version
type: string
description: |
Expand Down
Loading