Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to describe exchange and routing key for AMQP consumer and producer #69

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.github.stavshamir.springwolf.asyncapi;

import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.serializers.KafkaChannelBindingSerializer;
import io.github.stavshamir.springwolf.asyncapi.serializers.KafkaOperationBindingSerializer;
import io.github.stavshamir.springwolf.asyncapi.types.AsyncAPI;
import org.springframework.stereotype.Service;
Expand All @@ -26,6 +28,7 @@ void postConstruct() {

private void registerKafkaOperationBindingSerializer() {
SimpleModule module = new SimpleModule();
module.addSerializer(KafkaChannelBinding.class, new KafkaChannelBindingSerializer());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment in KafkaChannelBindingSerializer.

module.addSerializer(KafkaOperationBinding.class, new KafkaOperationBindingSerializer());
jsonMapper.registerModule(module);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.OperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
Expand Down Expand Up @@ -48,7 +49,13 @@ public Map<String, ChannelItem> scan() {

/**
* @param annotation An instance of a listener annotation.
* @return A map containing an operation binding pointed to by the the protocol binding name.
* @return A map containing the channel binding pointed to by the protocol binding name.
*/
protected abstract Map<String, ? extends ChannelBinding> buildChannelBinding(T annotation);

/**
* @param annotation An instance of a listener annotation.
* @return A map containing an operation binding pointed to by the protocol binding name.
*/
protected abstract Map<String, ? extends OperationBinding> buildOperationBinding(T annotation);

Expand Down Expand Up @@ -76,14 +83,17 @@ private Map.Entry<String, ChannelItem> mapMethodToChannel(Method method) {

String channelName = getChannelName(annotation);

Map<String, ? extends ChannelBinding> channelBinding = buildChannelBinding(annotation);
Map<String, ? extends OperationBinding> operationBinding = buildOperationBinding(annotation);
Class<?> payload = getPayloadType(method);
ChannelItem channel = buildChannel(payload, operationBinding);
ChannelItem channel = buildChannel(channelBinding, payload, operationBinding);

return Maps.immutableEntry(channelName, channel);
}

private ChannelItem buildChannel(Class<?> payloadType, Map<String, ? extends OperationBinding> operationBinding) {
private ChannelItem buildChannel(Map<String, ? extends ChannelBinding> channelBinding,
Class<?> payloadType,
Map<String, ? extends OperationBinding> operationBinding) {
String modelName = schemasService.register(payloadType);

Message message = Message.builder()
Expand All @@ -98,6 +108,7 @@ private ChannelItem buildChannel(Class<?> payloadType, Map<String, ? extends Ope
.build();

return ChannelItem.builder()
.bindings(channelBinding)
.publish(operation)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.OperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
Expand Down Expand Up @@ -41,7 +42,7 @@ public Map<String, ChannelItem> scan() {
private boolean allFieldsAreNonNull(ProducerData producerData) {
boolean allNonNull = producerData.getChannelName() != null
&& producerData.getPayloadType() != null
&& producerData.getBinding() != null;
&& producerData.getOperationBinding() != null;

if (!allNonNull) {
log.warn("Some producer data fields are null - this producer will not be documented: {}", producerData);
Expand All @@ -53,14 +54,16 @@ private boolean allFieldsAreNonNull(ProducerData producerData) {
private ChannelItem buildChannel(List<ProducerData> producerDataList) {
// All bindings in the group are assumed to be the same
// AsyncApi does not support multiple bindings on a single channel
Map<String, ? extends OperationBinding> binding = producerDataList.get(0).getBinding();
Map<String, ? extends ChannelBinding> channelBinding = producerDataList.get(0).getChannelBinding();
Map<String, ? extends OperationBinding> operationBinding = producerDataList.get(0).getOperationBinding();

Operation operation = Operation.builder()
.message(getMessageObject(producerDataList))
.bindings(binding)
.bindings(operationBinding)
.build();

return ChannelItem.builder()
.bindings(channelBinding)
.subscribe(operation)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.github.stavshamir.springwolf.asyncapi.serializers;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this at this point - currently the binding tab in the UI displays only the operation binding, so this will appear only in the document itself. It's ok to have it serialized as it is without this custom serializer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it because the endpoint wouldn't work, it resulted in a 500 internal error failing to serialize the channelBinding field. Is there other way to fix this problem?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm ok, so let's keep it, I can figure it out later.



import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;

public class KafkaChannelBindingSerializer extends StdSerializer<KafkaChannelBinding> {

public KafkaChannelBindingSerializer() {
this(null);
}

public KafkaChannelBindingSerializer(Class<KafkaChannelBinding> t) {
super(t);
}

@Override
public void serialize(KafkaChannelBinding value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeStartObject();
gen.writeEndObject();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.stavshamir.springwolf.asyncapi.types;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.OperationBinding;
import lombok.*;

Expand All @@ -20,19 +21,29 @@ public class ProducerData {
*/
private String channelName;

/**
* The channel binding of the producer.
* <br>
* For example:
* <code>
* ImmutableMap.of("kafka", new KafkaChannelBinding())
* </code>
*/
private Map<String, ? extends ChannelBinding> channelBinding;

/**
* The class object of the payload published by this producer.
*/
private Class<?> payloadType;

/**
* The binding of the producer.
* The operation binding of the producer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change the name of this field to operationBinding now that we have channelBinding. It's a breaking change but that's ok.

* <br>
* For example:
* <code>
* ImmutableMap.of("kafka", new KafkaOperationBinding())
* </code>
*/
private Map<String, ? extends OperationBinding> binding;
private Map<String, ? extends OperationBinding> operationBinding;

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.asyncapi.v2.model.server.Server;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ProducerChannelScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.components.DefaultClassPathComponentsScanner;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
Expand Down Expand Up @@ -46,7 +45,7 @@ public AsyncApiDocket docket() {
ProducerData kafkaProducerData = ProducerData.builder()
.channelName("producer-topic")
.payloadType(String.class)
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.build();

return AsyncApiDocket.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ public void scan_componentHasListenerMethod() {
.build();

Operation operation = Operation.builder()
.bindings(ImmutableMap.of("test", new TestChannelScanner.TestBinding()))
.bindings(ImmutableMap.of("test-operation-binding", new TestChannelScanner.TestOperationBinding()))
.message(message)
.build();

ChannelItem expectedChannel = ChannelItem.builder().publish(operation).build();
ChannelItem expectedChannel = ChannelItem.builder()
.bindings(ImmutableMap.of("test-channel-binding", new TestChannelScanner.TestChannelBinding()))
.publish(operation)
.build();

assertThat(actualChannels)
.containsExactly(Maps.immutableEntry("test-channel", expectedChannel));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import com.asyncapi.v2.binding.kafka.KafkaChannelBinding;
import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
Expand Down Expand Up @@ -41,7 +42,8 @@ public void allFieldsProducerData() {
String channelName = "example-producer-topic-foo1";
ProducerData producerData = ProducerData.builder()
.channelName(channelName)
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(ExamplePayloadDto.class)
.build();

Expand All @@ -64,6 +66,7 @@ public void allFieldsProducerData() {
.build();

ChannelItem expectedChannel = ChannelItem.builder()
.bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.subscribe(operation)
.build();

Expand Down Expand Up @@ -95,13 +98,15 @@ public void multipleProducersForSameTopic() {

ProducerData producerData1 = ProducerData.builder()
.channelName(channelName)
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(ExamplePayloadDto.class)
.build();

ProducerData producerData2 = ProducerData.builder()
.channelName(channelName)
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.channelBinding(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.operationBinding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(AnotherExamplePayloadDto.class)
.build();

Expand Down Expand Up @@ -134,6 +139,7 @@ public void multipleProducersForSameTopic() {
.build();

ChannelItem expectedChannel = ChannelItem.builder()
.bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
.subscribe(operation)
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.stavshamir.springwolf.asyncapi.scanners.channels;

import com.asyncapi.v2.binding.ChannelBinding;
import com.asyncapi.v2.binding.OperationBinding;
import com.google.common.collect.ImmutableMap;
import lombok.EqualsAndHashCode;
Expand All @@ -19,9 +20,14 @@ protected String getChannelName(AbstractChannelScannerTest.TestChannelListener a
return "test-channel";
}

@Override
protected Map<String, ? extends ChannelBinding> buildChannelBinding(AbstractChannelScannerTest.TestChannelListener annotation) {
return ImmutableMap.of("test-channel-binding", new TestChannelBinding());
}

@Override
protected Map<String, ? extends OperationBinding> buildOperationBinding(AbstractChannelScannerTest.TestChannelListener annotation) {
return ImmutableMap.of("test", new TestBinding());
return ImmutableMap.of("test-operation-binding", new TestOperationBinding());
}

@Override
Expand All @@ -34,9 +40,13 @@ protected Class<?> getPayloadType(Method method) {
return parameterTypes[0];
}

@EqualsAndHashCode(callSuper = true)
public static class TestChannelBinding extends ChannelBinding {
}


@EqualsAndHashCode(callSuper = true)
public static class TestBinding extends OperationBinding {
public static class TestOperationBinding extends OperationBinding {
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package io.github.stavshamir.springwolf.example.configuration;

import com.asyncapi.v2.binding.amqp.AMQPChannelBinding;
import com.asyncapi.v2.binding.amqp.AMQPOperationBinding;
import com.asyncapi.v2.model.info.Info;
import com.asyncapi.v2.model.server.Server;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.configuration.EnableAsyncApi;
import io.github.stavshamir.springwolf.example.dtos.AnotherPayloadDto;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Collections;

@Configuration
@EnableAsyncApi
public class AsyncApiConfiguration {
Expand All @@ -34,10 +41,25 @@ public AsyncApiDocket asyncApiDocket() {
.url(String.format("%s:%s", amqpHost, amqpPort))
.build();

AMQPChannelBinding.ExchangeProperties exchangeProperties = new AMQPChannelBinding.ExchangeProperties();
exchangeProperties.setName("example-topic-exchange");
ProducerData exampleProducer = ProducerData.builder()
.channelName("example-producer-channel")
.channelBinding(ImmutableMap.of("amqp", AMQPChannelBinding.builder()
.is("routingKey")
.exchange(exchangeProperties)
.build()))
.payloadType(AnotherPayloadDto.class)
.operationBinding(ImmutableMap.of("amqp", AMQPOperationBinding.builder()
.cc(Collections.singletonList("example-topic-routing-key"))
.build()))
.build();

return AsyncApiDocket.builder()
.basePackage("io.github.stavshamir.springwolf.example.consumers")
.info(info)
.server("amqp", amqp)
.producer(exampleProducer)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.stavshamir.springwolf.example.configuration;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -31,4 +31,22 @@ public Queue exampleBindingsQueue() {
return new Queue("example-bindings-queue", false);
}

@Bean
public Exchange exampleTopicExchange() {
return new TopicExchange("example-topic-exchange");
}

@Bean
public Queue exampleTopicQueue() {
return new Queue("example-topic-queue");
}

@Bean
public Binding exampleTopicBinding(Queue exampleTopicQueue, Exchange exampleTopicExchange) {
return BindingBuilder.bind(exampleTopicQueue)
.to(exampleTopicExchange)
.with("example-topic-routing-key")
.noargs();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public void bindingsExample(AnotherPayloadDto payload) {
logger.info("Received new message in example-bindings-queue: {}", payload.toString());
}

@RabbitListener(queues = "example-topic-queue")
public void bindingsBeanExample(AnotherPayloadDto payload) {
logger.info("Received new message in example-topic-queue: {}", payload.toString());
}
}
Loading