Skip to content

Commit

Permalink
Extend publishing api for kafka plugin to support publishing of kafka…
Browse files Browse the repository at this point in the history
… headers (#86)
  • Loading branch information
sam0r040 authored Oct 22, 2022
1 parent c8ad5dd commit d51c271
Show file tree
Hide file tree
Showing 8 changed files with 348 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package io.github.stavshamir.springwolf.asyncapi;

import io.github.stavshamir.springwolf.asyncapi.dtos.KafkaMessageDto;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;

import java.util.Map;

@Slf4j
@RestController
@RequestMapping("/springwolf/kafka")
Expand All @@ -18,14 +17,18 @@ public class SpringwolfKafkaController {
private final SpringwolfKafkaProducer kafkaProducer;

@PostMapping("/publish")
public void publish(@RequestParam String topic, @RequestBody Map<String, Object> payload) {
public void publish(@RequestParam String topic, @RequestBody KafkaMessageDto kafkaMessage) {
if(kafkaMessage.getPayload() == null) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Message payload is required");
}

if (!kafkaProducer.isEnabled()) {
log.debug("Kafka producer is not enabled - message will not be published");
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Kafka producer is not enabled");
}

log.info("Publishing to kafka topic {}: {}", topic, payload);
kafkaProducer.send(topic, payload);
log.info("Publishing to kafka topic {}: {}", topic, kafkaMessage);
kafkaProducer.send(topic, kafkaMessage.getHeaders(), kafkaMessage.getPayload());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.github.stavshamir.springwolf.asyncapi.dtos;

import lombok.Data;

import java.util.Map;

/**
 * Request body of the Springwolf Kafka publish endpoint: optional Kafka record
 * headers plus the message payload. Bound from the JSON request body by Spring
 * ({@code @RequestBody} in the controller); lombok {@code @Data} generates the
 * all-args constructor, getters, equals/hashCode and toString.
 */
@Data
public class KafkaMessageDto {

    // Kafka record headers as name/value pairs; may be null when the client sends none.
    private final Map<String, String> headers;

    // Message body to publish; the controller rejects requests where this is null (400).
    private final Map<String, ?> payload;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.stavshamir.springwolf.configuration;

import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringwolfKafkaProducerConfiguration {

    /**
     * Exposes the {@link SpringwolfKafkaProducer} as a bean, backed by the
     * (possibly absent) KafkaTemplate built by the template factory.
     *
     * <p>The redundant {@code @Autowired} was dropped from the parameter:
     * Spring injects {@code @Bean} method arguments automatically.
     */
    @Bean
    public SpringwolfKafkaProducer springwolfKafkaProducer(SpringwolfKafkaTemplateFactory templateFactory) {
        return new SpringwolfKafkaProducer(templateFactory.buildKafkaTemplate());
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.github.stavshamir.springwolf.configuration;

import com.asyncapi.v2.model.server.Server;
import com.google.common.collect.ImmutableMap;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;

@Component
@RequiredArgsConstructor
public class SpringwolfKafkaTemplateFactory {

    private final AsyncApiDocket docket;

    /**
     * Builds a KafkaTemplate pointed at the first server in the docket whose
     * protocol is "kafka", or {@link Optional#empty()} when none is configured.
     */
    public Optional<KafkaTemplate<Object, Map<String, ?>>> buildKafkaTemplate() {
        return getBootstrapServers(docket)
                .map(this::buildProducerConfiguration)
                .map(producerConfiguration -> new DefaultKafkaProducerFactory<Object, Map<String, ?>>(producerConfiguration))
                .map(KafkaTemplate::new);
    }

    // Finds the URL of the first configured server with protocol "kafka".
    private Optional<String> getBootstrapServers(AsyncApiDocket docket) {
        return docket.getServers().values().stream()
                // "kafka".equals(...) instead of getProtocol().equals("kafka"):
                // avoids an NPE when a server is declared without a protocol.
                .filter(server -> "kafka".equals(server.getProtocol()))
                .map(Server::getUrl)
                .findFirst();
    }

    // Minimal producer config: String keys, JSON-serialized map values,
    // and no Jackson type-info headers added to the records.
    private Map<String, Object> buildProducerConfiguration(String bootstrapServers) {
        return ImmutableMap.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
                JsonSerializer.ADD_TYPE_INFO_HEADERS, false
        );
    }

}
Original file line number Diff line number Diff line change
@@ -1,72 +1,48 @@
package io.github.stavshamir.springwolf.producer;

import com.asyncapi.v2.model.server.Server;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;

@Slf4j
@Service
@RequiredArgsConstructor
public class SpringwolfKafkaProducer {

private final KafkaTemplate<String, Map<String, Object>> kafkaTemplate;

@Getter
private boolean isEnabled = true;

public SpringwolfKafkaProducer(@Autowired AsyncApiDocket docket) {
Optional<String> bootstrapServers = getBootstrapServers(docket);
private final Optional<KafkaTemplate<Object, Map<String, ?>>> kafkaTemplate;

if (bootstrapServers.isPresent()) {
Map<String, Object> config = buildProducerConfiguration(bootstrapServers.get());
DefaultKafkaProducerFactory<String, Map<String, Object>> factory = new DefaultKafkaProducerFactory<>(config);
this.kafkaTemplate = new KafkaTemplate<>(factory);
} else {
log.warn("No Kafka server found in the docket - at least one server must be configured with protocol 'kafka'");
kafkaTemplate = null;
isEnabled = false;
}
public boolean isEnabled() {
return kafkaTemplate.isPresent();
}

public void send(String topic, Map<String, Object> payload) {
if (!isEnabled) {
log.debug("Kafka producer is disabled");
return;
}

if (kafkaTemplate == null) {
public void send(String topic, Map<String, String> headers, Map<String, ?> payload) {
if (kafkaTemplate.isPresent()) {
kafkaTemplate.get().send(buildProducerRecord(topic, headers, payload));
} else {
log.warn("Kafka producer is not configured");
return;
}

kafkaTemplate.send(topic, payload);
}

private Optional<String> getBootstrapServers(AsyncApiDocket docket) {
return docket.getServers().values().stream()
.filter(server -> server.getProtocol().equals("kafka"))
.map(Server::getUrl)
.findFirst();
private ProducerRecord<Object, Map<String, ?>> buildProducerRecord(String topic, Map<String, String> headers, Map<String, ?> payload) {
List<Header> recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList();

return new ProducerRecord<>(topic, null, null, null, payload, recordHeaders);
}

private Map<String, Object> buildProducerConfiguration(String bootstrapServers) {
return ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
JsonSerializer.ADD_TYPE_INFO_HEADERS, false
);
private List<Header> buildHeaders(Map<String, String> headers) {
return headers.entrySet().stream()
.map(header -> new RecordHeader(header.getKey(), header.getValue().getBytes(UTF_8)))
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.github.stavshamir.springwolf.asyncapi;

import io.github.stavshamir.springwolf.asyncapi.dtos.KafkaMessageDto;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ResponseStatusException;

import java.util.Collections;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
public class SpringwolfKafkaControllerTest {

    @InjectMocks
    private SpringwolfKafkaController springwolfKafkaController;

    @Mock
    private SpringwolfKafkaProducer springwolfKafkaProducer;

    @Captor
    private ArgumentCaptor<Map<String, ?>> payloadCaptor;

    @Captor
    private ArgumentCaptor<Map<String, String>> headerCaptor;

    // A request without a payload is rejected with 400 and must never reach the producer.
    @Test
    public void testControllerShouldReturnBadRequestIfPayloadIsEmpty() {
        KafkaMessageDto emptyMessage = new KafkaMessageDto(null, null);

        try {
            springwolfKafkaController.publish("test-topic", emptyMessage);
            failBecauseExceptionWasNotThrown(ResponseStatusException.class);
        } catch (ResponseStatusException e) {
            assertThat(e.getStatus()).isEqualTo(HttpStatus.BAD_REQUEST);
            verifyZeroInteractions(springwolfKafkaProducer);
        }
    }

    // With the producer disabled, a valid message yields 404.
    @Test
    public void testControllerShouldReturnNotFoundIfNoKafkaProducerIsEnabled() {
        when(springwolfKafkaProducer.isEnabled()).thenReturn(false);

        Map<String, String> somePayload = Collections.singletonMap("some-key", "some-value");

        try {
            springwolfKafkaController.publish("test-topic", new KafkaMessageDto(null, somePayload));
            failBecauseExceptionWasNotThrown(ResponseStatusException.class);
        } catch (ResponseStatusException e) {
            assertThat(e.getStatus()).isEqualTo(HttpStatus.NOT_FOUND);
        }
    }

    // A payload-only message is forwarded with null headers.
    @Test
    public void testControllerShouldCallKafkaProducerIfOnlyPayloadIsSend() {
        when(springwolfKafkaProducer.isEnabled()).thenReturn(true);

        Map<String, String> somePayload = Collections.singletonMap("some-key", "some-value");

        springwolfKafkaController.publish("test-topic", new KafkaMessageDto(null, somePayload));

        verify(springwolfKafkaProducer).send(eq("test-topic"), isNull(), payloadCaptor.capture());
        assertThat(payloadCaptor.getValue()).isEqualTo(somePayload);
    }

    // Both headers and payload are forwarded unchanged to the producer.
    @Test
    public void testControllerShouldCallKafkaProducerIfPayloadAndHeadersAreSend() {
        when(springwolfKafkaProducer.isEnabled()).thenReturn(true);

        Map<String, String> someHeaders = Collections.singletonMap("some-header-key", "some-header-value");
        Map<String, String> somePayload = Collections.singletonMap("some-payload-key", "some-payload-value");

        springwolfKafkaController.publish("test-topic", new KafkaMessageDto(someHeaders, somePayload));

        verify(springwolfKafkaProducer).send(eq("test-topic"), headerCaptor.capture(), payloadCaptor.capture());
        assertThat(headerCaptor.getValue()).isEqualTo(someHeaders);
        assertThat(payloadCaptor.getValue()).isEqualTo(somePayload);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.github.stavshamir.springwolf.configuration;


import com.asyncapi.v2.model.server.Server;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class SpringwolfKafkaTemplateFactoryTest {

    @InjectMocks
    private SpringwolfKafkaTemplateFactory springwolfKafkaTemplateFactory;

    @Mock
    private AsyncApiDocket asyncApiDocket;

    // A server with a non-kafka protocol must not produce a template.
    @Test
    public void testNoSpringwolfKafkaProducerCreatedIfNoKafkaInstanceConfigured() {
        Server noKafkaServer = Server.builder()
                .url("some-url")
                .protocol("not-kafka")
                .build();
        when(asyncApiDocket.getServers()).thenReturn(Collections.singletonMap("some-server", noKafkaServer));

        Optional<KafkaTemplate<Object, Map<String, ?>>> kafkaTemplate = springwolfKafkaTemplateFactory.buildKafkaTemplate();

        assertThat(kafkaTemplate).isNotPresent();
    }

    // No servers at all must not produce a template.
    @Test
    public void testNoSpringwolfKafkaProducerCreatedIfNoServersConfigured() {
        when(asyncApiDocket.getServers()).thenReturn(Collections.emptyMap());

        Optional<KafkaTemplate<Object, Map<String, ?>>> kafkaTemplate = springwolfKafkaTemplateFactory.buildKafkaTemplate();

        assertThat(kafkaTemplate).isNotPresent();
    }

    // A server with protocol "kafka" yields a template.
    // Fixed misleading local name: this server IS a kafka server (was "noKafkaServer").
    @Test
    public void testSpringwolfKafkaProducerCreatedIfKafkaInstanceIsConfigured() {
        Server kafkaServer = Server.builder()
                .url("some-url")
                .protocol("kafka")
                .build();
        when(asyncApiDocket.getServers()).thenReturn(Collections.singletonMap("some-server", kafkaServer));

        Optional<KafkaTemplate<Object, Map<String, ?>>> kafkaTemplate = springwolfKafkaTemplateFactory.buildKafkaTemplate();

        assertThat(kafkaTemplate).isPresent();
    }
}
Loading

0 comments on commit d51c271

Please sign in to comment.