From 6efee4e5fc5edd01de92987e6afccb8c58715e75 Mon Sep 17 00:00:00 2001
From: Arek Burdach
Date: Mon, 3 Mar 2025 15:43:03 +0100
Subject: [PATCH 1/2] Example installation setup: replaced ZooKeeper with KRaft

---
 examples/installation/docker-compose.yml | 25 ++++++-------------------
 1 file changed, 6 insertions(+), 19 deletions(-)

diff --git a/examples/installation/docker-compose.yml b/examples/installation/docker-compose.yml
index 030dd6a73b7..2e94d3e8f32 100644
--- a/examples/installation/docker-compose.yml
+++ b/examples/installation/docker-compose.yml
@@ -127,30 +127,17 @@ services:
 
   ### KAFKA-related services:
 
-  zookeeper:
-    image: bitnami/zookeeper:3.9
-    restart: unless-stopped
-    environment:
-      ALLOW_ANONYMOUS_LOGIN: "yes"
-      ZOO_4LW_COMMANDS_WHITELIST: "ruok"
-    healthcheck:
-      test: [ "CMD-SHELL", 'echo "ruok" | nc -w 2 -q 2 localhost 2181 | grep imok' ]
-      interval: 5s
-      retries: 5
-    deploy:
-      resources:
-        limits:
-          memory: 256M
-
   kafka:
     image: bitnami/kafka:3.7.0
     restart: unless-stopped
     hostname: nu-kafka
     environment:
-      KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
-    depends_on:
-      zookeeper:
-        condition: service_healthy
+      KAFKA_CFG_NODE_ID: 0
+      KAFKA_CFG_PROCESS_ROLES: controller,broker
+      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
+      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
+      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
     healthcheck:
      test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list" ]
       interval: 10s

From 625c0ea88d6d0905c2a9bcb34b2e4302b31568ec Mon Sep 17 00:00:00 2001
From: Arek Burdach
Date: Tue, 4 Mar 2025 10:23:11 +0100
Subject: [PATCH 2/2] Removed unused ZooKeeper-based Kafka server
 configuration from unit tests

---
 .../defaultmodel/FlinkWithKafkaSuite.scala    |  2 +-
 .../devmodel/TableKafkaPingPongTest.scala     | 10 +-
 ...EmbeddedDeploymentManagerRestartTest.scala |  6 +-
 ...s.scala => EmbeddedKafkaKraftServer.scala} | 90 +++++--------
 .../nussknacker/engine/kafka/KafkaSpec.scala  | 14 +-
 .../helpers/SchemaRegistryMixin.scala         |  2 +-
 .../src/main/resources/logback-test.xml       |  3 -
 7 files changed, 39 insertions(+), 88 deletions(-)
 rename utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/{EmbeddedKafkaServerWithDependencies.scala => EmbeddedKafkaKraftServer.scala} (65%)

diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala
index 9a9ba43d09a..33f9e10808c 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala
@@ -127,7 +127,7 @@ abstract class FlinkWithKafkaSuite
       .empty()
       .withValue(
         KafkaConfigProperties.bootstrapServersProperty("config"),
-        fromAnyRef(kafkaServerWithDependencies.kafkaAddress)
+        fromAnyRef(kafkaServer.bootstrapServers)
       )
       .withValue(
         KafkaConfigProperties.property("config", "auto.offset.reset"),
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala
index 0b20cf98118..69d5004d2f9 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala
@@ -46,7 +46,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       |  ) WITH (
       |    'connector' = 'kafka',
       |    'topic' = '${inputTopicNameTest1.name}',
-      |    'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+      |    'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
       |    'properties.group.id' = 'someConsumerGroupId',
       |    'scan.startup.mode' = 'earliest-offset',
       |    'format' = 'json'
@@ -59,7 +59,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       |  ) WITH (
       |    'connector' = 'kafka',
       |    'topic' = '${inputTopicNameTest2.name}',
-      |    'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+      |    'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
       |    'properties.group.id' = 'someConsumerGroupId',
       |    'scan.startup.mode' = 'earliest-offset',
       |    'format' = 'json'
@@ -72,7 +72,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       |  ) WITH (
       |    'connector' = 'kafka',
       |    'topic' = '${outputTopicNameTest2.name}',
-      |    'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+      |    'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
       |    'properties.group.id' = 'someConsumerGroupId',
       |    'scan.startup.mode' = 'earliest-offset',
       |    'format' = 'json'
@@ -85,7 +85,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       |  ) WITH (
       |    'connector' = 'kafka',
       |    'topic' = '${inputTopicNameTest3.name}',
-      |    'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+      |    'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
       |    'properties.group.id' = 'someConsumerGroupId',
       |    'scan.startup.mode' = 'earliest-offset',
       |    'format' = 'json'
@@ -98,7 +98,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       |  ) WITH (
       |    'connector' = 'kafka',
       |    'topic' = '${outputTopicNameTest3.name}',
-      |    'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+      |    'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
       |    'properties.group.id' = 'someConsumerGroupId',
       |    'scan.startup.mode' = 'earliest-offset',
       |    'format' = 'json'
diff --git a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala
index 33293c95639..2682a7afb49 100644
--- a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala
+++ b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala
@@ -39,7 +39,7 @@ class StreamingEmbeddedDeploymentManagerRestartTest extends BaseStreamingEmbedde
       fixture.deployScenario(scenario)
     }
 
-    kafkaServerWithDependencies.shutdownKafkaServer()
+    kafkaServer.shutdownKafkaServer()
 
     eventually {
       val jobStatuses = manager.getScenarioDeploymentsStatuses(name).futureValue.value
@@ -47,8 +47,8 @@ class StreamingEmbeddedDeploymentManagerRestartTest extends BaseStreamingEmbedde
     }
 
     // We have to recreate kafka server because after shutdown it is unusable anymore
-    kafkaServerWithDependencies.recreateKafkaServer()
-    kafkaServerWithDependencies.startupKafkaServer()
+    kafkaServer.recreateKafkaServer()
+    kafkaServer.startupKafkaServer()
 
     eventually {
       manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List(
diff --git a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/EmbeddedKafkaServerWithDependencies.scala b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/EmbeddedKafkaKraftServer.scala
similarity index 65%
rename from utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/EmbeddedKafkaServerWithDependencies.scala
rename to utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/EmbeddedKafkaKraftServer.scala
index dcf9cd9c3e4..7c72a41dc2e 100644
--- a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/EmbeddedKafkaServerWithDependencies.scala
+++ b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/EmbeddedKafkaKraftServer.scala
@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.kafka
 
 import com.typesafe.scalalogging.LazyLogging
 import kafka.server
-import kafka.server.{KafkaRaftServer, KafkaServer, Server}
+import kafka.server.{KafkaRaftServer, Server}
 import kafka.tools.StorageTool
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.NullOutputStream
@@ -12,10 +12,8 @@ import org.apache.kafka.common.{IsolationLevel, Uuid}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import java.io.{File, PrintStream}
-import java.net.InetSocketAddress
 import java.nio.file.Files
 import java.util.{Locale, Properties}
 import scala.language.implicitConversions
@@ -23,10 +21,7 @@ import scala.util.control.NonFatal
 
 // We should consider switching to KafkaClusterTestKit (https://github.com/apache/kafka/blob/3.6/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java),
 // it's used by spring-kafka (https://github.com/spring-projects/spring-kafka/blob/3.1.x/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java).
-object EmbeddedKafkaServerWithDependencies {
-
-  // TODO: Remove supprot for legacy zk setup
-  private val kRaftEnabled: Boolean = true
+object EmbeddedKafkaKraftServer {
 
   private val localhost: String = "127.0.0.1"
 
@@ -34,31 +29,20 @@ object EmbeddedKafkaServerWithDependencies {
       brokerPort: Int,
       controllerPort: Int,
       kafkaBrokerConfig: Map[String, String]
-  ): EmbeddedKafkaServerWithDependencies = {
+  ): EmbeddedKafkaKraftServer = {
    val kafkaServerLogDir = Files.createTempDirectory("embeddedKafka").toFile
    val clusterId         = Uuid.randomUuid()
    val kafkaConfig =
      prepareKafkaServerConfig(brokerPort, controllerPort, kafkaServerLogDir, kafkaBrokerConfig, clusterId)
-    val kafkaServer = if (kRaftEnabled) {
-      new EmbeddedKafkaServerWithDependencies(
-        None,
-        () => {
-          prepareRaftStorage(kafkaServerLogDir, kafkaConfig, clusterId)
-          new KafkaRaftServer(kafkaConfig, time = Time.SYSTEM)
-        },
-        s"$localhost:$brokerPort",
-        kafkaServerLogDir
-      )
-    } else {
-      val zk = createZookeeperServer(controllerPort)
-      new EmbeddedKafkaServerWithDependencies(
-        Some(zk),
-        () => new KafkaServer(kafkaConfig, time = Time.SYSTEM),
-        s"$localhost:$brokerPort",
-        kafkaServerLogDir
-      )
-    }
-    kafkaServer.startupKafkaServerAndDependencies()
+    val kafkaServer = new EmbeddedKafkaKraftServer(
+      () => {
+        prepareRaftStorage(kafkaServerLogDir, kafkaConfig, clusterId)
+        new KafkaRaftServer(kafkaConfig, time = Time.SYSTEM)
+      },
+      s"$localhost:$brokerPort",
+      kafkaServerLogDir
+    )
+    kafkaServer.startupKafkaServer()
     kafkaServer
   }
 
@@ -70,20 +54,14 @@ object EmbeddedKafkaServerWithDependencies {
       clusterId: Uuid
   ) = {
     val properties = new Properties()
-    if (kRaftEnabled) {
-      properties.setProperty("node.id", "0")
-      properties.setProperty("process.roles", "broker,controller")
-      properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort,CONTROLLER://$localhost:$controllerPort")
-      properties.setProperty("listener.security.protocol.map", s"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
-      properties.setProperty("controller.listener.names", s"CONTROLLER")
-      properties.setProperty("inter.broker.listener.name", "PLAINTEXT")
-      properties.setProperty("controller.quorum.voters", s"0@$localhost:$controllerPort")
-      properties.setProperty("cluster.id", clusterId.toString)
-    } else {
-      properties.setProperty("broker.id", "0")
-      properties.setProperty("zookeeper.connect", s"$localhost:$controllerPort")
-      properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort")
-    }
+    properties.setProperty("node.id", "0")
+    properties.setProperty("process.roles", "broker,controller")
+    properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort,CONTROLLER://$localhost:$controllerPort")
+    properties.setProperty("listener.security.protocol.map", s"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+    properties.setProperty("controller.listener.names", s"CONTROLLER")
+    properties.setProperty("inter.broker.listener.name", "PLAINTEXT")
+    properties.setProperty("controller.quorum.voters", s"0@$localhost:$controllerPort")
+    properties.setProperty("cluster.id", clusterId.toString)
     properties.setProperty("num.partitions", "1")
     properties.setProperty("group.initial.rebalance.delay.ms", "0")
     properties.setProperty("offsets.topic.num.partitions", "1")
@@ -112,20 +90,11 @@ object EmbeddedKafkaServerWithDependencies {
     )
   }
 
-  private def createZookeeperServer(zkPort: Int) = {
-    val factory = new NIOServerCnxnFactory()
-    factory.configure(new InetSocketAddress(localhost, zkPort), 1024)
-    val tempDir = Files.createTempDirectory("embeddedKafkaZk").toFile
-    val zkServer = new ZooKeeperServer(tempDir, tempDir, ZooKeeperServer.DEFAULT_TICK_TIME)
-    (factory, zkServer, tempDir)
-  }
-
}
 
-class EmbeddedKafkaServerWithDependencies(
-    zooKeeperServerOpt: Option[(NIOServerCnxnFactory, ZooKeeperServer, File)],
+class EmbeddedKafkaKraftServer(
     createKafkaServer: () => Server,
-    val kafkaAddress: String,
+    val bootstrapServers: String,
     kafkaServerLogDir: File
) extends LazyLogging {
 
@@ -136,25 +105,10 @@ class EmbeddedKafkaServerWithDependencies(
     kafkaServer = createKafkaServer()
   }
 
-  def startupKafkaServerAndDependencies(): Unit = {
-    zooKeeperServerOpt.foreach(t => t._1.startup(t._2))
-    startupKafkaServer()
-  }
-
   def startupKafkaServer(): Unit = {
     kafkaServer.startup()
   }
 
-  def shutdownKafkaServerAndDependencies(): Unit = {
-    shutdownKafkaServer()
-    zooKeeperServerOpt.foreach { case (cnxnFactory, zkServer, zkTempDir) =>
-      cnxnFactory.shutdown()
-      // factory shutdown doesn't pass 'fullyShutDown' flag to ZkServer.shutdown, we need to explicitly close database
-      zkServer.getZKDatabase.close()
-      cleanDirectorySilently(zkTempDir)
-    }
-  }
-
   def shutdownKafkaServer(): Unit = {
     kafkaServer.shutdown()
     kafkaServer.awaitShutdown()
diff --git a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaSpec.scala b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaSpec.scala
index 60f3b02483e..79bca9f6b67 100644
--- a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaSpec.scala
+++ b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaSpec.scala
@@ -7,14 +7,14 @@ import pl.touk.nussknacker.test.{AvailablePortFinder, KafkaConfigProperties, Wit
 
 trait KafkaSpec extends BeforeAndAfterAll with WithConfig { self: Suite =>
 
-  var kafkaServerWithDependencies: EmbeddedKafkaServerWithDependencies = _
-  var kafkaClient: KafkaClient                                         = _
-  val kafkaBrokerConfig                                                = Map.empty[String, String]
+  var kafkaServer: EmbeddedKafkaKraftServer = _
+  var kafkaClient: KafkaClient              = _
+  val kafkaBrokerConfig                     = Map.empty[String, String]
 
   override protected def resolveConfig(config: Config): Config =
     super
       .resolveConfig(config)
-      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServerWithDependencies.kafkaAddress))
+      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServer.bootstrapServers))
       // For tests we want to read from the beginning...
       .withValue(KafkaConfigProperties.property("auto.offset.reset"), fromAnyRef("earliest"))
 
@@ -22,20 +22,20 @@ trait KafkaSpec extends BeforeAndAfterAll with WithConfig { self: Suite =>
     super.beforeAll()
     AvailablePortFinder.withAvailablePortsBlocked(2) {
       case List(controllerPort, brokerPort) =>
-        kafkaServerWithDependencies = EmbeddedKafkaServerWithDependencies.run(
+        kafkaServer = EmbeddedKafkaKraftServer.run(
          brokerPort = brokerPort,
          controllerPort = controllerPort,
          kafkaBrokerConfig = kafkaBrokerConfig
        )
      case _ => throw new MatchError(())
    }
-    kafkaClient = new KafkaClient(kafkaAddress = kafkaServerWithDependencies.kafkaAddress, self.suiteName)
+    kafkaClient = new KafkaClient(kafkaAddress = kafkaServer.bootstrapServers, self.suiteName)
  }

  override protected def afterAll(): Unit = {
    try {
      kafkaClient.shutdown()
-      kafkaServerWithDependencies.shutdownKafkaServerAndDependencies()
+      kafkaServer.shutdownKafkaServer()
    } finally {
      super.afterAll()
    }
diff --git a/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SchemaRegistryMixin.scala b/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SchemaRegistryMixin.scala
index e39c6fc4966..cf8730b7e6e 100644
--- a/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SchemaRegistryMixin.scala
+++ b/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SchemaRegistryMixin.scala
@@ -12,7 +12,7 @@ trait SchemaRegistryMixin extends AnyFunSuite with KafkaSpec with KafkaWithSchem
  override protected def resolveConfig(config: Config): Config = {
    super
      .resolveConfig(config)
-      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServerWithDependencies.kafkaAddress))
+      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServer.bootstrapServers))
      // schema.registry.url have to be defined even for MockSchemaRegistryClient
      .withValue(KafkaConfigProperties.property("schema.registry.url"), fromAnyRef("not_used")) // we turn off auto registration to do it on our own passing mocked schema registry client
      // meaningful only in Flink tests
diff --git a/utils/test-utils/src/main/resources/logback-test.xml b/utils/test-utils/src/main/resources/logback-test.xml
index 79fb9f1fc41..aa0b808b408 100644
--- a/utils/test-utils/src/main/resources/logback-test.xml
+++ b/utils/test-utils/src/main/resources/logback-test.xml
@@ -43,7 +43,6 @@
-
@@ -60,8 +59,6 @@
-
-
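
Note on PATCH 1/2: with KRaft the single bitnami/kafka container acts as both broker and controller, so the zookeeper service, its healthcheck, and the depends_on ordering all disappear. A minimal smoke test of the resulting broker, in the same spirit as the container healthcheck (which lists topics), might look like the sketch below. It assumes the kafka-clients library is on the classpath and that broker port 9092 is reachable from where it runs; from another container on the compose network the address would be kafka:9092 (the service name) rather than localhost:9092.

    import java.util.Properties
    import org.apache.kafka.clients.admin.AdminClient

    object ComposeKafkaSmokeTest extends App {
      val props = new Properties()
      // assumption: the broker port is published to the host
      props.put("bootstrap.servers", "localhost:9092")
      val admin = AdminClient.create(props)
      // mirrors what the healthcheck does with kafka-topics.sh --list
      try println(admin.listTopics().names().get())
      finally admin.close()
    }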
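Note on PATCH 2/2: after the ZooKeeper branch is removed, the helper's public surface, as visible in this diff, is run(brokerPort, controllerPort, kafkaBrokerConfig), bootstrapServers, startupKafkaServer(), shutdownKafkaServer() and recreateKafkaServer(); test suites normally get all of this wired up by mixing in KafkaSpec. A sketch of direct usage, with hard-coded ports standing in for the free ones that KafkaSpec reserves via AvailablePortFinder:

    import pl.touk.nussknacker.engine.kafka.EmbeddedKafkaKraftServer

    object EmbeddedKraftExample extends App {
      // run() formats the KRaft storage, builds a KafkaRaftServer and starts it
      val server = EmbeddedKafkaKraftServer.run(
        brokerPort = 9092,     // example port for the PLAINTEXT listener
        controllerPort = 9093, // example port for the CONTROLLER listener
        kafkaBrokerConfig = Map.empty
      )
      // bootstrapServers replaces the old kafkaAddress accessor
      try println(server.bootstrapServers) // e.g. 127.0.0.1:9092
      finally server.shutdownKafkaServer() // no *AndDependencies variant needed any more
    }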
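Note on the restart test: as the comment kept in the diff says, a shut-down server instance is unusable afterwards, which is why StreamingEmbeddedDeploymentManagerRestartTest goes through a shutdown, recreate, startup cycle instead of calling startup twice. Under the same assumptions as the sketch above:

    // after shutdownKafkaServer() the underlying Server instance is spent;
    // recreateKafkaServer() builds a fresh one from the factory captured by
    // run(), and startupKafkaServer() brings it back on the same ports
    server.shutdownKafkaServer()
    server.recreateKafkaServer()
    server.startupKafkaServer()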