diff --git a/config.go b/config.go
index 26e49b039..f6f14f5fc 100644
--- a/config.go
+++ b/config.go
@@ -296,8 +296,17 @@ type Config struct {
 				// coordinator for the group.
 				UserData []byte
 			}
+
 			// support KIP-345
 			InstanceId string
+
+			// If true, consumer offsets will be automatically reset to the configured Initial value
+			// if the fetched consumer offset is out of range of available offsets. Out of range
+			// can happen if the data has been deleted from the server, or during situations of
+			// under-replication where a replica does not have all the data yet. It can be
+			// dangerous to reset the offset automatically, particularly in the latter case. Defaults
+			// to true to maintain existing behavior.
+			ResetInvalidOffsets bool
 		}

 		Retry struct {
@@ -499,6 +508,7 @@ func NewConfig() *Config {
 	c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
 	c.Consumer.Group.Rebalance.Retry.Max = 4
 	c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
+	c.Consumer.Group.ResetInvalidOffsets = true

 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
diff --git a/consumer_group.go b/consumer_group.go
index 950a14935..b20edd978 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -991,7 +991,8 @@ type consumerGroupClaim struct {

 func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
 	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
-	if errors.Is(err, ErrOffsetOutOfRange) {
+
+	if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets {
 		offset = sess.parent.config.Consumer.Offsets.Initial
 		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
 	}
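
For illustration, a minimal sketch of an application opting out of the automatic reset, assuming this is the Sarama client (import path github.com/Shopify/sarama); the broker address, group ID, topic, and noopHandler below are hypothetical placeholders, not part of this change:

package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama" // assumed import path for this repository
)

// noopHandler is a hypothetical ConsumerGroupHandler used only for this sketch.
type noopHandler struct{}

func (noopHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (noopHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (noopHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		sess.MarkMessage(msg, "") // mark each message as processed
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // consumer groups require Version >= V0_10_2_0
	// If a reset does occur, start from the oldest available offset.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	// Opt out of the default-on automatic reset so an out-of-range offset
	// is reported as an error instead of being silently rewound.
	config.Consumer.Group.ResetInvalidOffsets = false

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
	if err != nil {
		log.Fatalln(err)
	}
	defer group.Close()

	if err := group.Consume(context.Background(), []string{"example-topic"}, noopHandler{}); err != nil {
		log.Println("consume error:", err)
	}
}

Leaving ResetInvalidOffsets at its default of true preserves the pre-change behavior: the claim is retried from Offsets.Initial whenever the committed offset is out of range. Setting it to false makes the out-of-range condition visible through the client's error handling, which is the safer choice when the gap may be caused by an under-replicated follower rather than deleted data.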