From e394045315d4f02afbb93cdb615be8d5ad3178c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodrigo=20Arg=C3=BCello?= Date: Mon, 20 Jan 2025 18:02:30 +0100 Subject: [PATCH] contrib/IBM/sarama.v1: stop using mockbroker for tests (#3104) --- contrib/IBM/sarama.v1/consumer_group_test.go | 33 +++-- contrib/IBM/sarama.v1/consumer_test.go | 58 +++++---- contrib/IBM/sarama.v1/producer_test.go | 120 ++++++++----------- contrib/IBM/sarama.v1/sarama_test.go | 86 ++++++------- 4 files changed, 130 insertions(+), 167 deletions(-) diff --git a/contrib/IBM/sarama.v1/consumer_group_test.go b/contrib/IBM/sarama.v1/consumer_group_test.go index 17609f2629..c9f74e0f3b 100644 --- a/contrib/IBM/sarama.v1/consumer_group_test.go +++ b/contrib/IBM/sarama.v1/consumer_group_test.go @@ -8,7 +8,6 @@ package sarama import ( "context" "log" - "os" "sync" "testing" @@ -21,19 +20,14 @@ import ( ) func TestWrapConsumerGroupHandler(t *testing.T) { - if _, ok := os.LookupEnv("INTEGRATION"); !ok { - t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)") - } + cfg := newIntegrationTestConfig(t) + topic := topicName(t) + groupID := "IBM/sarama/TestWrapConsumerGroupHandler" mt := mocktracer.Start() defer mt.Stop() - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - cfg.Producer.Flush.Messages = 1 - - cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg) + cg, err := sarama.NewConsumerGroup(kafkaBrokers, groupID, cfg) require.NoError(t, err) defer func() { assert.NoError(t, cg.Close()) @@ -44,7 +38,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) { ready: make(chan bool), rcvMessages: make(chan *sarama.ConsumerMessage, 1), } - tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(testGroupID)) + tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(groupID)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -57,7 +51,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims - if err := cg.Consume(ctx, []string{testTopic}, tracedHandler); err != nil { + if err := cg.Consume(ctx, []string{topic}, tracedHandler); err != nil { assert.ErrorIs(t, err, sarama.ErrClosedConsumerGroup) return } @@ -72,13 +66,14 @@ func TestWrapConsumerGroupHandler(t *testing.T) { log.Println("Sarama consumer up and running!...") p, err := sarama.NewSyncProducer(kafkaBrokers, cfg) - require.NoError(t, err) - require.NoError(t, err) p = WrapSyncProducer(cfg, p, WithDataStreams()) + defer func() { + assert.NoError(t, p.Close()) + }() produceMsg := &sarama.ProducerMessage{ - Topic: testTopic, + Topic: topic, Value: sarama.StringEncoder("test 1"), Metadata: "test", } @@ -96,7 +91,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) { s0 := spans[0] assert.Equal(t, "kafka", s0.Tag(ext.ServiceName)) assert.Equal(t, "queue", s0.Tag(ext.SpanType)) - assert.Equal(t, "Produce Topic gotest_ibm_sarama", s0.Tag(ext.ResourceName)) + assert.Equal(t, "Produce Topic "+topic, s0.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s0.OperationName()) assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition)) assert.NotNil(t, s0.Tag("offset")) @@ -104,12 +99,12 @@ func TestWrapConsumerGroupHandler(t *testing.T) { assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) - assertDSMProducerPathway(t, testTopic, produceMsg) + assertDSMProducerPathway(t, topic, produceMsg) s1 := spans[1] assert.Equal(t, "kafka", s1.Tag(ext.ServiceName)) assert.Equal(t, "queue", s1.Tag(ext.SpanType)) - assert.Equal(t, "Consume Topic gotest_ibm_sarama", s1.Tag(ext.ResourceName)) + assert.Equal(t, "Consume Topic "+topic, s1.Tag(ext.ResourceName)) assert.Equal(t, "kafka.consume", s1.OperationName()) assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition)) assert.NotNil(t, s1.Tag("offset")) @@ -117,7 +112,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) { assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) - assertDSMConsumerPathway(t, testTopic, testGroupID, consumeMsg, true) + assertDSMConsumerPathway(t, topic, groupID, consumeMsg, true) assert.Equal(t, s0.SpanID(), s1.ParentID(), "spans are not parent-child") } diff --git a/contrib/IBM/sarama.v1/consumer_test.go b/contrib/IBM/sarama.v1/consumer_test.go index 8b373b2555..4c709f7086 100644 --- a/contrib/IBM/sarama.v1/consumer_test.go +++ b/contrib/IBM/sarama.v1/consumer_test.go @@ -6,6 +6,7 @@ package sarama import ( + "fmt" "testing" "github.com/IBM/sarama" @@ -18,44 +19,49 @@ import ( ) func TestWrapConsumer(t *testing.T) { + cfg := newIntegrationTestConfig(t) + cfg.Version = sarama.MinVersion + topic := topicName(t) + mt := mocktracer.Start() defer mt.Stop() - broker := sarama.NewMockBroker(t, 0) - defer broker.Close() - - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": sarama.NewMockMetadataResponse(t). - SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader("test-topic", 0, broker.BrokerID()), - "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset("test-topic", 0, sarama.OffsetOldest, 0). - SetOffset("test-topic", 0, sarama.OffsetNewest, 1), - "FetchRequest": sarama.NewMockFetchResponse(t, 1). - SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")). - SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")), - }) - cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion - - client, err := sarama.NewClient([]string{broker.Addr()}, cfg) + client, err := sarama.NewClient(kafkaBrokers, cfg) require.NoError(t, err) defer client.Close() consumer, err := sarama.NewConsumerFromClient(client) require.NoError(t, err) + consumer = WrapConsumer(consumer, WithDataStreams()) defer consumer.Close() - consumer = WrapConsumer(consumer, WithDataStreams()) + partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) + require.NoError(t, err) + defer partitionConsumer.Close() - partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0) + p, err := sarama.NewSyncProducer(kafkaBrokers, cfg) require.NoError(t, err) + defer func() { + assert.NoError(t, p.Close()) + }() + + for i := 1; i <= 2; i++ { + produceMsg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder(fmt.Sprintf("test %d", i)), + Metadata: fmt.Sprintf("test %d", i), + } + _, _, err = p.SendMessage(produceMsg) + require.NoError(t, err) + } + msg1 := <-partitionConsumer.Messages() msg2 := <-partitionConsumer.Messages() err = partitionConsumer.Close() require.NoError(t, err) // wait for the channel to be closed <-partitionConsumer.Messages() + waitForSpans(mt, 2) spans := mt.FinishedSpans() require.Len(t, spans, 2) @@ -67,16 +73,16 @@ func TestWrapConsumer(t *testing.T) { "span context should be injected into the consumer message headers") assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) - assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) assert.Equal(t, "kafka.consume", s.OperationName()) assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - assertDSMConsumerPathway(t, "test-topic", "", msg1, false) + assertDSMConsumerPathway(t, topic, "", msg1, false) } { s := spans[1] @@ -86,15 +92,15 @@ func TestWrapConsumer(t *testing.T) { "span context should be injected into the consumer message headers") assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(1), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) - assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) assert.Equal(t, "kafka.consume", s.OperationName()) assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - assertDSMConsumerPathway(t, "test-topic", "", msg2, false) + assertDSMConsumerPathway(t, topic, "", msg2, false) } } diff --git a/contrib/IBM/sarama.v1/producer_test.go b/contrib/IBM/sarama.v1/producer_test.go index 92308a9eb5..945c4ac956 100644 --- a/contrib/IBM/sarama.v1/producer_test.go +++ b/contrib/IBM/sarama.v1/producer_test.go @@ -17,36 +17,21 @@ import ( ) func TestSyncProducer(t *testing.T) { + cfg := newIntegrationTestConfig(t) + topic := topicName(t) + mt := mocktracer.Start() defer mt.Stop() - seedBroker := sarama.NewMockBroker(t, 1) - defer seedBroker.Close() - - leader := sarama.NewMockBroker(t, 2) - defer leader.Close() - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - seedBroker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - leader.Returns(prodSuccess) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - - producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) + producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapSyncProducer(cfg, producer, WithDataStreams()) + defer func() { + assert.NoError(t, producer.Close()) + }() msg1 := &sarama.ProducerMessage{ - Topic: "my_topic", + Topic: topic, Value: sarama.StringEncoder("test 1"), Metadata: "test", } @@ -54,59 +39,44 @@ func TestSyncProducer(t *testing.T) { require.NoError(t, err) spans := mt.FinishedSpans() - assert.Len(t, spans, 1) + require.Len(t, spans, 1) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - assertDSMProducerPathway(t, "my_topic", msg1) + assertDSMProducerPathway(t, topic, msg1) } } func TestSyncProducerSendMessages(t *testing.T) { + cfg := newIntegrationTestConfig(t) + topic := topicName(t) + mt := mocktracer.Start() defer mt.Stop() - seedBroker := sarama.NewMockBroker(t, 1) - defer seedBroker.Close() - leader := sarama.NewMockBroker(t, 2) - defer leader.Close() - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - seedBroker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - leader.Returns(prodSuccess) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - cfg.Producer.Flush.Messages = 2 - - producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) + producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapSyncProducer(cfg, producer, WithDataStreams()) + defer func() { + assert.NoError(t, producer.Close()) + }() msg1 := &sarama.ProducerMessage{ - Topic: "my_topic", + Topic: topic, Value: sarama.StringEncoder("test 1"), Metadata: "test", } msg2 := &sarama.ProducerMessage{ - Topic: "my_topic", + Topic: topic, Value: sarama.StringEncoder("test 2"), Metadata: "test", } @@ -118,7 +88,7 @@ func TestSyncProducerSendMessages(t *testing.T) { for _, s := range spans { assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) @@ -127,7 +97,7 @@ func TestSyncProducerSendMessages(t *testing.T) { } for _, msg := range []*sarama.ProducerMessage{msg1, msg2} { - assertDSMProducerPathway(t, "my_topic", msg) + assertDSMProducerPathway(t, topic, msg) } } @@ -135,19 +105,22 @@ func TestWrapAsyncProducer(t *testing.T) { // the default for producers is a fire-and-forget model that doesn't return // successes t.Run("Without Successes", func(t *testing.T) { + cfg := newIntegrationTestConfig(t) + cfg.Producer.Return.Successes = false + topic := topicName(t) + mt := mocktracer.Start() defer mt.Stop() - broker := newMockBroker(t) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 - producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg) require.NoError(t, err) - producer = WrapAsyncProducer(nil, producer, WithDataStreams()) + producer = WrapAsyncProducer(cfg, producer, WithDataStreams()) + defer func() { + assert.NoError(t, producer.Close()) + }() msg1 := &sarama.ProducerMessage{ - Topic: "my_topic", + Topic: topic, Value: sarama.StringEncoder("test 1"), } producer.Input() <- msg1 @@ -160,7 +133,7 @@ func TestWrapAsyncProducer(t *testing.T) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) // these tags are set in the finishProducerSpan function, but in this case it's never used, and instead we @@ -172,26 +145,27 @@ func TestWrapAsyncProducer(t *testing.T) { assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - assertDSMProducerPathway(t, "my_topic", msg1) + assertDSMProducerPathway(t, topic, msg1) } }) t.Run("With Successes", func(t *testing.T) { + cfg := newIntegrationTestConfig(t) + cfg.Producer.Return.Successes = true + topic := topicName(t) + mt := mocktracer.Start() defer mt.Stop() - broker := newMockBroker(t) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 - cfg.Producer.Return.Successes = true - - producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + producer, err := sarama.NewAsyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapAsyncProducer(cfg, producer, WithDataStreams()) + defer func() { + assert.NoError(t, producer.Close()) + }() msg1 := &sarama.ProducerMessage{ - Topic: "my_topic", + Topic: topic, Value: sarama.StringEncoder("test 1"), } producer.Input() <- msg1 @@ -203,15 +177,15 @@ func TestWrapAsyncProducer(t *testing.T) { s := spans[0] assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName)) assert.Equal(t, "kafka.produce", s.OperationName()) assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) + assert.NotNil(t, s.Tag("offset")) assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - assertDSMProducerPathway(t, "my_topic", msg1) + assertDSMProducerPathway(t, topic, msg1) } }) } diff --git a/contrib/IBM/sarama.v1/sarama_test.go b/contrib/IBM/sarama.v1/sarama_test.go index e9205ebe0b..e3263329e4 100644 --- a/contrib/IBM/sarama.v1/sarama_test.go +++ b/contrib/IBM/sarama.v1/sarama_test.go @@ -8,6 +8,8 @@ package sarama import ( "context" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" + "os" + "strings" "testing" "time" @@ -20,11 +22,8 @@ import ( "github.com/stretchr/testify/require" ) -var kafkaBrokers = []string{"localhost:9092", "localhost:9093", "localhost:9094"} - -const ( - testGroupID = "gotest_ibm_sarama" - testTopic = "gotest_ibm_sarama" +var ( + kafkaBrokers = []string{"localhost:9092"} ) func TestNamingSchema(t *testing.T) { @@ -32,6 +31,9 @@ func TestNamingSchema(t *testing.T) { } func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { + cfg := newIntegrationTestConfig(t) + topic := topicName(t) + var opts []Option if serviceOverride != "" { opts = append(opts, WithServiceName(serviceOverride)) @@ -39,75 +41,56 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { mt := mocktracer.Start() defer mt.Stop() - broker := sarama.NewMockBroker(t, 1) - defer broker.Close() - - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": sarama.NewMockMetadataResponse(t). - SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader("test-topic", 0, broker.BrokerID()), - "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset("test-topic", 0, sarama.OffsetOldest, 0). - SetOffset("test-topic", 0, sarama.OffsetNewest, 1), - "FetchRequest": sarama.NewMockFetchResponse(t, 1). - SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")), - "ProduceRequest": sarama.NewMockProduceResponse(t). - SetError("test-topic", 0, sarama.ErrNoError), - }) - cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion - cfg.Producer.Return.Successes = true - cfg.Producer.Flush.Messages = 1 - - producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, cfg) + producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg) require.NoError(t, err) producer = WrapSyncProducer(cfg, producer, opts...) + defer func() { + assert.NoError(t, producer.Close()) + }() - c, err := sarama.NewConsumer([]string{broker.Addr()}, cfg) + c, err := sarama.NewConsumer(kafkaBrokers, cfg) require.NoError(t, err) - defer func(c sarama.Consumer) { - err := c.Close() - require.NoError(t, err) - }(c) c = WrapConsumer(c, opts...) + defer func() { + assert.NoError(t, c.Close()) + }() msg1 := &sarama.ProducerMessage{ - Topic: "test-topic", + Topic: topic, Value: sarama.StringEncoder("test 1"), Metadata: "test", } _, _, err = producer.SendMessage(msg1) require.NoError(t, err) - pc, err := c.ConsumePartition("test-topic", 0, 0) + pc, err := c.ConsumePartition(topic, 0, 0) require.NoError(t, err) + defer func() { + assert.NoError(t, pc.Close()) + }() + _ = <-pc.Messages() err = pc.Close() require.NoError(t, err) // wait for the channel to be closed <-pc.Messages() + waitForSpans(mt, 2) spans := mt.FinishedSpans() require.Len(t, spans, 2) return spans } -func newMockBroker(t *testing.T) *sarama.MockBroker { - broker := sarama.NewMockBroker(t, 1) - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, sarama.ErrNoError) - broker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - for i := 0; i < 10; i++ { - broker.Returns(prodSuccess) +func newIntegrationTestConfig(t *testing.T) *sarama.Config { + if _, ok := os.LookupEnv("INTEGRATION"); !ok { + t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)") } - return broker + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 // first version that supports headers + cfg.Producer.Return.Successes = true + cfg.Producer.Flush.Messages = 1 + return cfg } // waitForSpans polls the mock tracer until the expected number of spans @@ -162,7 +145,7 @@ func assertDSMConsumerPathway(t *testing.T, topic, groupID string, msg *sarama.C ctx := context.Background() if withProducer { - ctx, _ = tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka") + ctx, _ = tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+topicName(t), "type:kafka") } ctx, _ = tracer.SetDataStreamsCheckpoint(ctx, edgeTags...) want, _ := datastreams.PathwayFromContext(ctx) @@ -170,3 +153,8 @@ func assertDSMConsumerPathway(t *testing.T, topic, groupID string, msg *sarama.C assert.NotEqual(t, want.GetHash(), 0) assert.Equal(t, want.GetHash(), got.GetHash()) } + +// topicName returns a unique topic name for the current test, which ensures results are not affected by each other. +func topicName(t *testing.T) string { + return strings.ReplaceAll("IBM/sarama/"+t.Name(), "/", "_") +}