From c4b89e443a02ba950115985274d38d8cdbc6ba13 Mon Sep 17 00:00:00 2001 From: Alexander Furer Date: Wed, 16 Jun 2021 17:16:39 +0300 Subject: [PATCH] grpc+kafka demo ref #219 --- grpc-spring-boot-starter-demo/build.gradle | 4 + .../springboot/grpc/kafka/GrpcKafkaTest.java | 83 +++++++++++++++++++ .../src/test/proto/custom.proto | 21 +++++ .../test/resources/bootstrap-kafka-test.yml | 22 +++++ .../src/test/resources/bootstrap.yml | 2 + .../metrics/GRpcMetricsAutoConfiguration.java | 5 +- .../security/SecurityAutoConfiguration.java | 8 +- 7 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 grpc-spring-boot-starter-demo/src/test/java/org/lognet/springboot/grpc/kafka/GrpcKafkaTest.java create mode 100644 grpc-spring-boot-starter-demo/src/test/proto/custom.proto create mode 100644 grpc-spring-boot-starter-demo/src/test/resources/bootstrap-kafka-test.yml diff --git a/grpc-spring-boot-starter-demo/build.gradle b/grpc-spring-boot-starter-demo/build.gradle index aa3a2a9e..f24403f8 100644 --- a/grpc-spring-boot-starter-demo/build.gradle +++ b/grpc-spring-boot-starter-demo/build.gradle @@ -85,6 +85,10 @@ dependencies { testCompile "org.springframework.cloud:spring-cloud-starter-bootstrap" testCompile "com.playtika.testcontainers:embedded-keycloak:2.0.9" + testCompile "com.playtika.testcontainers:embedded-kafka:2.0.9" + testCompile "org.springframework.cloud:spring-cloud-starter-stream-kafka" + + constraints { testCompile('org.testcontainers:testcontainers:1.15.3'){ because 'embedded-keycloak or testcontainers should be upgraded when https://github.com/testcontainers/testcontainers-java/issues/4125 fixed.' diff --git a/grpc-spring-boot-starter-demo/src/test/java/org/lognet/springboot/grpc/kafka/GrpcKafkaTest.java b/grpc-spring-boot-starter-demo/src/test/java/org/lognet/springboot/grpc/kafka/GrpcKafkaTest.java new file mode 100644 index 00000000..d3214f86 --- /dev/null +++ b/grpc-spring-boot-starter-demo/src/test/java/org/lognet/springboot/grpc/kafka/GrpcKafkaTest.java @@ -0,0 +1,83 @@ +package org.lognet.springboot.grpc.kafka; + +import io.grpc.examples.custom.Custom; +import io.grpc.examples.custom.CustomServiceGrpc; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.lognet.springboot.grpc.GRpcService; +import org.lognet.springboot.grpc.GrpcServerTestBase; +import org.lognet.springboot.grpc.demo.DemoApp; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import java.time.Duration; +import java.util.function.Consumer; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE; + + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {DemoApp.class}, webEnvironment = NONE) +@ActiveProfiles({"disable-security", "kafka-test"}) +@Import({GrpcKafkaTest.TestConfig.class}) +public class GrpcKafkaTest extends GrpcServerTestBase { + + @Configuration + @Slf4j + public static class TestConfig { + @Autowired + private KafkaTemplate kafkaTemplate; + + @GRpcService + class MyCustomService extends io.grpc.examples.custom.CustomServiceGrpc.CustomServiceImplBase { + @Override + public void custom(Custom.CustomRequest request, StreamObserver responseObserver) { + kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), request.getName().getBytes()) + .addCallback(e -> { + responseObserver.onNext(Custom.CustomReply.newBuilder().setMessage(request.getName()).build()); + responseObserver.onCompleted(); + }, f -> { + responseObserver.onError(f); + }); + } + } + + + @MockBean + public Consumer consumerMock; + + @Bean + public Consumer consumer() { + return consumerMock; + } + } + + @Autowired + public Consumer consumerMock; + + @Test + public void sendCustomMessage() { + String name = "Johnny"; + final Custom.CustomReply customReply = CustomServiceGrpc.newBlockingStub(getChannel()) + .custom(Custom.CustomRequest.newBuilder() + .setName(name) + .build() + ); + assertThat(customReply.getMessage(), Matchers.is(name)); + Mockito.verify(consumerMock, + Mockito.timeout(Duration.ofSeconds(3).toMillis()).times(1) + ).accept(name); + } +} diff --git a/grpc-spring-boot-starter-demo/src/test/proto/custom.proto b/grpc-spring-boot-starter-demo/src/test/proto/custom.proto new file mode 100644 index 00000000..097b6481 --- /dev/null +++ b/grpc-spring-boot-starter-demo/src/test/proto/custom.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; +import "google/protobuf/empty.proto"; +option java_package = "io.grpc.examples.custom"; + + + +// Custom service definition. +service CustomService { + rpc Custom ( CustomRequest) returns ( CustomReply) {} +} + + +message CustomRequest { + string name = 1; +} + +message CustomReply { + string message = 1; +} + + diff --git a/grpc-spring-boot-starter-demo/src/test/resources/bootstrap-kafka-test.yml b/grpc-spring-boot-starter-demo/src/test/resources/bootstrap-kafka-test.yml new file mode 100644 index 00000000..02e22dc9 --- /dev/null +++ b/grpc-spring-boot-starter-demo/src/test/resources/bootstrap-kafka-test.yml @@ -0,0 +1,22 @@ +embedded: + kafka: + enabled: true + containers: + enabled: true + +spring: + kafka: + bootstrap-servers: ${embedded.kafka.brokerList} + template: + default-topic: testTopic + cloud: + stream: + default-binder: kafka + kafka: + binder: + brokers: ${embedded.kafka.brokerList} + bindings: + consumer-in-0: + destination: ${spring.kafka.template.default-topic} + function: + definition: consumer \ No newline at end of file diff --git a/grpc-spring-boot-starter-demo/src/test/resources/bootstrap.yml b/grpc-spring-boot-starter-demo/src/test/resources/bootstrap.yml index ff42d497..4d2c31a2 100644 --- a/grpc-spring-boot-starter-demo/src/test/resources/bootstrap.yml +++ b/grpc-spring-boot-starter-demo/src/test/resources/bootstrap.yml @@ -20,4 +20,6 @@ embedded: env: JAVA_OPTS_APPEND: "-Xms1024m -Xmx1024m" containers: + enabled: false + kafka: enabled: false \ No newline at end of file diff --git a/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/metrics/GRpcMetricsAutoConfiguration.java b/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/metrics/GRpcMetricsAutoConfiguration.java index 008580f2..d0401d4c 100644 --- a/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/metrics/GRpcMetricsAutoConfiguration.java +++ b/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/metrics/GRpcMetricsAutoConfiguration.java @@ -15,7 +15,9 @@ import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.lognet.springboot.grpc.GRpcGlobalInterceptor; +import org.lognet.springboot.grpc.GRpcServerRunner; import org.lognet.springboot.grpc.GRpcService; +import org.lognet.springboot.grpc.autoconfigure.GRpcAutoConfiguration; import org.lognet.springboot.grpc.autoconfigure.GRpcServerProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.autoconfigure.metrics.CompositeMeterRegistryAutoConfiguration; @@ -41,9 +43,10 @@ import java.util.stream.StreamSupport; @Configuration -@AutoConfigureAfter({MetricsAutoConfiguration.class, CompositeMeterRegistryAutoConfiguration.class}) +@AutoConfigureAfter({MetricsAutoConfiguration.class, CompositeMeterRegistryAutoConfiguration.class, GRpcAutoConfiguration.class}) @ConditionalOnClass({MeterRegistry.class}) @Conditional(GRpcMetricsAutoConfiguration.OnGrpcAndMeterRegistryEnabledCondition.class) +@ConditionalOnBean(GRpcServerRunner.class) @EnableConfigurationProperties(GRpcMetricsProperties.class) public class GRpcMetricsAutoConfiguration { diff --git a/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/security/SecurityAutoConfiguration.java b/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/security/SecurityAutoConfiguration.java index 2776ab8a..540e3b8c 100644 --- a/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/security/SecurityAutoConfiguration.java +++ b/grpc-spring-boot-starter/src/main/java/org/lognet/springboot/grpc/autoconfigure/security/SecurityAutoConfiguration.java @@ -1,6 +1,11 @@ package org.lognet.springboot.grpc.autoconfigure.security; +import org.lognet.springboot.grpc.GRpcServerRunner; import org.lognet.springboot.grpc.GRpcService; +import org.lognet.springboot.grpc.autoconfigure.GRpcAutoConfiguration; +import org.springframework.boot.actuate.autoconfigure.metrics.CompositeMeterRegistryAutoConfiguration; +import org.springframework.boot.actuate.autoconfigure.metrics.MetricsAutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -9,7 +14,8 @@ import org.springframework.security.config.annotation.authentication.configuration.AuthenticationConfiguration; @Configuration -@ConditionalOnBean(annotation = GRpcService.class) +@AutoConfigureAfter({GRpcAutoConfiguration.class}) +@ConditionalOnBean(value = {GRpcServerRunner.class}) @ConditionalOnProperty(value = "grpc.security.auth.enabled", matchIfMissing = true, havingValue = "true") @ConditionalOnClass(AuthenticationConfiguration.class) @Import(GrpcSecurityEnablerConfiguration.class)