Skip to content

Commit

Permalink
Merge pull request #16495 from gsmet/1.13.2-backports-3
Browse files Browse the repository at this point in the history
1.13.2 backports 3
  • Loading branch information
gsmet authored Apr 14, 2021
2 parents 7650bc2 + 5e18e28 commit 27096d8
Show file tree
Hide file tree
Showing 65 changed files with 1,829 additions and 700 deletions.
2 changes: 0 additions & 2 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ updates:
# Kafka
- dependency-name: org.apache.kafka:*
- dependency-name: org.apache.zookeeper:zookeeper
# Debezium
- dependency-name: io.debezium:debezium-core
# Scala
- dependency-name: org.scala-lang:*
- dependency-name: net.alchim31.maven:scala-maven-plugin
Expand Down
14 changes: 10 additions & 4 deletions .github/native-tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"Security2",
"Security3",
"Amazon",
"Messaging",
"Messaging1",
"Messaging2",
"Cache",
"HTTP",
"Misc1",
Expand Down Expand Up @@ -63,9 +64,14 @@
"test-modules": "amazon-services amazon-lambda amazon-lambda-http"
},
{
"category": "Messaging",
"timeout": 120,
"test-modules": "artemis-core artemis-jms kafka kafka-avro kafka-snappy kafka-streams reactive-messaging-amqp reactive-messaging-kafka reactive-messaging-http"
"category": "Messaging1",
"timeout": 100,
"test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-snappy kafka-streams reactive-messaging-kafka"
},
{
"category": "Messaging2",
"timeout": 70,
"test-modules": "artemis-core artemis-jms reactive-messaging-amqp reactive-messaging-http"
},
{
"category": "Security1",
Expand Down
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
<hibernate-orm.version>5.4.29.Final</hibernate-orm.version>
<hibernate-reactive.version>1.0.0.CR1</hibernate-reactive.version>
<hibernate-validator.version>6.2.0.Final</hibernate-validator.version>
<hibernate-search.version>6.0.2.Final</hibernate-search.version>
<hibernate-search.version>6.0.3.Final</hibernate-search.version>
<narayana.version>5.10.6.Final</narayana.version>
<jboss-transaction-api_1.2_spec.version>1.1.1.Final</jboss-transaction-api_1.2_spec.version>
<agroal.version>1.11</agroal.version>
Expand Down
15 changes: 5 additions & 10 deletions bom/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
<description>Dependency management for integration tests. Importable by third party extension developers.</description>

<properties>
<debezium.version>1.4.2.Final</debezium.version>
<testcontainers.version>1.15.2</testcontainers.version>
<strimzi-test-container.version>0.22.1</strimzi-test-container.version>
</properties>

<dependencyManagement>
Expand All @@ -31,15 +31,10 @@
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
<type>test-jar</type>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>${strimzi-test-container.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,9 @@ public void contributeBootProperties(BiConsumer<String, Object> propertyCollecto
@Override
public void onMetadataInitialized(Metadata metadata, BootstrapContext bootstrapContext,
BiConsumer<String, Object> propertyCollector) {
Version graalVMVersion = Version.getCurrent();
final boolean isAOT = Boolean.getBoolean("com.oracle.graalvm.isaot");
boolean isGraalVM20OrBelow = isAOT && graalVMVersion.compareTo(GRAAL_VM_VERSION_21) < 0;
HibernateOrmIntegrationBooter booter = HibernateOrmIntegrationBooter.builder(metadata, bootstrapContext)
.valueReadHandleFactory(
// GraalVM 20 or below doesn't support method handles
isGraalVM20OrBelow ? ValueReadHandleFactory.usingJavaLangReflect()
// GraalVM 21+ and OpenJDK can handle the default (method handles)
: null)
// MethodHandles don't work at all in GraalVM 20 and below, and seem unreliable on GraalVM 21
.valueReadHandleFactory(ValueReadHandleFactory.usingJavaLangReflect())
.build();
booter.preBoot(propertyCollector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ public Optional<ServiceBindingConfigSource> convert(List<ServiceBinding> service
Map<String, String> properties = new HashMap<>();
ServiceBinding binding = matchingByType.get();

String bootstrapServers = binding.getProperties().get("bootstrapservers");
String bootstrapServers = binding.getProperties().get("bootstrapServers");
if (bootstrapServers == null) {
bootstrapServers = binding.getProperties().get("bootstrap-servers");
}
if (bootstrapServers != null) {
properties.put("kafka.bootstrap.servers", bootstrapServers);
}

String securityProtocol = binding.getProperties().get("securityprotocol");
String securityProtocol = binding.getProperties().get("securityProtocol");
if (securityProtocol != null) {
properties.put("kafka.security.protocol", securityProtocol);
}

String saslMechanism = binding.getProperties().get("saslmechanism");
String saslMechanism = binding.getProperties().get("saslMechanism");
if (saslMechanism != null) {
properties.put("kafka.sasl.mechanism", saslMechanism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public void initData() {
createPerson("David Lodge", "London");
createPerson("Paul Auster", "New York");
createPerson("John Grisham", "Oxford");

// Add many other entities, so that mass indexing has something to do.
// DO NOT REMOVE, it's important to have many entities to fully test mass indexing.
for (int i = 0; i < 2000; i++) {
createPerson("Other Person #" + i, "Other City #" + i);
}
}

@GET
Expand All @@ -56,6 +62,10 @@ public String testSearch() {
assertEquals(1, person.size());
assertEquals("David Lodge", person.get(0).getName());

assertEquals(4 + 2000, searchSession.search(Person.class)
.where(f -> f.matchAll())
.fetchTotalHitCount());

return "OK";
}

Expand Down
21 changes: 8 additions & 13 deletions integration-tests/kafka-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,15 @@
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Duration;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -20,29 +21,32 @@
@Path("/avro")
public class AvroEndpoint {

@Inject
AvroKafkaCreator creator;

@GET
@Path("/confluent")
public JsonObject getConfluent() {
return get(AvroKafkaCreator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer"));
return get(creator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer"));
}

@POST
@Path("/confluent")
public void sendConfluent(Pet pet) {
KafkaProducer<Integer, Pet> p = AvroKafkaCreator.createConfluentProducer("test-avro-confluent");
KafkaProducer<Integer, Pet> p = creator.createConfluentProducer("test-avro-confluent");
send(p, pet, "test-avro-confluent-producer");
}

@GET
@Path("/apicurio")
public JsonObject getApicurio() {
return get(AvroKafkaCreator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer"));
return get(creator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer"));
}

@POST
@Path("/apicurio")
public void sendApicurio(Pet pet) {
KafkaProducer<Integer, Pet> p = AvroKafkaCreator.createApicurioProducer("test-avro-apicurio");
KafkaProducer<Integer, Pet> p = creator.createApicurioProducer("test-avro-apicurio");
send(p, pet, "test-avro-apicurio-producer");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

import javax.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AbstractKafkaSerializer;
Expand All @@ -26,57 +30,93 @@
/**
* Create Avro Kafka Consumers and Producers
*/
@ApplicationScoped
public class AvroKafkaCreator {

public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
Properties p = getConfluentConsumerProperties(groupdIdConfig);
@ConfigProperty(name = "kafka.bootstrap.servers")
String bootstrap;
@ConfigProperty(name = "schema.url.confluent")
String confluent;
@ConfigProperty(name = "schema.url.apicurio")
String apicurio;

public KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
return createConfluentProducer(bootstrap, confluent, clientId);
}

public KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
return createApicurioProducer(bootstrap, apicurio, clientId);
}

public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String bootstrap, String confluent,
String groupdIdConfig, String subscribtionName) {
Properties p = getConfluentConsumerProperties(bootstrap, confluent, groupdIdConfig);
return createConsumer(p, subscribtionName);
}

public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
Properties p = getApicurioConsumerProperties(groupdIdConfig);
public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String bootstrap, String apicurio,
String groupdIdConfig, String subscribtionName) {
Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig);
return createConsumer(p, subscribtionName);
}

public static KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
Properties p = getConfluentProducerProperties(clientId);
public static KafkaProducer<Integer, Pet> createConfluentProducer(String bootstrap, String confluent,
String clientId) {
Properties p = getConfluentProducerProperties(bootstrap, confluent, clientId);
return createProducer(p);
}

public static KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
Properties p = getApicurioProducerProperties(clientId);
public static KafkaProducer<Integer, Pet> createApicurioProducer(String bootstrap, String apicurio,
String clientId) {
Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId);
return createProducer(p);
}

/**
 * Builds a KafkaConsumer from the supplied properties and subscribes it to the given topic.
 *
 * If the caller did not set a client id, a random UUID is used so that several
 * consumers created from equivalent property sets do not collide on CLIENT_ID_CONFIG.
 * (Note: parameter name "subscribtionName" is a pre-existing typo kept for diff fidelity.)
 */
private static KafkaConsumer<Integer, Pet> createConsumer(Properties props, String subscribtionName) {
if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
// Default to a unique client id only when the caller has not chosen one explicitly.
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
}
KafkaConsumer<Integer, Pet> consumer = new KafkaConsumer<>(props);
// Subscribe to exactly one topic; group management assigns partitions on the first poll.
consumer.subscribe(Collections.singletonList(subscribtionName));
return consumer;
}

/**
 * Builds a KafkaProducer from the supplied properties.
 *
 * Mirrors createConsumer: if the caller did not set a client id, a random UUID is
 * assigned so concurrently created producers do not share CLIENT_ID_CONFIG.
 */
private static KafkaProducer<Integer, Pet> createProducer(Properties props) {
if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) {
// Default to a unique client id only when the caller has not chosen one explicitly.
props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
}
return new KafkaProducer<>(props);
}

private static Properties getConfluentConsumerProperties(String groupdIdConfig) {
Properties props = getGenericConsumerProperties(groupdIdConfig);
private static Properties getConfluentConsumerProperties(String bootstrap, String confluent,
String groupdIdConfig) {
Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent"));
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
return props;
}

public static Properties getApicurioConsumerProperties(String groupdIdConfig) {
Properties props = getGenericConsumerProperties(groupdIdConfig);
public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) {
Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio"));
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio);
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM,
ReflectAvroDatumProvider.class.getName());
return props;
}

private static Properties getGenericConsumerProperties(String groupdIdConfig) {
private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
Expand All @@ -85,27 +125,30 @@ private static Properties getGenericConsumerProperties(String groupdIdConfig) {
return props;
}

private static Properties getConfluentProducerProperties(String clientId) {
Properties props = getGenericProducerProperties(clientId);
private static Properties getConfluentProducerProperties(String bootstrap, String confluent, String clientId) {
Properties props = getGenericProducerProperties(bootstrap, clientId);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent"));
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
return props;
}

private static Properties getApicurioProducerProperties(String clientId) {
Properties props = getGenericProducerProperties(clientId);
private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) {
Properties props = getGenericProducerProperties(bootstrap, clientId);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio"));
props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName());
props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, GetOrCreateIdStrategy.class.getName());
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio);
props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM,
SimpleTopicIdStrategy.class.getName());
props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM,
GetOrCreateIdStrategy.class.getName());
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM,
ReflectAvroDatumProvider.class.getName());
return props;
}

private static Properties getGenericProducerProperties(String clientId) {
private static Properties getGenericProducerProperties(String bootstrap, String clientId) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN

# enable health check
quarkus.kafka.health.enabled=true
kafka.bootstrap.servers=localhost:19092

Loading

0 comments on commit 27096d8

Please sign in to comment.