From 9cce89b8922e2b4cef27e8afd55c9df3434881fc Mon Sep 17 00:00:00 2001 From: Alex Bublichenko Date: Wed, 19 Jul 2023 11:37:17 -0400 Subject: [PATCH] [kafka-consumer] Support multiple Kafka topics Resolves issue #4592 Summary of changes: When handling messages use the topic name from PartitionConsumer that reflects the topic where message is consumed from. Signed-off-by: Alex Bublichenko --- cmd/ingester/app/builder/builder.go | 1 - cmd/ingester/app/consumer/consumer.go | 2 +- cmd/ingester/app/consumer/consumer_test.go | 1 - cmd/ingester/app/consumer/processor_factory.go | 7 ++----- cmd/ingester/app/consumer/processor_factory_test.go | 3 +-- 5 files changed, 4 insertions(+), 10 deletions(-) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index d984b5dcd03..a855b531fc4 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -64,7 +64,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit } factoryParams := consumer.ProcessorFactoryParams{ - Topic: options.Topic, Parallelism: options.Parallelism, SaramaConsumer: saramaConsumer, BaseProcessor: spanProcessor, diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 7d9080b68df..af360249a43 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -144,7 +144,7 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { deadlockDetector.incrementMsgCount() if msgProcessor == nil { - msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1) + msgProcessor = c.processorFactory.new(pc.Topic(), pc.Partition(), msg.Offset-1) defer msgProcessor.Close() } diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index d6f06eed347..be68b5de8f2 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -98,7 +98,6 @@ func newConsumer( Logger: logger, InternalConsumer: consumer, ProcessorFactory: ProcessorFactory{ - topic: topic, consumer: consumer, metricsFactory: metricsFactory, logger: logger, diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index 3bef57b8f6c..ba025f9be7a 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -29,7 +29,6 @@ import ( // ProcessorFactoryParams are the parameters of a ProcessorFactory type ProcessorFactoryParams struct { Parallelism int - Topic string BaseProcessor processor.SpanProcessor SaramaConsumer consumer.Consumer Factory metrics.Factory @@ -39,7 +38,6 @@ type ProcessorFactoryParams struct { // ProcessorFactory is a factory for creating startedProcessors type ProcessorFactory struct { - topic string consumer consumer.Consumer metricsFactory metrics.Factory logger *zap.Logger @@ -51,7 +49,6 @@ type ProcessorFactory struct { // NewProcessorFactory constructs a new ProcessorFactory func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) { return &ProcessorFactory{ - topic: params.Topic, consumer: params.SaramaConsumer, metricsFactory: params.Factory, logger: params.Logger, @@ -61,11 +58,11 @@ func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, erro }, nil } -func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor { +func (c *ProcessorFactory) new(topic string, partition int32, minOffset int64) processor.SpanProcessor { c.logger.Info("Creating new processors", zap.Int32("partition", partition)) markOffset := func(offset int64) { - c.consumer.MarkPartitionOffset(c.topic, partition, offset, "") + c.consumer.MarkPartitionOffset(topic, partition, offset, "") } om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory) diff --git a/cmd/ingester/app/consumer/processor_factory_test.go b/cmd/ingester/app/consumer/processor_factory_test.go index c2b47f9e0d3..f112de72142 100644 --- a/cmd/ingester/app/consumer/processor_factory_test.go +++ b/cmd/ingester/app/consumer/processor_factory_test.go @@ -46,7 +46,6 @@ func Test_new(t *testing.T) { sp.On("Process", mock.Anything).Return(nil) pf := ProcessorFactory{ - topic: topic, consumer: mockConsumer, metricsFactory: metrics.NullFactory, logger: zap.NewNop(), @@ -54,7 +53,7 @@ func Test_new(t *testing.T) { parallelism: 1, } - processor := pf.new(partition, offset) + processor := pf.new(topic, partition, offset) msg := &kmocks.Message{} msg.On("Offset").Return(offset + 1) processor.Process(msg)