diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index 7e2c3bc3a381c..6be7fe1b0f44c 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -143,14 +143,16 @@ func (s *ReaderService) starting(ctx context.Context) error { if lastCommittedOffset == int64(KafkaEndOffset) { level.Warn(s.logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset)) - lastCommittedOffset = int64(KafkaStartOffset) + } else { + level.Debug(r.logger).Log("msg", "last committed offset", "offset", lastCommittedOffset) } + consumeOffset := kafkaStartOffset if lastCommittedOffset >= 0 { - lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset. + // Read from the next offset. + consumeOffset = lastCommittedOffset + 1 } - - s.reader.SetOffsetForConsumption(lastCommittedOffset) + s.reader.SetOffsetForConsumption(consumeOffset) if targetLag, maxLag := s.cfg.TargetConsumerLagAtStartup, s.cfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 { consumer, err := s.consumerFactory(s.committer)