Skip to content

Commit

Permalink
fix simpleConsumer test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed Dec 7, 2017
1 parent 4e0c402 commit 249f403
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions kafka/simple_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestSimpleConsumer_AddPartition(t *testing.T) {
hwm int64 = 1237
messages = make(chan *sarama.ConsumerMessage)
ch = make(chan Event)
cherr = make(chan *sarama.ConsumerError)
)

c := &simpleConsumer{
Expand All @@ -121,7 +122,8 @@ func TestSimpleConsumer_AddPartition(t *testing.T) {
client.EXPECT().GetOffset(topic, partition, sarama.OffsetOldest).Return(oldest, nil)
client.EXPECT().GetOffset(topic, partition, sarama.OffsetNewest).Return(hwm, nil)
consumer.EXPECT().ConsumePartition(topic, partition, offset).Return(pc, nil)
pc.EXPECT().Messages().Return(messages)
pc.EXPECT().Messages().Return(messages).AnyTimes()
pc.EXPECT().Errors().Return(cherr).AnyTimes()
err = c.AddPartition(topic, partition, offset)
ensure.Nil(t, err)
m, ok := (<-ch).(*BOF)
Expand Down Expand Up @@ -173,6 +175,7 @@ func TestSimpleConsumer_RemovePartition(t *testing.T) {
hwm int64 = 1237
messages = make(chan *sarama.ConsumerMessage)
ch = make(chan Event)
cherr = make(chan *sarama.ConsumerError)
)

c := &simpleConsumer{
Expand All @@ -188,7 +191,8 @@ func TestSimpleConsumer_RemovePartition(t *testing.T) {
client.EXPECT().GetOffset(topic, partition, sarama.OffsetOldest).Return(oldest, nil)
client.EXPECT().GetOffset(topic, partition, sarama.OffsetNewest).Return(hwm, nil)
consumer.EXPECT().ConsumePartition(topic, partition, offset).Return(pc, nil)
pc.EXPECT().Messages().Return(messages)
pc.EXPECT().Messages().Return(messages).AnyTimes()
pc.EXPECT().Errors().Return(cherr).AnyTimes()
err = c.AddPartition(topic, partition, offset)
ensure.Nil(t, err)

Expand Down Expand Up @@ -242,6 +246,7 @@ func TestSimpleConsumer_ErrorBlocked(t *testing.T) {
hwm int64 = 1237
messages = make(chan *sarama.ConsumerMessage)
ch = make(chan Event)
cherr = make(chan *sarama.ConsumerError)
)

c := &simpleConsumer{
Expand All @@ -265,7 +270,8 @@ func TestSimpleConsumer_ErrorBlocked(t *testing.T) {
client.EXPECT().GetOffset(topic, partition, sarama.OffsetOldest).Return(oldest, nil)
client.EXPECT().GetOffset(topic, partition, sarama.OffsetNewest).Return(hwm, nil)
consumer.EXPECT().ConsumePartition(topic, partition, offset).Return(pc, nil)
pc.EXPECT().Messages().Return(messages)
pc.EXPECT().Messages().Return(messages).AnyTimes()
pc.EXPECT().Errors().Return(cherr).AnyTimes()
err = c.AddPartition(topic, partition, offset)
ensure.Nil(t, err)
m, ok := (<-ch).(*BOF)
Expand Down

0 comments on commit 249f403

Please sign in to comment.