From 043318f395a300f0404aa5fe7a1d1d7a530cb901 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Mon, 7 Jun 2021 12:58:08 +0200 Subject: [PATCH 1/3] Add extension for Apicurio Registry Avro --- bom/application/pom.xml | 16 + .../io/quarkus/deployment/Capability.java | 3 + .../java/io/quarkus/deployment/Feature.java | 1 + devtools/bom-descriptor-json/pom.xml | 13 + docs/pom.xml | 13 + .../apicurio-registry-dev-services.adoc | 46 +++ .../src/main/asciidoc/kafka-dev-services.adoc | 6 +- .../asciidoc/kafka-schema-registry-avro.adoc | 319 ++++++++++-------- docs/src/main/asciidoc/kafka.adoc | 3 +- .../apicurio-registry-avro/deployment/pom.xml | 64 ++++ .../avro/ApicurioRegistryAvroProcessor.java | 44 +++ ...rioRegistryDevServicesBuildTimeConfig.java | 35 ++ .../DevServicesApicurioRegistryProcessor.java | 246 ++++++++++++++ extensions/apicurio-registry-avro/pom.xml | 21 ++ .../apicurio-registry-avro/runtime/pom.xml | 64 ++++ .../resources/META-INF/quarkus-extension.yaml | 11 + .../deployment/DevServicesKafkaProcessor.java | 8 +- .../KafkaDevServicesBuildTimeConfig.java | 2 +- .../client/deployment/KafkaProcessor.java | 35 +- extensions/pom.xml | 1 + .../kafka-avro-apicurio2/pom.xml | 9 +- .../src/main/resources/application.properties | 6 +- 22 files changed, 783 insertions(+), 183 deletions(-) create mode 100644 docs/src/main/asciidoc/apicurio-registry-dev-services.adoc create mode 100644 extensions/apicurio-registry-avro/deployment/pom.xml create mode 100644 extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java create mode 100644 extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryDevServicesBuildTimeConfig.java create mode 100644 extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java create mode 100644 extensions/apicurio-registry-avro/pom.xml create mode 100644 extensions/apicurio-registry-avro/runtime/pom.xml create mode 100644 extensions/apicurio-registry-avro/runtime/src/main/resources/META-INF/quarkus-extension.yaml diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 0bd9f8d4fe442..539fae2682ba9 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -209,6 +209,7 @@ 1.0.0.Final 1.2.0.Final 1.10.2 + 2.0.0.Final 0.8.7 1.15.3 3.2.8 @@ -1106,6 +1107,16 @@ quarkus-avro-deployment ${project.version} + + io.quarkus + quarkus-apicurio-registry-avro + ${project.version} + + + io.quarkus + quarkus-apicurio-registry-avro-deployment + ${project.version} + io.quarkus quarkus-smallrye-health @@ -2844,6 +2855,11 @@ agroal-pool ${agroal.version} + + io.apicurio + apicurio-registry-serdes-avro-serde + ${apicurio-registry.version} + io.smallrye.reactive mutiny diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java index 4755540a303d6..1e02dfb497a31 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java @@ -102,4 +102,7 @@ public interface Capability { String VERTX = QUARKUS_PREFIX + "vertx"; String VERTX_CORE = VERTX + ".core"; + + String APICURIO_REGISTRY = QUARKUS_PREFIX + "apicurio.registry"; + String APICURIO_REGISTRY_AVRO = APICURIO_REGISTRY + ".avro"; } diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java index ffc1ba19b04e8..6cc1761515713 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java @@ -19,6 +19,7 @@ public enum Feature { AMAZON_SES, AMAZON_KMS, AMAZON_SSM, + APICURIO_REGISTRY_AVRO, ARTEMIS_CORE, ARTEMIS_JMS, CACHE, diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index afe2b7aff156b..0abaed110bd15 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -318,6 +318,19 @@ + + io.quarkus + quarkus-apicurio-registry-avro + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc diff --git a/docs/pom.xml b/docs/pom.xml index 9e67166360b6f..9ac2366e2a793 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -279,6 +279,19 @@ + + io.quarkus + quarkus-apicurio-registry-avro-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc-deployment diff --git a/docs/src/main/asciidoc/apicurio-registry-dev-services.adoc b/docs/src/main/asciidoc/apicurio-registry-dev-services.adoc new file mode 100644 index 0000000000000..2ab4b713c0ad3 --- /dev/null +++ b/docs/src/main/asciidoc/apicurio-registry-dev-services.adoc @@ -0,0 +1,46 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc +//// += Dev Services for Apicurio Registry + +include::./attributes.adoc[] + +If the `quarkus-apicurio-registry-avro` extension is present, Dev Services for Apicurio Registry automatically starts an Apicurio Registry instance in dev mode and when running tests. +Also, all Kafka channels in SmallRye Reactive Messaging are automatically configured to use this registry. +(This automatic configuration of course only applies to serializers and deserializers from the Apicurio Registry Avro library.) + +== Enabling / Disabling Dev Services for Apicurio Registry + +Dev Services for Apicurio Registry is automatically enabled unless: + +- `quarkus.apicurio-registry.devservices.enabled` is set to `false` +- `mp.messaging.connector.smallrye-kafka.apicurio.registry.url` is configured +- all the Reactive Messaging Kafka channels have the `apicurio.registry.url` attribute set + +Dev Services for Apicurio Registry relies on Docker to start the registry. +If your environment does not support Docker, you will need to start the registry manually, or use an already running registry. +You can configure the registry URL for all Kafka channels in SmallRye Reactive Messaging with a single property: + +[source,properties] +---- +mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 +---- + +== Setting the port + +By default, Dev Services for Apicurio Registry picks a random port and configures the application. +You can set the port by configuring the `quarkus.apicurio-registry.devservices.port` property. + +Note that the Kafka channels in SmallRye Reactive messaging are automatically configured with the chosen port. + +== Configuring the image + +Dev Services for Apicurio Registry uses `apicurio/apicurio-registry-mem` images. +You can select any version from https://hub.docker.com/r/apicurio/apicurio-registry-mem: + +[source, properties] +---- +quarkus.apicurio-registry.devservices.image-name=apicurio/apicurio-registry-mem:latest-snapshot +---- diff --git a/docs/src/main/asciidoc/kafka-dev-services.adoc b/docs/src/main/asciidoc/kafka-dev-services.adoc index 7ba436c3eb7d6..5f285a84701e1 100644 --- a/docs/src/main/asciidoc/kafka-dev-services.adoc +++ b/docs/src/main/asciidoc/kafka-dev-services.adoc @@ -7,11 +7,11 @@ https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc include::./attributes.adoc[] -Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. +If any Kafka-related extension is present (e.g. `quarkus-smallrye-reactive-messaging-kafka`), Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. So, you don't have to start a broker manually. The application is configured automatically. -IMPORTANT: Because starting a Kafka broker can be long, Dev Services for Kafka uses https://vectorized.io/redpanda[Red Panda], a Kafka compatible broker which starts in ~1 second. +IMPORTANT: Because starting a Kafka broker can be long, Dev Services for Kafka uses https://vectorized.io/redpanda[Redpanda], a Kafka compatible broker which starts in ~1 second. == Enabling / Disabling Dev Services for Kafka @@ -42,4 +42,4 @@ You can select any version from https://hub.docker.com/r/vectorized/redpanda: quarkus.kafka.devservices.image-name=vectorized/redpanda:latest ---- -IMPORTANT: Dev Services for Kafka only support Red Panda. +IMPORTANT: Dev Services for Kafka only support Redpanda. diff --git a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc index 841170b5488c0..8807b69690589 100644 --- a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc +++ b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc @@ -8,7 +8,7 @@ https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc include::./attributes.adoc[] This guide shows how your Quarkus application can use Apache Kafka, http://avro.apache.org/docs/current/[Avro] serialized -records, and connect to Schema Registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Schema Registry]. +records, and connect to a schema registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Registry]. If you are not familiar with Kafka and Kafka in Quarkus in particular, consider first going through the link:kafka.adoc[Using Apache Kafka with Reactive Messaging] guide. @@ -39,6 +39,7 @@ The schema, describing the _Movie_, is stored in Apicurio Registry. The same concept applies if you are using the Confluent Avro _serde_ and Confluent Schema Registry. == Solution + We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example. @@ -57,31 +58,16 @@ mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ -DprojectArtifactId=kafka-avro-schema-quickstart \ -DclassName="org.acme.kafka.MovieResource" \ -Dpath="/movies" \ - -Dextensions="resteasy-reactive,resteasy-reactive-jackson,smallrye-reactive-messaging-kafka,avro" + -Dextensions="resteasy-reactive,resteasy-reactive-jackson,smallrye-reactive-messaging-kafka,apicurio-registry-avro" cd kafka-avro-schema-quickstart ---- -Additionally, we need a serializer and deserializer for Avro. -In this guide, we will use the ones provided by Apicurio. - -[source,xml] ----- - - io.apicurio - apicurio-registry-serdes-avro-serde - 2.0.0.Final - - - - io.quarkus - quarkus-apache-httpclient - ----- - [TIP] ==== -If you use Confluent Schema Registry, you need the following dependencies and the Confluent Maven repository added +If you use Confluent Schema Registry, you don't need the `quarkus-apicurio-registry-avro` extension. +Instead, you need the following dependencies and the Confluent Maven repository added to your `pom.xml`: + [source,xml] ---- @@ -117,6 +103,7 @@ to your `pom.xml`: ==== == Avro schema + Apache Avro is a data serialization system. Data structures are described using schemas. The first thing we need to do is to create a schema describing the `Movie` structure. Create a file called `src/main/avro/movie.avsc` with the schema for our record (Kafka message): @@ -147,11 +134,11 @@ the Avro syntax and supported types. TIP: With Quarkus, there's no need to use a specific Maven plugin to process the Avro schema, this is all done for you! -If you run the project with `mvn compile quarkus:dev`, the changes you do to the schema file will be +If you run the project with `mvn quarkus:dev`, the changes you do to the schema file will be automatically applied to the generated Java files. - == The `Movie` producer + Having defined the schema, we can now jump to implementing the `MovieResource`. Let's open the `MovieResource`, inject an https://quarkus.io/blog/reactive-messaging-emitter/[`Emitter`] of `Movie` DTO and implement a `@POST` method @@ -189,25 +176,25 @@ public class MovieResource { Now, we need to _map_ the `movies` channel (the `Emitter` emits to this channel) to a Kafka topic. To achieve this, edit the `application.properties` file, and add the following content: + [source,properties] ---- -# set the URL of the Apicurio Schema Registry, a global setting shared between all Kafka producers and consumers -mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 - # set the connector to use for the `movies` channel to smallrye-kafka mp.messaging.outgoing.movies.connector=smallrye-kafka # the name of the corresponding Kafka topic to `movies` mp.messaging.outgoing.movies.topic=movies -# set the serializer for the `movies` channel to the Apicurio Avro Serializer -mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer - # automatically register the schema with the registry, if not present mp.messaging.outgoing.movies.apicurio.registry.auto-register=true ---- +TIP: You might have noticed that we didn't define the `value.serializer`. +That's because Quarkus can xref:kafka.adoc#serialization-autodetection[autodetect] that `io.apicurio.registry.serde.avro.AvroKafkaSerializer` is appropriate here, based on the `@Channel` declaration, structure of the `Movie` type, and presence of the Apicurio Registry libraries. +We still have to define the `apicurio.registry.auto-register` property. + == The `Movie` consumer + So, we can write records into Kafka containing our `Movie` data. That data is serialized using Avro. Now, it's time to implement a consumer for them. @@ -249,6 +236,7 @@ public class ConsumedMovieResource { The last bit of the application's code is the configuration of the `movies-from-kafka` channel in `application.properties`: + [source,properties] ---- # set the connector for the incoming channel to `smallrye-kafka` @@ -257,9 +245,6 @@ mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka # set the topic name for the channel to `movies` mp.messaging.incoming.movies-from-kafka.topic=movies -# set the deserializer for the `movies-from-kafka` channel to the Apicurio Avro Deserializer -mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer - # disable auto-commit, Reactive Messaging handles it itself mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false @@ -267,9 +252,73 @@ mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest mp.messaging.incoming.movies-from-kafka.apicurio.registry.use-specific-avro-reader=true ---- +TIP: You might have noticed that we didn't define the `value.deserializer`. +That's because Quarkus can xref:kafka.adoc#serialization-autodetection[autodetect] that `io.apicurio.registry.serde.avro.AvroKafkaDeserializer` is appropriate here, based on the `@Channel` declaration, structure of the `Movie` type, and presence of the Apicurio Registry libraries. +We still have to define the `apicurio.registry.use-specific-avro-reader` property. + +== Running the application + +Start the application in dev mode: + +[source,bash] +---- +mvn quarkus:dev +---- + +Kafka broker and Apicurio Registry instance are started automatically thanks to Dev Services. +See xref:kafka-dev-services.adoc[Dev Services for Kafka] and xref:apicurio-registry-dev-services.adoc[Dev Services for Apicurio Registry] for more details. + +TIP: You might have noticed that we didn't configure the schema registry URL anywhere. +This is because Dev Services for Apicurio Registry configures all Kafka channels in SmallRye Reactive Messaging to use the automatically started registry instance. + +In the second terminal, query the `ConsumedMovieResource` resource with `curl`: + +[source,bash] +---- +curl -N http://localhost:8080/consumed-movies +---- + +In the third one, post a few movies: + +[source,bash] +---- +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"The Shawshank Redemption","year":1994}' \ + http://localhost:8080/movies + +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"The Godfather","year":1972}' \ + http://localhost:8080/movies + +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"The Dark Knight","year":2008}' \ + http://localhost:8080/movies + +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"12 Angry Men","year":1957}' \ + http://localhost:8080/movies +---- + +Observe what is printed in the second terminal. You should see something along the lines of: + +[source] +---- +data:'The Shawshank Redemption' from 1994 + +data:'The Godfather' from 1972 + +data:'The Dark Knight' from 2008 -== The infrastructure -To use our application, we need Kafka and Apicurio Schema Registry. +data:'12 Angry Men' from 1957 +---- + +== Running in JVM or Native mode + +When not running in dev or test mode, you will need to start your own Kafka broker and Apicurio Registry. The easiest way to get them running is to use `docker-compose` to start the appropriate containers. Create a `docker-compose.yaml` file at the root of the project with the following content: @@ -317,77 +366,50 @@ services: QUARKUS_PROFILE: prod ---- -== Running the application -Let's first start the Schema Registry and Kafka containers: -[source,shell script] +Before starting the application, let's first start the Kafka broker and Apicurio Registry: + +[source,bash] ---- docker-compose up ---- + NOTE: To stop the containers, use `docker-compose down`. You can also clean up the containers with `docker-compose rm` -Then, start the application: -[source,shell script] ----- -mvn compile quarkus:dev ----- - +You can build and run the application in JVM mode with: -In the second terminal, query the `ConsumedMovieResource` resource with `curl`: -[source,shell script] +[source, bash] ---- -curl -N http://localhost:8080/consumed-movies ----- - -In the third one, post a few movies: -[source,shell script] +mvn package +java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar ---- -curl --header "Content-Type: application/json" \ - --request POST \ - --data '{"title":"The Shawshank Redemption","year":1994}' \ - http://localhost:8080/movies -curl --header "Content-Type: application/json" \ - --request POST \ - --data '{"title":"The Godfather","year":1972}' \ - http://localhost:8080/movies +NOTE: By default, the application tries to connect to a Kafka broker listening at `localhost:9092`. +You can configure the bootstrap server using: `java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar` -curl --header "Content-Type: application/json" \ - --request POST \ - --data '{"title":"The Dark Knight","year":2008}' \ - http://localhost:8080/movies +Specifying the registry URL on the command line is not very convenient, so you can add a configuration property only for the `prod` profile: -curl --header "Content-Type: application/json" \ - --request POST \ - --data '{"title":"12 Angry Men","year":1957}' \ - http://localhost:8080/movies +[source,properties] ---- - -Observe what is printed in the second terminal. You should see something along the lines of: -[source] +%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 ---- -data:'The Shawshank Redemption' from 1994 -data:'The Godfather' from 1972 +You can build and run the native executable with: -data:'The Dark Knight' from 2008 - -data:'12 Angry Men' from 1957 +[source,bash] +---- +mvn package -Dnative +./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092 ---- - -== Building a native executable -Building a native executable -You can build a native executable with the usual command `./mvnw package -Dnative`. - -Running it is as simple as executing `./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner`. == Testing the application -=== Infrastructure for tests -We will now use Testcontainers to set up Kafka and Apicurio Schema Registry for tests. +As mentioned above, Dev Services for Kafka and Apicurio Registry automatically start and configure a Kafka broker and Apicurio Registry instance in dev mode and for tests. +Hence, we don't have to set up Kafka and Apicurio Registry ourselves. +We can just focus on writing the test. + +First, let's add test dependencies on REST Client and Awaitility to `pom.xml`: -First, let's add test dependencies on REST Client, Awaitility and Strimzi to `pom.xml`. Testcontainers will be pulled -in transitively by `strimzi-test-container`: [source,xml] ---- @@ -398,18 +420,6 @@ in transitively by `strimzi-test-container`: quarkus-rest-client test - - io.strimzi - strimzi-test-container - 0.22.1 - test - - - org.apache.logging.log4j - log4j-core - - - org.awaitility awaitility @@ -418,49 +428,6 @@ in transitively by `strimzi-test-container`: ---- -Now, let's define a link:getting-started-testing.adoc#quarkus-test-resource[QuarkusTestResourceLifecycleManager] that will create the appropriate -containers: -[source,java] ----- -package org.acme.kafka; - -import java.util.HashMap; -import java.util.Map; - -import org.testcontainers.containers.GenericContainer; - -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; -import io.strimzi.StrimziKafkaContainer; - -public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager { - - private final StrimziKafkaContainer kafka = new StrimziKafkaContainer(); - - private GenericContainer registry; - - @Override - public Map start() { - kafka.start(); - registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.0.0.Final") - .withExposedPorts(8080) - .withEnv("QUARKUS_PROFILE", "prod"); - registry.start(); - Map properties = new HashMap<>(); - properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); - properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); - return properties; - } - - @Override - public void stop() { - registry.stop(); - kafka.stop(); - } -} ----- - -=== The test In the test, we will send movies in a loop and check if the `ConsumedMovieResource` returns what we send. @@ -492,8 +459,6 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; @QuarkusTest -// register the class that sets up Testcontainers: -@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) public class MovieResourceTest { @TestHTTPResource("/consumed-movies") @@ -563,8 +528,80 @@ NOTE: We modified the `MovieResourceTest` that was generated together with the p subclass, `NativeMovieResourceIT`, that runs the same test against the native executable. To run it, execute `mvn verify -Dnative`, or `mvn clean install -Dnative` +=== Manual setup + +If we couldn't use Dev Services and wanted to start a Kafka broker and Apicurio Registry instance manually, we would define a link:getting-started-testing.adoc#quarkus-test-resource[QuarkusTestResourceLifecycleManager]. + +[source,xml] +---- + + ... + + io.strimzi + strimzi-test-container + 0.22.1 + test + + + org.apache.logging.log4j + log4j-core + + + + +---- + +[source,java] +---- +package org.acme.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.testcontainers.containers.GenericContainer; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import io.strimzi.StrimziKafkaContainer; + +public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager { + + private final StrimziKafkaContainer kafka = new StrimziKafkaContainer(); + + private GenericContainer registry; + + @Override + public Map start() { + kafka.start(); + registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.0.0.Final") + .withExposedPorts(8080) + .withEnv("QUARKUS_PROFILE", "prod"); + registry.start(); + Map properties = new HashMap<>(); + properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", + "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); + properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); + return properties; + } + + @Override + public void stop() { + registry.stop(); + kafka.stop(); + } +} +---- + +[source,java] +---- +@QuarkusTest +@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) +public class MovieResourceTest { + ... +} +---- == Avro code generation details + In this guide we used the Quarkus code generation mechanism to generate Java files from Avro schema. @@ -592,4 +629,4 @@ regular getter will be generated. Defaults to `false` * link:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.9/kafka/kafka.html[SmallRye Reactive Messaging Kafka] documentation * link:https://quarkus.io/blog/kafka-avro/[How to Use Kafka, Schema Registry and Avro with Quarkus] - a blog post on which -the guide is based. It gives a good introduction to Avro and the concept of Schema Registry +the guide is based. It gives a good introduction to Avro and the concept of schema registry diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 9ef49cd21c219..98a07d74d4532 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -184,7 +184,8 @@ import org.jboss.resteasy.annotations.SseElementType; public class PriceResource { @Inject - @Channel("my-data-stream") Publisher prices; // <1> + @Channel("my-data-stream") + Publisher prices; // <1> @GET @Path("/stream") diff --git a/extensions/apicurio-registry-avro/deployment/pom.xml b/extensions/apicurio-registry-avro/deployment/pom.xml new file mode 100644 index 0000000000000..ba504cdff01d8 --- /dev/null +++ b/extensions/apicurio-registry-avro/deployment/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-avro-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-avro-deployment + Quarkus - Apicurio Registry - Avro - Deployment + + + + io.quarkus + quarkus-apicurio-registry-avro + + + + io.quarkus + quarkus-core-deployment + + + io.quarkus + quarkus-avro-deployment + + + io.quarkus + quarkus-apache-httpclient-deployment + + + + org.testcontainers + testcontainers + + + + io.quarkus + quarkus-junit5-internal + test + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java new file mode 100644 index 0000000000000..cca84983b1891 --- /dev/null +++ b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java @@ -0,0 +1,44 @@ +package io.quarkus.apicurio.registry.avro; + +import io.quarkus.deployment.Feature; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; + +public class ApicurioRegistryAvroProcessor { + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(Feature.APICURIO_REGISTRY_AVRO); + } + + @BuildStep + public void apicurioRegistryAvro(BuildProducer reflectiveClass, + BuildProducer sslNativeSupport) { + + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.apicurio.registry.serde.avro.AvroKafkaDeserializer", + "io.apicurio.registry.serde.avro.AvroKafkaSerializer")); + + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy", + "io.apicurio.registry.serde.strategy.TopicIdStrategy", + "io.apicurio.registry.serde.avro.DefaultAvroDatumProvider", + "io.apicurio.registry.serde.avro.ReflectAvroDatumProvider", + "io.apicurio.registry.serde.avro.strategy.RecordIdStrategy", + "io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy")); + + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.apicurio.registry.serde.DefaultSchemaResolver", + "io.apicurio.registry.serde.DefaultIdHandler", + "io.apicurio.registry.serde.Legacy4ByteIdHandler", + "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider", + "io.apicurio.registry.serde.headers.DefaultHeadersHandler")); + + // Apicurio Registry 2.x uses the JDK 11 HTTP client, which unconditionally requires SSL + // TODO when the new HTTP client SPI in Apicurio Registry client appears, this will no longer be needed + // (but we'll have to make sure that the Vert.x HTTP client is used) + sslNativeSupport.produce(new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_AVRO)); + } +} diff --git a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryDevServicesBuildTimeConfig.java b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryDevServicesBuildTimeConfig.java new file mode 100644 index 0000000000000..b4de867195da4 --- /dev/null +++ b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryDevServicesBuildTimeConfig.java @@ -0,0 +1,35 @@ +package io.quarkus.apicurio.registry.avro; + +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "apicurio-registry.devservices", phase = ConfigPhase.BUILD_TIME) +public class ApicurioRegistryDevServicesBuildTimeConfig { + + /** + * If Dev Services for Apicurio Registry has been explicitly enabled or disabled. Dev Services are generally enabled + * by default, unless there is an existing configuration present. For Apicurio Registry, Dev Services starts a registry + * unless {@code mp.messaging.connector.smallrye-kafka.apicurio.registry.url} is set. + */ + @ConfigItem + public Optional enabled = Optional.empty(); + + /** + * Optional fixed port the dev service will listen to. + *

+ * If not defined, the port will be chosen randomly. + */ + @ConfigItem + public Optional port; + + /** + * The Apicurio Registry image to use. + * Note that only Apicurio Registry 2.x images are supported. + */ + @ConfigItem(defaultValue = "apicurio/apicurio-registry-mem:2.0.0.Final") + public String imageName; + +} diff --git a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java new file mode 100644 index 0000000000000..4f4c970774966 --- /dev/null +++ b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java @@ -0,0 +1,246 @@ +package io.quarkus.apicurio.registry.avro; + +import java.util.Objects; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.logging.Logger; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; + +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.IsDockerWorking; +import io.quarkus.deployment.IsNormal; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.DevServicesNativeConfigResultBuildItem; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.configuration.ConfigUtils; + +/** + * Starts Apicurio Registry as dev service if needed. + *

+ * In the future, when we have multiple Apicurio Registry extensions (Avro, Protobuf, ...), + * this dev service support should probably be moved into an extra extension (quarkus-apicurio-registry-internal). + */ +public class DevServicesApicurioRegistryProcessor { + + private static final Logger log = Logger.getLogger(DevServicesApicurioRegistryProcessor.class); + + private static final String REGISTRY_URL_CONFIG = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url"; + + static volatile AutoCloseable closeable; + static volatile ApicurioRegistryDevServiceCfg cfg; + static volatile boolean first = true; + + private final IsDockerWorking isDockerWorking = new IsDockerWorking(true); + + @BuildStep(onlyIfNot = IsNormal.class) + public void startApicurioRegistryDevService( + LaunchModeBuildItem launchMode, + ApicurioRegistryDevServicesBuildTimeConfig apicurioRegistryDevServices, + BuildProducer runTimeConfiguration, + BuildProducer devServicesConfiguration) { + + ApicurioRegistryDevServiceCfg configuration = getConfiguration(apicurioRegistryDevServices); + + if (closeable != null) { + boolean restartRequired = launchMode.getLaunchMode() == LaunchMode.TEST; + if (!restartRequired) { + restartRequired = !configuration.equals(cfg); + } + if (!restartRequired) { + return; + } + shutdownApicurioRegistry(); + cfg = null; + } + + ApicurioRegistry apicurioRegistry = startApicurioRegistry(configuration); + if (apicurioRegistry == null) { + return; + } + + cfg = configuration; + closeable = apicurioRegistry.getCloseable(); + + runTimeConfiguration.produce(new RunTimeConfigurationDefaultBuildItem( + REGISTRY_URL_CONFIG, apicurioRegistry.getUrl() + "/apis/registry/v2")); + devServicesConfiguration.produce(new DevServicesNativeConfigResultBuildItem( + REGISTRY_URL_CONFIG, apicurioRegistry.getUrl() + "/apis/registry/v2")); + + log.infof("Dev Services for Apicurio Registry started. The registry is available at %s", apicurioRegistry.getUrl()); + + // Configure the watch dog + if (first) { + first = false; + Runnable closeTask = new Runnable() { + @Override + public void run() { + if (closeable != null) { + shutdownApicurioRegistry(); + } + first = true; + closeable = null; + cfg = null; + } + }; + QuarkusClassLoader cl = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader(); + ((QuarkusClassLoader) cl.parent()).addCloseTask(closeTask); + Thread closeHookThread = new Thread(closeTask, "Apicurio Registry container shutdown thread"); + Runtime.getRuntime().addShutdownHook(closeHookThread); + ((QuarkusClassLoader) cl.parent()).addCloseTask(new Runnable() { + @Override + public void run() { + Runtime.getRuntime().removeShutdownHook(closeHookThread); + } + }); + } + } + + private void shutdownApicurioRegistry() { + if (closeable != null) { + try { + closeable.close(); + } catch (Throwable e) { + log.error("Failed to stop Apicurio Registry", e); + } finally { + closeable = null; + } + } + } + + private ApicurioRegistry startApicurioRegistry(ApicurioRegistryDevServiceCfg config) { + if (!config.devServicesEnabled) { + // explicitly disabled + log.debug("Not starting dev services for Apicurio Registry, as it has been disabled in the config."); + return null; + } + + if (ConfigUtils.isPropertyPresent(REGISTRY_URL_CONFIG)) { + log.debug("Not starting dev services for Apicurio Registry, " + REGISTRY_URL_CONFIG + " is configured."); + return null; + } + + if (!hasKafkaChannelWithoutApicurioRegistry()) { + log.debug("Not starting dev services for Apicurio Registry, all the channels have a registry URL configured."); + return null; + } + + if (!isDockerWorking.getAsBoolean()) { + log.warn("Docker isn't working, please run Apicurio Registry yourself."); + return null; + } + + // Starting the broker + ApicurioRegistryContainer container = new ApicurioRegistryContainer( + DockerImageName.parse(config.imageName), config.fixedExposedPort); + container.start(); + + return new ApicurioRegistry(container.getUrl(), container); + } + + private boolean hasKafkaChannelWithoutApicurioRegistry() { + Config config = ConfigProvider.getConfig(); + for (String name : config.getPropertyNames()) { + boolean isIncoming = name.startsWith("mp.messaging.incoming."); + boolean isOutgoing = name.startsWith("mp.messaging.outgoing."); + boolean isConnector = name.endsWith(".connector"); + boolean isKafka = isConnector + && "smallrye-kafka".equals(config.getOptionalValue(name, String.class).orElse("ignored")); + boolean isConfigured = false; + if ((isIncoming || isOutgoing) && isKafka) { + isConfigured = ConfigUtils.isPropertyPresent(name.replace(".connector", ".apicurio.registry.url")); + } + if (!isConfigured) { + return true; + } + } + return false; + } + + private ApicurioRegistryDevServiceCfg getConfiguration(ApicurioRegistryDevServicesBuildTimeConfig cfg) { + return new ApicurioRegistryDevServiceCfg(cfg.enabled.orElse(true), cfg.imageName, cfg.port.orElse(0)); + } + + private static class ApicurioRegistry { + private final String url; + private final AutoCloseable closeable; + + public ApicurioRegistry(String url, AutoCloseable closeable) { + this.url = url; + this.closeable = closeable; + } + + public String getUrl() { + return url; + } + + public AutoCloseable getCloseable() { + return closeable; + } + } + + private static final class ApicurioRegistryDevServiceCfg { + private final boolean devServicesEnabled; + private final String imageName; + private final Integer fixedExposedPort; + + public ApicurioRegistryDevServiceCfg(boolean devServicesEnabled, String imageName, Integer fixedExposedPort) { + this.devServicesEnabled = devServicesEnabled; + this.imageName = imageName; + this.fixedExposedPort = fixedExposedPort; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ApicurioRegistryDevServiceCfg that = (ApicurioRegistryDevServiceCfg) o; + return devServicesEnabled == that.devServicesEnabled + && Objects.equals(imageName, that.imageName) + && Objects.equals(fixedExposedPort, that.fixedExposedPort); + } + + @Override + public int hashCode() { + return Objects.hash(devServicesEnabled, imageName, fixedExposedPort); + } + } + + private static final class ApicurioRegistryContainer extends GenericContainer { + private static final int APICURIO_REGISTRY_PORT = 8080; // inside the container + + private final int port; + + private ApicurioRegistryContainer(DockerImageName dockerImageName, int fixedExposedPort) { + super(dockerImageName); + this.port = fixedExposedPort; + withNetwork(Network.SHARED); + withExposedPorts(APICURIO_REGISTRY_PORT); + withEnv("QUARKUS_PROFILE", "prod"); + if (!dockerImageName.getRepository().equals("apicurio/apicurio-registry-mem")) { + throw new IllegalArgumentException("Only apicurio/apicurio-registry-mem images are supported"); + } + } + + @Override + protected void configure() { + super.configure(); + if (port > 0) { + addFixedExposedPort(port, APICURIO_REGISTRY_PORT); + } + } + + public String getUrl() { + return String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(APICURIO_REGISTRY_PORT)); + } + } +} diff --git a/extensions/apicurio-registry-avro/pom.xml b/extensions/apicurio-registry-avro/pom.xml new file mode 100644 index 0000000000000..3689c6ada15f2 --- /dev/null +++ b/extensions/apicurio-registry-avro/pom.xml @@ -0,0 +1,21 @@ + + + + quarkus-extensions-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + 4.0.0 + quarkus-apicurio-registry-avro-parent + Quarkus - Apicurio Registry - Avro + pom + + + deployment + runtime + + diff --git a/extensions/apicurio-registry-avro/runtime/pom.xml b/extensions/apicurio-registry-avro/runtime/pom.xml new file mode 100644 index 0000000000000..39cafeefaf44d --- /dev/null +++ b/extensions/apicurio-registry-avro/runtime/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-avro-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-avro + Quarkus - Apicurio Registry - Avro - Runtime + Provide support for the Apicurio Registry Avro library + + + io.apicurio + apicurio-registry-serdes-avro-serde + + + + io.quarkus + quarkus-core + + + io.quarkus + quarkus-avro + + + + io.quarkus + quarkus-apache-httpclient + + + + + + + + io.quarkus + quarkus-bootstrap-maven-plugin + + + io.quarkus.apicurio.registry.avro + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/apicurio-registry-avro/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/apicurio-registry-avro/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000000..4954c80bb779d --- /dev/null +++ b/extensions/apicurio-registry-avro/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,11 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Apicurio Registry - Avro" +metadata: + keywords: + - "apicurio" + - "avro" + guide: "https://quarkus.io/guides/kafka-schema-registry-avro" + categories: + - "serialization" + status: "experimental" diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java index 66d8b1f3f78f1..654b5ee048889 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java @@ -171,8 +171,10 @@ private boolean hasKafkaChannelWithoutBootstrapServers() { boolean isIncoming = name.startsWith("mp.messaging.incoming."); boolean isOutgoing = name.startsWith("mp.messaging.outgoing."); boolean isConnector = name.endsWith(".connector"); + boolean isKafka = isConnector + && "smallrye-kafka".equals(config.getOptionalValue(name, String.class).orElse("ignored")); boolean isConfigured = false; - if ((isIncoming || isOutgoing) && isConnector) { + if ((isIncoming || isOutgoing) && isKafka) { isConfigured = ConfigUtils.isPropertyPresent(name.replace(".connector", ".bootstrap.servers")); } if (!isConfigured) { @@ -239,7 +241,7 @@ public int hashCode() { } /** - * Container configuring and starting the Red Panda broker. + * Container configuring and starting the Redpanda broker. * See https://vectorized.io/docs/quick-start-docker/ */ private static final class RedPandaKafkaContainer extends GenericContainer { @@ -253,7 +255,7 @@ private RedPandaKafkaContainer(DockerImageName dockerImageName, int fixedExposed this.port = fixedExposedPort; withNetwork(Network.SHARED); withExposedPorts(KAFKA_PORT); - // For red panda, we need to start the broker - see https://vectorized.io/docs/quick-start-docker/ + // For Redpanda, we need to start the broker - see https://vectorized.io/docs/quick-start-docker/ if (dockerImageName.getRepository().equals("vectorized/redpanda")) { withCreateContainerCmdModifier(cmd -> { cmd.withEntrypoint("sh"); diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java index 4ddfe9b9e8317..f17027618e97d 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java @@ -27,7 +27,7 @@ public class KafkaDevServicesBuildTimeConfig { /** * The Kafka image to use. - * Note that only Red Panda images are supported. + * Note that only Redpanda images are supported. * See https://vectorized.io/docs/quick-start-docker/ and https://hub.docker.com/r/vectorized/redpanda */ @ConfigItem(defaultValue = "vectorized/redpanda:v21.5.5") diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 50358ffca240d..8cd023ee21cc4 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -189,7 +189,7 @@ public void build( reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "java.nio.DirectByteBuffer")); reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "sun.misc.Cleaner")); - handleAvro(reflectiveClass, proxies, serviceProviders, sslNativeSupport); + handleAvro(reflectiveClass, proxies, serviceProviders, sslNativeSupport, capabilities); handleOpenTracing(reflectiveClass, capabilities); handleStrimziOAuth(reflectiveClass); if (config.snappyEnabled) { @@ -272,7 +272,8 @@ private void handleStrimziOAuth(BuildProducer reflecti private void handleAvro(BuildProducer reflectiveClass, BuildProducer proxies, BuildProducer serviceProviders, - BuildProducer sslNativeSupport) { + BuildProducer sslNativeSupport, + Capabilities capabilities) { // Avro - for both Confluent and Apicurio // --- Confluent --- @@ -353,38 +354,20 @@ private void handleAvro(BuildProducer reflectiveClass, "java.lang.AutoCloseable")); } catch (ClassNotFoundException e) { - //ignore, Apicurio Avro is not in the classpath + // ignore, Apicurio Avro is not in the classpath } // --- Apicurio Registry 2.x --- try { Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false, Thread.currentThread().getContextClassLoader()); - reflectiveClass.produce( - new ReflectiveClassBuildItem(true, true, false, - "io.apicurio.registry.serde.avro.AvroKafkaDeserializer", - "io.apicurio.registry.serde.avro.AvroKafkaSerializer")); - - reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, - "io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy", - "io.apicurio.registry.serde.strategy.TopicIdStrategy", - "io.apicurio.registry.serde.avro.DefaultAvroDatumProvider", - "io.apicurio.registry.serde.avro.ReflectAvroDatumProvider", - "io.apicurio.registry.serde.avro.strategy.RecordIdStrategy", - "io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy")); - - reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, - "io.apicurio.registry.serde.DefaultSchemaResolver", - "io.apicurio.registry.serde.DefaultIdHandler", - "io.apicurio.registry.serde.Legacy4ByteIdHandler", - "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider", - "io.apicurio.registry.serde.headers.DefaultHeadersHandler")); - - // Apicurio Registry 2.x uses the JDK 11 HTTP client, which unconditionally requires SSL - sslNativeSupport.produce(new ExtensionSslNativeSupportBuildItem(Feature.KAFKA_CLIENT)); + if (!capabilities.isPresent(Capability.APICURIO_REGISTRY_AVRO)) { + throw new RuntimeException( + "Apicurio Registry 2.x Avro classes detected, please use the quarkus-apicurio-registry-avro extension"); + } } catch (ClassNotFoundException e) { - //ignore, Apicurio Avro is not in the classpath + // ignore, Apicurio Avro is not in the classpath } } diff --git a/extensions/pom.xml b/extensions/pom.xml index 17ba5ac841872..1004460c78b4b 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -99,6 +99,7 @@ artemis-core artemis-jms avro + apicurio-registry-avro devservices diff --git a/integration-tests/kafka-avro-apicurio2/pom.xml b/integration-tests/kafka-avro-apicurio2/pom.xml index f2611e2e0629c..ca9ed93cab86d 100644 --- a/integration-tests/kafka-avro-apicurio2/pom.xml +++ b/integration-tests/kafka-avro-apicurio2/pom.xml @@ -74,14 +74,9 @@ - - io.apicurio - apicurio-registry-serdes-avro-serde - ${apicurio.version} - io.quarkus - quarkus-apache-httpclient + quarkus-apicurio-registry-avro @@ -199,7 +194,7 @@ io.quarkus - quarkus-apache-httpclient-deployment + quarkus-apicurio-registry-avro-deployment ${project.version} pom test diff --git a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties index e22c540faa69d..e63c0657ab0f4 100644 --- a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties +++ b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties @@ -3,4 +3,8 @@ quarkus.log.category.\"org.apache.kafka\".level=WARN quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check -quarkus.kafka.health.enabled=true \ No newline at end of file +quarkus.kafka.health.enabled=true + +# using QuarkusTestResourceLifecycleManager in this test +# Dev Services are tested by the means of kafka-avro-schema-quickstart +quarkus.apicurio-registry.devservices.enabled=false From 67f0c175d825fcf1974a5c36cb5dd42d67852044 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Wed, 9 Jun 2021 12:29:15 +0200 Subject: [PATCH 2/3] Fix Kafka serde autodetection for outgoing @Channel injection points --- .../deployment/SmallRyeReactiveMessagingKafkaProcessor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 64694bd764b1d..2687f5f4a0f05 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -98,7 +98,8 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, for (AnnotationInstance annotation : discovery.findAnnotationsOnInjectionPoints(DotNames.CHANNEL)) { String channelName = annotation.value().asString(); - if (!discovery.isKafkaConnector(false, channelName)) { + if (!discovery.isKafkaConnector(false, channelName) + && !discovery.isKafkaConnector(true, channelName)) { continue; } From 334686d623bf5e95f2d518c0499bf084568447ee Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Thu, 10 Jun 2021 14:13:37 +0200 Subject: [PATCH 3/3] Disable Dev Services for Kafka in tests that use QuarkusTestResourceLifecycleManager --- .../src/main/resources/application.properties | 1 + .../kafka-avro/src/main/resources/application.properties | 5 ++++- .../kafka-sasl/src/main/resources/application.properties | 3 +++ .../kafka-snappy/src/main/resources/application.properties | 3 +++ .../kafka-ssl/src/main/resources/application.properties | 4 +++- .../kafka-streams/src/main/resources/application.properties | 3 +++ .../kafka/src/main/resources/application.properties | 3 +++ 7 files changed, 20 insertions(+), 2 deletions(-) diff --git a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties index e63c0657ab0f4..eda85eda32cf3 100644 --- a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties +++ b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties @@ -7,4 +7,5 @@ quarkus.kafka.health.enabled=true # using QuarkusTestResourceLifecycleManager in this test # Dev Services are tested by the means of kafka-avro-schema-quickstart +quarkus.kafka.devservices.enabled=false quarkus.apicurio-registry.devservices.enabled=false diff --git a/integration-tests/kafka-avro/src/main/resources/application.properties b/integration-tests/kafka-avro/src/main/resources/application.properties index e22c540faa69d..c0f22a629a999 100644 --- a/integration-tests/kafka-avro/src/main/resources/application.properties +++ b/integration-tests/kafka-avro/src/main/resources/application.properties @@ -3,4 +3,7 @@ quarkus.log.category.\"org.apache.kafka\".level=WARN quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check -quarkus.kafka.health.enabled=true \ No newline at end of file +quarkus.kafka.health.enabled=true + +# using QuarkusTestResourceLifecycleManager in this test +quarkus.kafka.devservices.enabled=false diff --git a/integration-tests/kafka-sasl/src/main/resources/application.properties b/integration-tests/kafka-sasl/src/main/resources/application.properties index d4907bbe9f11b..c0f22a629a999 100644 --- a/integration-tests/kafka-sasl/src/main/resources/application.properties +++ b/integration-tests/kafka-sasl/src/main/resources/application.properties @@ -4,3 +4,6 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true + +# using QuarkusTestResourceLifecycleManager in this test +quarkus.kafka.devservices.enabled=false diff --git a/integration-tests/kafka-snappy/src/main/resources/application.properties b/integration-tests/kafka-snappy/src/main/resources/application.properties index a1d72fa75140a..57eb449e82e1b 100644 --- a/integration-tests/kafka-snappy/src/main/resources/application.properties +++ b/integration-tests/kafka-snappy/src/main/resources/application.properties @@ -6,3 +6,6 @@ quarkus.kafka.health.enabled=true # Enable snappy: quarkus.kafka.snappy.enabled=true + +# using QuarkusTestResourceLifecycleManager in this test +quarkus.kafka.devservices.enabled=false diff --git a/integration-tests/kafka-ssl/src/main/resources/application.properties b/integration-tests/kafka-ssl/src/main/resources/application.properties index 84ce3c79e651d..1548be15b8b16 100644 --- a/integration-tests/kafka-ssl/src/main/resources/application.properties +++ b/integration-tests/kafka-ssl/src/main/resources/application.properties @@ -5,5 +5,7 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true +quarkus.native.enable-all-security-services=true -quarkus.native.enable-all-security-services=true \ No newline at end of file +# using QuarkusTestResourceLifecycleManager in this test +quarkus.kafka.devservices.enabled=false diff --git a/integration-tests/kafka-streams/src/main/resources/application.properties b/integration-tests/kafka-streams/src/main/resources/application.properties index 0effdb41fcd56..b92d104610f07 100644 --- a/integration-tests/kafka-streams/src/main/resources/application.properties +++ b/integration-tests/kafka-streams/src/main/resources/application.properties @@ -26,3 +26,6 @@ kafka.auto.offset.reset=earliest # Set explicitly as for tests the quarkus.application.name does not default to the name of the project %test.quarkus.application.name=streams-test-pipeline kafka-streams.some-property=dummy + +# using QuarkusTestResourceLifecycleManager in this test +quarkus.kafka.devservices.enabled=false diff --git a/integration-tests/kafka/src/main/resources/application.properties b/integration-tests/kafka/src/main/resources/application.properties index d4907bbe9f11b..c0f22a629a999 100644 --- a/integration-tests/kafka/src/main/resources/application.properties +++ b/integration-tests/kafka/src/main/resources/application.properties @@ -4,3 +4,6 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true + +# using QuarkusTestResourceLifecycleManager in this test +quarkus.kafka.devservices.enabled=false