diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index e045d16835463..284140bcbb64a 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -88,6 +88,11 @@
kafka_2.12
test
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java
index ba05f3d5a5dcf..9a0ac69fc8832 100644
--- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java
@@ -52,7 +52,7 @@ private static void addSSL(Properties props) {
public static KafkaConsumer createConsumer() {
Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19099");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java
index 79ff20527d595..a0c73ab7a689e 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java
@@ -1,5 +1,8 @@
package io.quarkus.it.kafka;
+import static io.quarkus.it.kafka.KafkaTestResource.extract;
+import static org.awaitility.Awaitility.await;
+
import java.io.File;
import java.util.Collections;
import java.util.Map;
@@ -10,6 +13,8 @@
import io.debezium.kafka.KafkaCluster;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import kafka.server.KafkaServer;
+import kafka.server.RunningAsBroker;
public class KafkaSASLTestResource implements QuarkusTestResourceLifecycleManager {
@@ -46,6 +51,10 @@ public Map start() {
throw new RuntimeException(e);
}
+ KafkaServer server = extract(kafka);
+ await().until(() -> server.brokerState().currentState() == RunningAsBroker.state());
+ server.logger().underlying().info("Broker 'kafka-sasl' started");
+
return Collections.emptyMap();
}
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java
index d24c2e89c4068..5cc87de15f2f0 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java
@@ -1,5 +1,8 @@
package io.quarkus.it.kafka;
+import static io.quarkus.it.kafka.KafkaTestResource.extract;
+import static org.awaitility.Awaitility.await;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -15,6 +18,8 @@
import io.debezium.kafka.KafkaCluster;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import kafka.server.KafkaServer;
+import kafka.server.RunningAsBroker;
public class KafkaSSLTestResource implements QuarkusTestResourceLifecycleManager {
@@ -48,7 +53,7 @@ public Map start() {
props.setProperty("zookeeper.connection.timeout.ms", "45000");
//See http://kafka.apache.org/documentation.html#security_ssl for detail
props.setProperty("listener.security.protocol.map", "CLIENT:SSL");
- props.setProperty("listeners", "CLIENT://:19093");
+ props.setProperty("listeners", "CLIENT://:19099");
props.setProperty("inter.broker.listener.name", "CLIENT");
props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ksPath.toString());
props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
@@ -60,7 +65,7 @@ public Map start() {
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
kafka = new KafkaCluster()
- .withPorts(2183, 19093)
+ .withPorts(2189, 19099)
.addBrokers(1)
.usingDirectory(directory)
.deleteDataUponShutdown(true)
@@ -70,6 +75,11 @@ public Map start() {
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ KafkaServer server = extract(kafka);
+ await().until(() -> server.brokerState().currentState() == RunningAsBroker.state());
+ server.logger().underlying().info("Broker 'kafka-ssl' started");
+
return Collections.emptyMap();
}
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java
index b4e819b120574..18a2440d4646b 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java
@@ -1,13 +1,18 @@
package io.quarkus.it.kafka;
+import static org.awaitility.Awaitility.await;
+
import java.io.File;
+import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import io.debezium.kafka.KafkaCluster;
+import io.debezium.kafka.KafkaServer;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import kafka.server.RunningAsBroker;
public class KafkaTestResource implements QuarkusTestResourceLifecycleManager {
@@ -29,6 +34,11 @@ public Map start() {
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ kafka.server.KafkaServer server = extract(kafka);
+ await().until(() -> server.brokerState().currentState() == RunningAsBroker.state());
+ server.logger().underlying().info("Broker 'kafka' started");
+
return Collections.emptyMap();
}
@@ -38,4 +48,22 @@ public void stop() {
kafka.shutdown();
}
}
+
+ @SuppressWarnings("unchecked")
+ static kafka.server.KafkaServer extract(KafkaCluster cluster) {
+ Field kafkaServersField;
+ Field serverField;
+ try {
+ kafkaServersField = cluster.getClass().getDeclaredField("kafkaServers");
+ kafkaServersField.setAccessible(true);
+ Map map = (Map) kafkaServersField.get(cluster);
+ KafkaServer server = map.get(1);
+ serverField = KafkaServer.class.getDeclaredField("server");
+ serverField.setAccessible(true);
+ return (kafka.server.KafkaServer) serverField.get(server);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
}
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java
index 753ce93953335..db500ffed64b4 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java
@@ -40,7 +40,7 @@ private static void addSsl(Properties props) {
public static Producer createProducer() {
Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093");
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19099");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-ssl-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());