contrib/IBM/sarama.v1: stop using mockbroker for tests (#3104)
rarguelloF authored Jan 20, 2025
1 parent c9ff7bb commit e394045
Showing 4 changed files with 130 additions and 167 deletions.
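
The updated tests call two shared helpers, newIntegrationTestConfig(t) and topicName(t), whose definitions are not part of the hunks shown below. A plausible sketch of them follows, reconstructed from the inline setup they replace (the INTEGRATION gate and the headers-capable producer config); the bodies, in particular the topic-naming convention, are assumptions rather than code from this commit.

```go
// Hypothetical sketch only: the real helpers are defined elsewhere in this commit.
package sarama

import (
	"os"
	"strings"
	"testing"

	"github.com/IBM/sarama"
)

// newIntegrationTestConfig skips the test unless INTEGRATION is set and returns
// a config suitable for the traced producer/consumer tests (mirrors the inline
// setup removed from TestWrapConsumerGroupHandler below).
func newIntegrationTestConfig(t *testing.T) *sarama.Config {
	t.Helper()
	if _, ok := os.LookupEnv("INTEGRATION"); !ok {
		t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
	}
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // first version that supports headers
	cfg.Producer.Return.Successes = true
	cfg.Producer.Flush.Messages = 1
	return cfg
}

// topicName derives a per-test topic so tests don't share partitions
// (assumed convention; the assertions below only require that the same
// value is used for producing and consuming).
func topicName(t *testing.T) string {
	t.Helper()
	return "gotest_ibm_sarama_" + strings.ReplaceAll(t.Name(), "/", "_")
}
```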
33 changes: 14 additions & 19 deletions contrib/IBM/sarama.v1/consumer_group_test.go
@@ -8,7 +8,6 @@ package sarama
import (
"context"
"log"
"os"
"sync"
"testing"

@@ -21,19 +20,14 @@ import (
)

func TestWrapConsumerGroupHandler(t *testing.T) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
}
cfg := newIntegrationTestConfig(t)
topic := topicName(t)
groupID := "IBM/sarama/TestWrapConsumerGroupHandler"

mt := mocktracer.Start()
defer mt.Stop()

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 1

cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg)
cg, err := sarama.NewConsumerGroup(kafkaBrokers, groupID, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, cg.Close())
@@ -44,7 +38,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
ready: make(chan bool),
rcvMessages: make(chan *sarama.ConsumerMessage, 1),
}
tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(testGroupID))
tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(groupID))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -57,7 +51,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := cg.Consume(ctx, []string{testTopic}, tracedHandler); err != nil {
if err := cg.Consume(ctx, []string{topic}, tracedHandler); err != nil {
assert.ErrorIs(t, err, sarama.ErrClosedConsumerGroup)
return
}
@@ -72,13 +66,14 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
log.Println("Sarama consumer up and running!...")

p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)

require.NoError(t, err)
p = WrapSyncProducer(cfg, p, WithDataStreams())
defer func() {
assert.NoError(t, p.Close())
}()

produceMsg := &sarama.ProducerMessage{
Topic: testTopic,
Topic: topic,
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
@@ -96,28 +91,28 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
s0 := spans[0]
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic gotest_ibm_sarama", s0.Tag(ext.ResourceName))
assert.Equal(t, "Produce Topic "+topic, s0.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s0.Tag("offset"))
assert.Equal(t, "IBM/sarama", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))

assertDSMProducerPathway(t, testTopic, produceMsg)
assertDSMProducerPathway(t, topic, produceMsg)

s1 := spans[1]
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, "Consume Topic gotest_ibm_sarama", s1.Tag(ext.ResourceName))
assert.Equal(t, "Consume Topic "+topic, s1.Tag(ext.ResourceName))
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s1.Tag("offset"))
assert.Equal(t, "IBM/sarama", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, testTopic, testGroupID, consumeMsg, true)
assertDSMConsumerPathway(t, topic, groupID, consumeMsg, true)

assert.Equal(t, s0.SpanID(), s1.ParentID(), "spans are not parent-child")
}
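
For readers unfamiliar with the wrapper exercised by the test above, here is a minimal application-side sketch of WrapConsumerGroupHandler. Only the wrapper, its options (WithDataStreams, WithGroupID), and the V0_11_0_0 version requirement come from the diff; the import path, broker address, topic, group name, and handler type are assumptions for illustration, not code from this repository.

```go
// Minimal sketch, assuming the gopkg.in/DataDog/dd-trace-go.v1 import path and
// placeholder broker/topic/group names.
package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
)

type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("consumed partition=%d offset=%d", msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // headers (used for trace propagation) require >= 0.11

	cg, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cg.Close()

	// Wrapping the handler is what produces the kafka.consume spans asserted in the test.
	wrapped := saramatrace.WrapConsumerGroupHandler(handler{},
		saramatrace.WithDataStreams(),
		saramatrace.WithGroupID("example-group"),
	)

	ctx := context.Background()
	for {
		// Consume is called in a loop so the session is recreated after rebalances.
		if err := cg.Consume(ctx, []string{"example-topic"}, wrapped); err != nil {
			log.Fatal(err)
		}
	}
}
```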
58 changes: 32 additions & 26 deletions contrib/IBM/sarama.v1/consumer_test.go
@@ -6,6 +6,7 @@
package sarama

import (
"fmt"
"testing"

"github.com/IBM/sarama"
@@ -18,44 +19,49 @@ import (
)

func TestWrapConsumer(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Version = sarama.MinVersion
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

broker := sarama.NewMockBroker(t, 0)
defer broker.Close()

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader("test-topic", 0, broker.BrokerID()),
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset("test-topic", 0, sarama.OffsetOldest, 0).
SetOffset("test-topic", 0, sarama.OffsetNewest, 1),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")).
SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")),
})
cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion

client, err := sarama.NewClient([]string{broker.Addr()}, cfg)
client, err := sarama.NewClient(kafkaBrokers, cfg)
require.NoError(t, err)
defer client.Close()

consumer, err := sarama.NewConsumerFromClient(client)
require.NoError(t, err)
consumer = WrapConsumer(consumer, WithDataStreams())
defer consumer.Close()

consumer = WrapConsumer(consumer, WithDataStreams())
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)
defer partitionConsumer.Close()

partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0)
p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, p.Close())
}()

for i := 1; i <= 2; i++ {
produceMsg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(fmt.Sprintf("test %d", i)),
Metadata: fmt.Sprintf("test %d", i),
}
_, _, err = p.SendMessage(produceMsg)
require.NoError(t, err)
}

msg1 := <-partitionConsumer.Messages()
msg2 := <-partitionConsumer.Messages()
err = partitionConsumer.Close()
require.NoError(t, err)
// wait for the channel to be closed
<-partitionConsumer.Messages()
waitForSpans(mt, 2)

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
@@ -67,16 +73,16 @@ func TestWrapConsumer(t *testing.T) {
"span context should be injected into the consumer message headers")

assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, int64(0), s.Tag("offset"))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, "test-topic", "", msg1, false)
assertDSMConsumerPathway(t, topic, "", msg1, false)
}
{
s := spans[1]
@@ -86,15 +92,15 @@ func TestWrapConsumer(t *testing.T) {
"span context should be injected into the consumer message headers")

assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, int64(1), s.Tag("offset"))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, "test-topic", "", msg2, false)
assertDSMConsumerPathway(t, topic, "", msg2, false)
}
}
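
The rewritten consumer test waits on waitForSpans(mt, 2) instead of closing the partition consumer and draining its channel. That helper is not visible in the hunks above; a plausible sketch follows, assuming it simply polls the mock tracer until the expected span count is reached. The name and arguments come from the call site; the body and the v1 mocktracer import path are assumptions.

```go
// Hypothetical sketch of waitForSpans; the real helper lives elsewhere in the package.
package sarama

import (
	"time"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
)

// waitForSpans polls the mock tracer until n spans have finished or a timeout
// elapses, so span assertions don't race the consumer goroutine.
func waitForSpans(mt mocktracer.Tracer, n int) {
	deadline := time.Now().Add(30 * time.Second)
	for len(mt.FinishedSpans()) < n && time.Now().Before(deadline) {
		time.Sleep(100 * time.Millisecond)
	}
}
```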
(2 more changed files not shown)
