Skip to content

Commit

Permalink
GH-278: fix: Handle identical listeners with different kafka group ids
Browse files Browse the repository at this point in the history
  • Loading branch information
timonback authored and sam0r040 committed Jul 21, 2023
1 parent eba689d commit edb3e1c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation;

import com.asyncapi.v2._6_0.model.channel.ChannelItem;
import com.asyncapi.v2._6_0.model.channel.operation.Operation;
import com.asyncapi.v2.binding.channel.ChannelBinding;
import com.asyncapi.v2.binding.message.MessageBinding;
import com.asyncapi.v2.binding.operation.OperationBinding;
import com.asyncapi.v2._6_0.model.channel.ChannelItem;
import com.asyncapi.v2._6_0.model.channel.operation.Operation;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ComponentClassScanner;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
Expand Down Expand Up @@ -41,7 +41,7 @@ public Map<String, ChannelItem> scan() {
return componentClassScanner.scan().stream()
.map(this::getAnnotatedMethods).flatMap(Collection::stream)
.map(this::mapMethodToChannel)
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (el1, el2) -> el1));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,29 @@ void scan_componentHasKafkaListenerMethods_withGroupId() {
.isEqualTo(KafkaListenerUtil.buildKafkaGroupIdSchema(ClassWithKafkaListenerAnnotationWithGroupId.GROUP_ID));
}

@Test
void scan_componentHasKafkaListenerMethods_withDifferentGroupId() {
// Given a class with methods annotated with KafkaListener, with a group id
setClassToScan(ClassWithKafkaListenerAnnotationWithDifferentGroupId.class);

// When scan is called
Map<String, ChannelItem> actualChannels = methodLevelKafkaListenerScanner.scan();

// Then the returned collection contains a correct binding
Map<String, Object> actualBindings = actualChannels.get(TOPIC)
.getPublish()
.getBindings();

assertThat(actualBindings).isNotNull();
KafkaOperationBinding kafka = (KafkaOperationBinding) actualBindings.get("kafka");
assertThat(kafka).isNotNull();
assertThat(kafka.getGroupId())
.isIn(
KafkaListenerUtil.buildKafkaGroupIdSchema(ClassWithKafkaListenerAnnotationWithDifferentGroupId.GROUP_ID_FIRST),
KafkaListenerUtil.buildKafkaGroupIdSchema(ClassWithKafkaListenerAnnotationWithDifferentGroupId.GROUP_ID_SECOND)
);
}

@Test
void scan_componentHasKafkaListenerMethods_multipleParamsWithoutPayloadAnnotation() {
// Given a class with a method annotated with KafkaListener:
Expand Down Expand Up @@ -274,6 +297,20 @@ private void methodWithoutAnnotation() {

}

private static class ClassWithKafkaListenerAnnotationWithDifferentGroupId {
private static final String GROUP_ID_FIRST = "test-group-id-first";
private static final String GROUP_ID_SECOND = "test-group-id-second";

@KafkaListener(topics = TOPIC, groupId = GROUP_ID_FIRST)
private void methodWithAnnotation(SimpleFoo payload) {
}

@KafkaListener(topics = TOPIC, groupId = GROUP_ID_SECOND)
private void sameMethodWithDifferentGroupId(SimpleFoo payload) {
}

}

private static class ClassWithKafkaListenerAnnotationMultipleParamsWithoutPayloadAnnotation {

@KafkaListener(topics = TOPIC)
Expand Down

0 comments on commit edb3e1c

Please sign in to comment.