Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for manual consumer definition. #77

Merged
merged 3 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.OperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.types.ConsumerData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.schemas.SchemasService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static io.github.stavshamir.springwolf.asyncapi.Constants.ONE_OF;
import static java.util.stream.Collectors.*;

@Slf4j
@RequiredArgsConstructor
@Component
public class ConsumerChannelScanner implements ChannelsScanner {

private final AsyncApiDocket docket;
private final SchemasService schemasService;

@Override
public Map<String, ChannelItem> scan() {
Map<String, List<ConsumerData>> consumerDataGroupedByChannelName = docket.getConsumers().stream()
.filter(this::allFieldsAreNonNull)
.collect(groupingBy(ConsumerData::getChannelName));

return consumerDataGroupedByChannelName.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> buildChannel(entry.getValue())));
}

private boolean allFieldsAreNonNull(ConsumerData consumerData) {
boolean allNonNull = consumerData.getChannelName() != null
&& consumerData.getPayloadType() != null
&& consumerData.getOperationBinding() != null;

if (!allNonNull) {
log.warn("Some consumer data fields are null - this consumer will not be documented: {}", consumerData);
}

return allNonNull;
}

private ChannelItem buildChannel(List<ConsumerData> consumerDataList) {
// All bindings in the group are assumed to be the same
// AsyncApi does not support multiple bindings on a single channel
Map<String, ? extends ChannelBinding> channelBinding = consumerDataList.get(0).getChannelBinding();
Map<String, ? extends OperationBinding> operationBinding = consumerDataList.get(0).getOperationBinding();

Operation operation = Operation.builder()
.message(getMessageObject(consumerDataList))
.bindings(operationBinding)
.build();

return ChannelItem.builder()
.bindings(channelBinding)
.publish(operation)
.build();
}

private Object getMessageObject(List<ConsumerData> consumerDataList) {
Set<Message> messages = consumerDataList.stream()
.map(this::buildMessage)
.collect(toSet());

return messages.size() == 1
? messages.toArray()[0]
: ImmutableMap.of(ONE_OF, messages);
}

private Message buildMessage(ConsumerData consumerData) {
Class<?> payloadType = consumerData.getPayloadType();
String modelName = schemasService.register(payloadType);

return Message.builder()
.name(payloadType.getName())
.title(modelName)
.payload(PayloadReference.fromModelName(modelName))
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.github.stavshamir.springwolf.asyncapi.types;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.OperationBinding;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/**
* Holds information about a producer channel.
* All fields must be set and not null.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConsumerData {

/**
* The name of the channel (topic, queue etc.).
*/
protected String channelName;

/**
* The channel binding of the producer.
* <br>
* For example:
* <code>
* ImmutableMap.of("kafka", new KafkaChannelBinding())
* </code>
*/
protected Map<String, ? extends ChannelBinding> channelBinding;

/**
* The class object of the payload published by this producer.
*/
protected Class<?> payloadType;

/**
* The operation binding of the producer.
* <br>
* For example:
* <code>
* ImmutableMap.of("kafka", new KafkaOperationBinding())
* </code>
*/
protected Map<String, ? extends OperationBinding> operationBinding;

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.asyncapi.v2.model.info.Info;
import com.asyncapi.v2.model.server.Server;
import io.github.stavshamir.springwolf.asyncapi.scanners.components.*;
import io.github.stavshamir.springwolf.asyncapi.types.ConsumerData;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -48,6 +49,9 @@ public class AsyncApiDocket {
@Singular
private final List<ProducerData> producers;

@Singular
private final List<ConsumerData> consumers;

@SuppressWarnings("unused")
public static class AsyncApiDocketBuilder {
@SuppressWarnings("FieldCanBeLocal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.asyncapi.v2.model.info.Info;
import com.asyncapi.v2.model.server.Server;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ConsumerChannelScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ProducerChannelScanner;
import io.github.stavshamir.springwolf.asyncapi.types.ConsumerData;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
Expand All @@ -27,7 +29,8 @@
DefaultAsyncApiService.class,
DefaultChannelsService.class,
DefaultSchemasService.class,
ProducerChannelScanner.class
ProducerChannelScanner.class,
ConsumerChannelScanner.class
})
@Import(DefaultAsyncApiServiceTest.DefaultAsyncApiServiceTestConfiguration.class)
public class DefaultAsyncApiServiceTest {
Expand All @@ -48,11 +51,17 @@ public AsyncApiDocket docket() {
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.build();

ConsumerData kafkaConsumerData = ConsumerData.builder()
.channelName("consumer-topic")
.payloadType(String.class)
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding())).build();

return AsyncApiDocket.builder()
.info(info)
.basePackage("package")
.server("kafka", Server.builder().protocol("kafka").url("kafka:9092").build())
.producer(kafkaProducerData)
.consumer(kafkaConsumerData)
.build();
}

Expand Down Expand Up @@ -92,4 +101,16 @@ public void getAsyncAPI_producers_should_be_correct() {
assertThat(channel.getSubscribe()).isNotNull();
}

@Test
public void getAsyncAPI_consumers_should_be_correct() {
Map<String, ChannelItem> actualChannels = asyncApiService.getAsyncAPI().getChannels();

assertThat(actualChannels)
.isNotEmpty()
.containsKey("consumer-topic");

final ChannelItem channel = actualChannels.get("consumer-topic");
assertThat(channel.getPublish()).isNotNull();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.github.stavshamir.springwolf.asyncapi.types.ConsumerData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Map;
import java.util.Set;

import static io.github.stavshamir.springwolf.asyncapi.Constants.ONE_OF;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {ConsumerChannelScanner.class, DefaultSchemasService.class})
public class ConsumerChannelScannerTest {

@Autowired
private ConsumerChannelScanner scanner;

@MockBean
private AsyncApiDocket asyncApiDocket;

@Test
public void allFieldsConsumerData() {
// Given a consumer data with all fields set
String channelName = "example-consumer-topic-foo1";
ConsumerData consumerData = ConsumerData.builder()
.channelName(channelName)
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(ExamplePayloadDto.class)
.build();

when(asyncApiDocket.getConsumers()).thenReturn(ImmutableList.of(consumerData));

// When scanning for consumers
Map<String, ChannelItem> consumerChannels = scanner.scan();

// Then the channel should be created correctly
assertThat(consumerChannels)
.containsKey(channelName);

Operation operation = Operation.builder()
.bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.message(Message.builder()
.name(ExamplePayloadDto.class.getName())
.title(ExamplePayloadDto.class.getSimpleName())
.payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName()))
.build())
.build();

ChannelItem expectedChannel = ChannelItem.builder()
.bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.publish(operation)
.build();

assertThat(consumerChannels.get(channelName))
.isEqualTo(expectedChannel);
}

@Test
public void missingFieldConsumerData() {
// Given a consumer data with missing fields
String channelName = "example-consumer-topic-foo1";
ConsumerData consumerData = ConsumerData.builder()
.channelName(channelName)
.build();

when(asyncApiDocket.getConsumers()).thenReturn(ImmutableList.of(consumerData));

// When scanning for consumers
Map<String, ChannelItem> consumerChannels = scanner.scan();

// Then the channel is not created, and no exception is thrown
assertThat(consumerChannels).isEmpty();
}

@Test
public void multipleConsumersForSameTopic() {
// Given a multiple ConsumerData objects for the same topic
String channelName = "example-consumer-topic";

ConsumerData consumerData1 = ConsumerData.builder()
.channelName(channelName)
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(ExamplePayloadDto.class)
.build();

ConsumerData consumerData2 = ConsumerData.builder()
.channelName(channelName)
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(AnotherExamplePayloadDto.class)
.build();

when(asyncApiDocket.getConsumers()).thenReturn(ImmutableList.of(consumerData1, consumerData2));

// When scanning for consumers
Map<String, ChannelItem> consumerChannels = scanner.scan();

// Then one channel is created for the ConsumerData objects with multiple messages
assertThat(consumerChannels)
.hasSize(1)
.containsKey(channelName);

Set<Message> messages = ImmutableSet.of(
Message.builder()
.name(ExamplePayloadDto.class.getName())
.title(ExamplePayloadDto.class.getSimpleName())
.payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName()))
.build(),
Message.builder()
.name(AnotherExamplePayloadDto.class.getName())
.title(AnotherExamplePayloadDto.class.getSimpleName())
.payload(PayloadReference.fromModelName(AnotherExamplePayloadDto.class.getSimpleName()))
.build()
);

Operation operation = Operation.builder()
.bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.message(ImmutableMap.of(ONE_OF, messages))
.build();

ChannelItem expectedChannel = ChannelItem.builder()
.bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.publish(operation)
.build();

assertThat(consumerChannels.get(channelName))
.isEqualTo(expectedChannel);
}

static class ExamplePayloadDto {
private String foo;
}

static class AnotherExamplePayloadDto {
private String bar;
}

}
Loading