Skip to content

Commit

Permalink
Fix #85
Browse files Browse the repository at this point in the history
- Added support for multiple class level kafka listeners for the same topic

- `operationId` for class level kafka listeners is now assigned the value of the class name instead of the first method name in that class
  • Loading branch information
stavshamir committed Oct 23, 2022
1 parent 29c754f commit 8ce3c36
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,43 @@

import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import lombok.extern.slf4j.Slf4j;

import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@Slf4j
public class MessageHelper {
private static final String ONE_OF = "oneOf";

private static final Comparator<Message> byMessageName = Comparator.comparing(Message::getName);
private static final Supplier<TreeSet<Message>> messageSupplier = () -> new TreeSet<>(byMessageName);
private static final Supplier<Set<Message>> messageSupplier = () -> new TreeSet<>(byMessageName);

public static Object toMessageObjectOrComposition(Set<Message> messages) {
return messages.size() == 1
? messages.toArray()[0]
: ImmutableMap.of(ONE_OF, messages.stream().collect(Collectors.toCollection(messageSupplier)));
switch (messages.size()) {
case 0:
throw new IllegalArgumentException("messages must not be empty");
case 1:
return messages.toArray()[0];
default:
return ImmutableMap.of(ONE_OF, messages.stream().collect(Collectors.toCollection(messageSupplier)));
}
}

@SuppressWarnings("unchecked")
public static Set<Message> messageObjectToSet(Object messageObject) {
if (messageObject instanceof Message) {
return new HashSet<>(Arrays.asList((Message) messageObject));
}

if (messageObject instanceof Map) {
Set<Message> messages = ((Map<String, Set<Message>>) messageObject).get(ONE_OF);
return new HashSet<>(messages);
}

log.warn("Message object must contain either a Message or a Map<String, Set<Message>, but contained: {}", messageObject.getClass());
return new HashSet<>();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.github.stavshamir.springwolf.asyncapi;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.messageObjectToSet;
import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class MessageHelperTest {

@Test
public void toMessageObjectOrComposition_emptySet() {
assertThatThrownBy(() -> toMessageObjectOrComposition(Collections.emptySet()))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
public void toMessageObjectOrComposition_oneMessage() {
Message message = Message.builder()
.name("foo")
.build();

Object asObject = toMessageObjectOrComposition(ImmutableSet.of(message));

assertThat(asObject)
.isInstanceOf(Message.class)
.isEqualTo(message);
}


@Test
public void toMessageObjectOrComposition_multipleMessages() {
Message message1 = Message.builder()
.name("foo")
.build();

Message message2 = Message.builder()
.name("bar")
.build();

Object asObject = toMessageObjectOrComposition(ImmutableSet.of(message1, message2));

assertThat(asObject)
.isInstanceOf(Map.class)
.isEqualTo(ImmutableMap.of("oneOf", ImmutableSet.of(message1, message2)));
}

@Test
public void messageObjectToSet_notAMessageOrAMap() {
Object string = "foo";

Set<Message> messages = messageObjectToSet(string);

assertThat(messages)
.isEmpty();
}

@Test
public void messageObjectToSet_Message() {
Message message = Message.builder()
.name("foo")
.build();
Object asObject = toMessageObjectOrComposition(ImmutableSet.of(message));

Set<Message> messages = messageObjectToSet(asObject);

assertThat(messages)
.containsExactly(message);
}

@Test
public void messageObjectToSet_SetOfMessage() {
Message message1 = Message.builder()
.name("foo")
.build();

Message message2 = Message.builder()
.name("bar")
.build();

Object asObject = toMessageObjectOrComposition(ImmutableSet.of(message1, message2));

Set<Message> messages = messageObjectToSet(asObject);

assertThat(messages)
.containsExactlyInAnyOrder(message1, message2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
},
"multi-payload-topic" : {
"publish" : {
"operationId" : "receiveMonetaryAmount_publish",
"operationId" : "ExampleClassLevelKafkaListener_publish",
"description" : "Auto-generated description",
"bindings" : {
"kafka" : { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.asyncapi.v2.model.channel.operation.Operation;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.github.stavshamir.springwolf.asyncapi.MessageHelper;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
Expand All @@ -29,7 +30,8 @@
import java.util.*;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;
import static java.util.stream.Collectors.*;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

@Slf4j
@Service
Expand All @@ -52,11 +54,43 @@ public void setEmbeddedValueResolver(StringValueResolver resolver) {
}

public Map<String, ChannelItem> scan() {
return docket.getComponentsScanner().scanForComponents().stream()
Set<Class<?>> components = docket.getComponentsScanner().scanForComponents();
Set<Map.Entry<String, ChannelItem>> channels = mapToChannels(components);
return mergeChannels(channels);
}

private Set<Map.Entry<String, ChannelItem>> mapToChannels(Set<Class<?>> components) {
return components.stream()
.filter(this::isAnnotatedWithKafkaListener)
.map(this::mapClassToChannel)
.filter(Optional::isPresent).map(Optional::get)
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(toSet());
}

private Map<String, ChannelItem> mergeChannels(Set<Map.Entry<String, ChannelItem>> channelEntries) {
Map<String, ChannelItem> mergedChannels = new HashMap<>();

for (Map.Entry<String, ChannelItem> entry : channelEntries) {
if (!mergedChannels.containsKey(entry.getKey())) {
mergedChannels.put(entry.getKey(), entry.getValue());
} else {
ChannelItem channelItem = mergedChannels.get(entry.getKey());
Set<Message> mergedMessages = getChannelMessages(channelItem);
Set<Message> currentEntryMessages = getChannelMessages(entry.getValue());
mergedMessages.addAll(currentEntryMessages);
channelItem.getPublish().setMessage(toMessageObjectOrComposition(mergedMessages));
}
}

return mergedChannels;
}

private Set<Message> getChannelMessages(ChannelItem channelItem) {
return Optional
.ofNullable(channelItem.getPublish())
.map(Operation::getMessage)
.map(MessageHelper::messageObjectToSet)
.orElseGet(HashSet::new);
}

private boolean isAnnotatedWithKafkaListener(Class<?> component) {
Expand All @@ -75,7 +109,7 @@ private Optional<Map.Entry<String, ChannelItem>> mapClassToChannel(Class<?> comp
return Optional.empty();
}

ChannelItem channelItem = buildChannel(annotatedMethods, operationBinding);
ChannelItem channelItem = buildChannel(component.getSimpleName(), annotatedMethods, operationBinding);
return Optional.of(Maps.immutableEntry(channelName, channelItem));
}

Expand Down Expand Up @@ -155,8 +189,8 @@ private Set<Method> getAnnotatedMethods(Class<?> component) {
.collect(toSet());
}

private ChannelItem buildChannel(Set<Method> methods, Map<String, ? extends OperationBinding> operationBinding) {
String operationId = methods.stream().findFirst().map(it -> it.getName() + "_publish").orElse("");
private ChannelItem buildChannel(String simpleName, Set<Method> methods, Map<String, ? extends OperationBinding> operationBinding) {
String operationId = simpleName + "_publish";

Operation operation = Operation.builder()
.description("Auto-generated description")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,51 @@ private void setClassToScan(Class<?> classToScan) {
when(componentsScanner.scanForComponents()).thenReturn(classesToScan);
}

private void setClassesToScan(Set<Class<?>> classesToScan) {
when(componentsScanner.scanForComponents()).thenReturn(classesToScan);
}

@Test
public void scan_componentWithMultipleKafkaListenersAndHandlers() {
// Given multiple @KafkaListener annotated classes with method(s) annotated with @KafkaHandler
ImmutableSet<Class<?>> classesToScan = ImmutableSet.of(
KafkaListenerClassWithOneKafkaHandler.class,
KafkaListenerClassWithMultipleKafkaHandler.class
);
setClassesToScan(classesToScan);

// When scan is called
Map<String, ChannelItem> actualChannels = methodLevelKafkaListenerScanner.scan();

// Then the returned collection contains the channel with message set to oneOf
Message fooMessage = Message.builder()
.name(SimpleFoo.class.getName())
.title(SimpleFoo.class.getSimpleName())
.payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName()))
.build();

Message barMessage = Message.builder()
.name(SimpleBar.class.getName())
.title(SimpleBar.class.getSimpleName())
.payload(PayloadReference.fromModelName(SimpleBar.class.getSimpleName()))
.build();

Operation operation = Operation.builder()
.description("Auto-generated description")
.operationId("KafkaListenerClassWithMultipleKafkaHandler_publish")
.bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.message(toMessageObjectOrComposition(ImmutableSet.of(fooMessage, barMessage)))
.build();

ChannelItem expectedChannel = ChannelItem.builder()
.bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.publish(operation)
.build();

assertThat(actualChannels)
.containsExactly(Maps.immutableEntry(TOPIC, expectedChannel));
}

@Test
public void scan_componentHasNoClassLevelKafkaListenerAnnotation() {
// Given a class with one @KafkaHandler method, but no class level @KafkaListener annotation
Expand Down Expand Up @@ -103,7 +148,7 @@ public void scan_componentWithSingleKafkaHandlerMethod() {

Operation operation = Operation.builder()
.description("Auto-generated description")
.operationId("methodWithAnnotation_publish")
.operationId("KafkaListenerClassWithOneKafkaHandler_publish")
.bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.message(message)
.build();
Expand Down Expand Up @@ -142,7 +187,7 @@ public void scan_componentWithMultipleKafkaHandlerMethods() {

Operation operation = Operation.builder()
.description("Auto-generated description")
.operationId("anotherMethodWithoutAnnotation_publish")
.operationId("KafkaListenerClassWithMultipleKafkaHandler_publish")
.bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.message(toMessageObjectOrComposition(ImmutableSet.of(fooMessage, barMessage)))
.build();
Expand Down

0 comments on commit 8ce3c36

Please sign in to comment.