Skip to content

Commit

Permalink
grpc+kafka demo ref #219
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Furer committed Jun 16, 2021
1 parent 4298b5c commit c4b89e4
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 2 deletions.
4 changes: 4 additions & 0 deletions grpc-spring-boot-starter-demo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ dependencies {
testCompile "org.springframework.cloud:spring-cloud-starter-bootstrap"

testCompile "com.playtika.testcontainers:embedded-keycloak:2.0.9"
testCompile "com.playtika.testcontainers:embedded-kafka:2.0.9"
testCompile "org.springframework.cloud:spring-cloud-starter-stream-kafka"


constraints {
testCompile('org.testcontainers:testcontainers:1.15.3'){
because 'embedded-keycloak or testcontainers should be upgraded when https://github.com/testcontainers/testcontainers-java/issues/4125 fixed.'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.lognet.springboot.grpc.kafka;

import io.grpc.examples.custom.Custom;
import io.grpc.examples.custom.CustomServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.lognet.springboot.grpc.GRpcService;
import org.lognet.springboot.grpc.GrpcServerTestBase;
import org.lognet.springboot.grpc.demo.DemoApp;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.time.Duration;
import java.util.function.Consumer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;


@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoApp.class}, webEnvironment = NONE)
@ActiveProfiles({"disable-security", "kafka-test"})
@Import({GrpcKafkaTest.TestConfig.class})
public class GrpcKafkaTest extends GrpcServerTestBase {

@Configuration
@Slf4j
public static class TestConfig {
@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;

@GRpcService
class MyCustomService extends io.grpc.examples.custom.CustomServiceGrpc.CustomServiceImplBase {
@Override
public void custom(Custom.CustomRequest request, StreamObserver<Custom.CustomReply> responseObserver) {
kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), request.getName().getBytes())
.addCallback(e -> {
responseObserver.onNext(Custom.CustomReply.newBuilder().setMessage(request.getName()).build());
responseObserver.onCompleted();
}, f -> {
responseObserver.onError(f);
});
}
}


@MockBean
public Consumer<String> consumerMock;

@Bean
public Consumer<String> consumer() {
return consumerMock;
}
}

@Autowired
public Consumer<String> consumerMock;

@Test
public void sendCustomMessage() {
String name = "Johnny";
final Custom.CustomReply customReply = CustomServiceGrpc.newBlockingStub(getChannel())
.custom(Custom.CustomRequest.newBuilder()
.setName(name)
.build()
);
assertThat(customReply.getMessage(), Matchers.is(name));
Mockito.verify(consumerMock,
Mockito.timeout(Duration.ofSeconds(3).toMillis()).times(1)
).accept(name);
}
}
21 changes: 21 additions & 0 deletions grpc-spring-boot-starter-demo/src/test/proto/custom.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
option java_package = "io.grpc.examples.custom";



// Custom service definition.
service CustomService {
rpc Custom ( CustomRequest) returns ( CustomReply) {}
}


message CustomRequest {
string name = 1;
}

message CustomReply {
string message = 1;
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
embedded:
kafka:
enabled: true
containers:
enabled: true

spring:
kafka:
bootstrap-servers: ${embedded.kafka.brokerList}
template:
default-topic: testTopic
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers: ${embedded.kafka.brokerList}
bindings:
consumer-in-0:
destination: ${spring.kafka.template.default-topic}
function:
definition: consumer
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ embedded:
env:
JAVA_OPTS_APPEND: "-Xms1024m -Xmx1024m"
containers:
enabled: false
kafka:
enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.lognet.springboot.grpc.GRpcServerRunner;
import org.lognet.springboot.grpc.GRpcService;
import org.lognet.springboot.grpc.autoconfigure.GRpcAutoConfiguration;
import org.lognet.springboot.grpc.autoconfigure.GRpcServerProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.autoconfigure.metrics.CompositeMeterRegistryAutoConfiguration;
Expand All @@ -41,9 +43,10 @@
import java.util.stream.StreamSupport;

@Configuration
@AutoConfigureAfter({MetricsAutoConfiguration.class, CompositeMeterRegistryAutoConfiguration.class})
@AutoConfigureAfter({MetricsAutoConfiguration.class, CompositeMeterRegistryAutoConfiguration.class, GRpcAutoConfiguration.class})
@ConditionalOnClass({MeterRegistry.class})
@Conditional(GRpcMetricsAutoConfiguration.OnGrpcAndMeterRegistryEnabledCondition.class)
@ConditionalOnBean(GRpcServerRunner.class)
@EnableConfigurationProperties(GRpcMetricsProperties.class)
public class GRpcMetricsAutoConfiguration {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package org.lognet.springboot.grpc.autoconfigure.security;

import org.lognet.springboot.grpc.GRpcServerRunner;
import org.lognet.springboot.grpc.GRpcService;
import org.lognet.springboot.grpc.autoconfigure.GRpcAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.metrics.CompositeMeterRegistryAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.metrics.MetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -9,7 +14,8 @@
import org.springframework.security.config.annotation.authentication.configuration.AuthenticationConfiguration;

@Configuration
@ConditionalOnBean(annotation = GRpcService.class)
@AutoConfigureAfter({GRpcAutoConfiguration.class})
@ConditionalOnBean(value = {GRpcServerRunner.class})
@ConditionalOnProperty(value = "grpc.security.auth.enabled", matchIfMissing = true, havingValue = "true")
@ConditionalOnClass(AuthenticationConfiguration.class)
@Import(GrpcSecurityEnablerConfiguration.class)
Expand Down

0 comments on commit c4b89e4

Please sign in to comment.