Skip to content

Commit

Permalink
fix: updated CloudStreamFunctionChannelsScanner scan to merge channel (
Browse files Browse the repository at this point in the history
…#416)

fixes #415

Co-authored-by: Sheheryar <[email protected]>
  • Loading branch information
SheheryarAamir and Sheheryar authored Oct 27, 2023
1 parent 765ecd3 commit 52663b6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.asyncapi.v2._6_0.model.server.Server;
import com.asyncapi.v2.binding.message.MessageBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.beans.BeanMethodsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.types.channel.bindings.EmptyChannelBinding;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.bindings.EmptyOperationBinding;
Expand All @@ -24,8 +25,7 @@
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Set;

import static java.util.stream.Collectors.toMap;
import java.util.stream.Collectors;

@Slf4j
@RequiredArgsConstructor
Expand All @@ -39,12 +39,12 @@ public class CloudStreamFunctionChannelsScanner implements ChannelsScanner {
@Override
public Map<String, ChannelItem> scan() {
Set<Method> beanMethods = beanMethodsScanner.getBeanMethods();
return beanMethods.stream()
return ChannelMerger.merge(beanMethods.stream()
.map(FunctionalChannelBeanData::fromMethodBean)
.flatMap(Set::stream)
.filter(this::isChannelBean)
.map(this::toChannelEntry)
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(Collectors.toList()));
}

private boolean isChannelBean(FunctionalChannelBeanData beanData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,64 @@ void testKStreamFunctionBinding() {
.contains(Map.entry(inputTopicName, publishChannel), Map.entry(outputTopicName, subscribeChannel));
}

@Test
void testFunctionBindingWithSameTopicName() {
// Given a binding "spring.cloud.stream.bindings.testFunction-in-0.destination=test-topic"
// And a binding "spring.cloud.stream.bindings.testFunction-out-0.destination=test-topic"
String topicName = "test-topic";
BindingProperties testFunctionInBinding = new BindingProperties();
testFunctionInBinding.setDestination(topicName);

BindingProperties testFunctionOutBinding = new BindingProperties();
testFunctionOutBinding.setDestination(topicName);
when(bindingServiceProperties.getBindings())
.thenReturn(Map.of(
"testFunction-in-0", testFunctionInBinding,
"testFunction-out-0", testFunctionOutBinding));

// When scan is called
Map<String, ChannelItem> channels = scanner.scan();

// Then the returned merged channels contain a publish operation and a subscribe operation
Message subscribeMessage = Message.builder()
.name(Integer.class.getName())
.title(Integer.class.getSimpleName())
.payload(PayloadReference.fromModelName(Integer.class.getSimpleName()))
.headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName()))
.bindings(messageBinding)
.build();

Operation subscribeOperation = Operation.builder()
.bindings(operationBinding)
.description("Auto-generated description")
.operationId("test-topic_subscribe_testFunction")
.message(subscribeMessage)
.build();

Message publishMessage = Message.builder()
.name(String.class.getName())
.title(String.class.getSimpleName())
.payload(PayloadReference.fromModelName(String.class.getSimpleName()))
.headers(HeaderReference.fromModelName(AsyncHeaders.NOT_DOCUMENTED.getSchemaName()))
.bindings(messageBinding)
.build();

Operation publishOperation = Operation.builder()
.bindings(operationBinding)
.description("Auto-generated description")
.operationId("test-topic_publish_testFunction")
.message(publishMessage)
.build();

ChannelItem mergedChannel = ChannelItem.builder()
.bindings(channelBinding)
.publish(publishOperation)
.subscribe(subscribeOperation)
.build();

assertThat(channels).contains(Map.entry(topicName, mergedChannel));
}

@TestConfiguration
public static class Configuration {

Expand Down

0 comments on commit 52663b6

Please sign in to comment.