Skip to content

Commit

Permalink
Example installation setup: replaced zookeeper with kraft (#7609)
Browse files Browse the repository at this point in the history
* Example installation setup: replaced zookeeper with kraft

* Removed unused zk server based kafka configuration from unit tests
  • Loading branch information
arkadius authored Mar 4, 2025
1 parent 429a4dd commit 1e8cf03
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ abstract class FlinkWithKafkaSuite
.empty()
.withValue(
KafkaConfigProperties.bootstrapServersProperty("config"),
fromAnyRef(kafkaServerWithDependencies.kafkaAddress)
fromAnyRef(kafkaServer.bootstrapServers)
)
.withValue(
KafkaConfigProperties.property("config", "auto.offset.reset"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
| ) WITH (
| 'connector' = 'kafka',
| 'topic' = '${inputTopicNameTest1.name}',
| 'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
| 'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
| 'properties.group.id' = 'someConsumerGroupId',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'json'
Expand All @@ -59,7 +59,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
| ) WITH (
| 'connector' = 'kafka',
| 'topic' = '${inputTopicNameTest2.name}',
| 'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
| 'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
| 'properties.group.id' = 'someConsumerGroupId',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'json'
Expand All @@ -72,7 +72,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
| ) WITH (
| 'connector' = 'kafka',
| 'topic' = '${outputTopicNameTest2.name}',
| 'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
| 'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
| 'properties.group.id' = 'someConsumerGroupId',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'json'
Expand All @@ -85,7 +85,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
| ) WITH (
| 'connector' = 'kafka',
| 'topic' = '${inputTopicNameTest3.name}',
| 'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
| 'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
| 'properties.group.id' = 'someConsumerGroupId',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'json'
Expand All @@ -98,7 +98,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
| ) WITH (
| 'connector' = 'kafka',
| 'topic' = '${outputTopicNameTest3.name}',
| 'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
| 'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
| 'properties.group.id' = 'someConsumerGroupId',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'json'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ class StreamingEmbeddedDeploymentManagerRestartTest extends BaseStreamingEmbedde
fixture.deployScenario(scenario)
}

kafkaServerWithDependencies.shutdownKafkaServer()
kafkaServer.shutdownKafkaServer()

eventually {
val jobStatuses = manager.getScenarioDeploymentsStatuses(name).futureValue.value
jobStatuses.map(_.status) shouldBe List(SimpleStateStatus.Restarting)
}

// We have to recreate kafka server because after shutdown it is unusable anymore
kafkaServerWithDependencies.recreateKafkaServer()
kafkaServerWithDependencies.startupKafkaServer()
kafkaServer.recreateKafkaServer()
kafkaServer.startupKafkaServer()

eventually {
manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List(
Expand Down
25 changes: 6 additions & 19 deletions examples/installation/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,30 +127,17 @@ services:

### KAFKA-related services:

zookeeper:
image: bitnami/zookeeper:3.9
restart: unless-stopped
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
ZOO_4LW_COMMANDS_WHITELIST: "ruok"
healthcheck:
test: [ "CMD-SHELL", 'echo "ruok" | nc -w 2 -q 2 localhost 2181 | grep imok' ]
interval: 5s
retries: 5
deploy:
resources:
limits:
memory: 256M

kafka:
image: bitnami/kafka:3.7.0
restart: unless-stopped
hostname: nu-kafka
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
depends_on:
zookeeper:
condition: service_healthy
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
healthcheck:
test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list" ]
interval: 10s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.kafka

import com.typesafe.scalalogging.LazyLogging
import kafka.server
import kafka.server.{KafkaRaftServer, KafkaServer, Server}
import kafka.server.{KafkaRaftServer, Server}
import kafka.tools.StorageTool
import org.apache.commons.io.FileUtils
import org.apache.commons.io.output.NullOutputStream
Expand All @@ -12,53 +12,37 @@ import org.apache.kafka.common.{IsolationLevel, Uuid}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.MetadataVersion
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

import java.io.{File, PrintStream}
import java.net.InetSocketAddress
import java.nio.file.Files
import java.util.{Locale, Properties}
import scala.language.implicitConversions
import scala.util.control.NonFatal

// We should consider switching to KafkaClusterTestKit (https://github.com/apache/kafka/blob/3.6/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java),
// it's used by spring-kafka (https://github.com/spring-projects/spring-kafka/blob/3.1.x/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java).
object EmbeddedKafkaServerWithDependencies {

// TODO: Remove supprot for legacy zk setup
private val kRaftEnabled: Boolean = true
object EmbeddedKafkaKraftServer {

private val localhost: String = "127.0.0.1"

def run(
brokerPort: Int,
controllerPort: Int,
kafkaBrokerConfig: Map[String, String]
): EmbeddedKafkaServerWithDependencies = {
): EmbeddedKafkaKraftServer = {
val kafkaServerLogDir = Files.createTempDirectory("embeddedKafka").toFile
val clusterId = Uuid.randomUuid()
val kafkaConfig =
prepareKafkaServerConfig(brokerPort, controllerPort, kafkaServerLogDir, kafkaBrokerConfig, clusterId)
val kafkaServer = if (kRaftEnabled) {
new EmbeddedKafkaServerWithDependencies(
None,
() => {
prepareRaftStorage(kafkaServerLogDir, kafkaConfig, clusterId)
new KafkaRaftServer(kafkaConfig, time = Time.SYSTEM)
},
s"$localhost:$brokerPort",
kafkaServerLogDir
)
} else {
val zk = createZookeeperServer(controllerPort)
new EmbeddedKafkaServerWithDependencies(
Some(zk),
() => new KafkaServer(kafkaConfig, time = Time.SYSTEM),
s"$localhost:$brokerPort",
kafkaServerLogDir
)
}
kafkaServer.startupKafkaServerAndDependencies()
val kafkaServer = new EmbeddedKafkaKraftServer(
() => {
prepareRaftStorage(kafkaServerLogDir, kafkaConfig, clusterId)
new KafkaRaftServer(kafkaConfig, time = Time.SYSTEM)
},
s"$localhost:$brokerPort",
kafkaServerLogDir
)
kafkaServer.startupKafkaServer()
kafkaServer
}

Expand All @@ -70,20 +54,14 @@ object EmbeddedKafkaServerWithDependencies {
clusterId: Uuid
) = {
val properties = new Properties()
if (kRaftEnabled) {
properties.setProperty("node.id", "0")
properties.setProperty("process.roles", "broker,controller")
properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort,CONTROLLER://$localhost:$controllerPort")
properties.setProperty("listener.security.protocol.map", s"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
properties.setProperty("controller.listener.names", s"CONTROLLER")
properties.setProperty("inter.broker.listener.name", "PLAINTEXT")
properties.setProperty("controller.quorum.voters", s"0@$localhost:$controllerPort")
properties.setProperty("cluster.id", clusterId.toString)
} else {
properties.setProperty("broker.id", "0")
properties.setProperty("zookeeper.connect", s"$localhost:$controllerPort")
properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort")
}
properties.setProperty("node.id", "0")
properties.setProperty("process.roles", "broker,controller")
properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort,CONTROLLER://$localhost:$controllerPort")
properties.setProperty("listener.security.protocol.map", s"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
properties.setProperty("controller.listener.names", s"CONTROLLER")
properties.setProperty("inter.broker.listener.name", "PLAINTEXT")
properties.setProperty("controller.quorum.voters", s"0@$localhost:$controllerPort")
properties.setProperty("cluster.id", clusterId.toString)
properties.setProperty("num.partitions", "1")
properties.setProperty("group.initial.rebalance.delay.ms", "0")
properties.setProperty("offsets.topic.num.partitions", "1")
Expand Down Expand Up @@ -112,20 +90,11 @@ object EmbeddedKafkaServerWithDependencies {
)
}

private def createZookeeperServer(zkPort: Int) = {
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(localhost, zkPort), 1024)
val tempDir = Files.createTempDirectory("embeddedKafkaZk").toFile
val zkServer = new ZooKeeperServer(tempDir, tempDir, ZooKeeperServer.DEFAULT_TICK_TIME)
(factory, zkServer, tempDir)
}

}

class EmbeddedKafkaServerWithDependencies(
zooKeeperServerOpt: Option[(NIOServerCnxnFactory, ZooKeeperServer, File)],
class EmbeddedKafkaKraftServer(
createKafkaServer: () => Server,
val kafkaAddress: String,
val bootstrapServers: String,
kafkaServerLogDir: File
) extends LazyLogging {

Expand All @@ -136,25 +105,10 @@ class EmbeddedKafkaServerWithDependencies(
kafkaServer = createKafkaServer()
}

def startupKafkaServerAndDependencies(): Unit = {
zooKeeperServerOpt.foreach(t => t._1.startup(t._2))
startupKafkaServer()
}

def startupKafkaServer(): Unit = {
kafkaServer.startup()
}

def shutdownKafkaServerAndDependencies(): Unit = {
shutdownKafkaServer()
zooKeeperServerOpt.foreach { case (cnxnFactory, zkServer, zkTempDir) =>
cnxnFactory.shutdown()
// factory shutdown doesn't pass 'fullyShutDown' flag to ZkServer.shutdown, we need to explicitly close database
zkServer.getZKDatabase.close()
cleanDirectorySilently(zkTempDir)
}
}

def shutdownKafkaServer(): Unit = {
kafkaServer.shutdown()
kafkaServer.awaitShutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,35 @@ import pl.touk.nussknacker.test.{AvailablePortFinder, KafkaConfigProperties, Wit

trait KafkaSpec extends BeforeAndAfterAll with WithConfig { self: Suite =>

var kafkaServerWithDependencies: EmbeddedKafkaServerWithDependencies = _
var kafkaClient: KafkaClient = _
val kafkaBrokerConfig = Map.empty[String, String]
var kafkaServer: EmbeddedKafkaKraftServer = _
var kafkaClient: KafkaClient = _
val kafkaBrokerConfig = Map.empty[String, String]

override protected def resolveConfig(config: Config): Config =
super
.resolveConfig(config)
.withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServerWithDependencies.kafkaAddress))
.withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServer.bootstrapServers))
// For tests we want to read from the beginning...
.withValue(KafkaConfigProperties.property("auto.offset.reset"), fromAnyRef("earliest"))

override protected def beforeAll(): Unit = {
super.beforeAll()
AvailablePortFinder.withAvailablePortsBlocked(2) {
case List(controllerPort, brokerPort) =>
kafkaServerWithDependencies = EmbeddedKafkaServerWithDependencies.run(
kafkaServer = EmbeddedKafkaKraftServer.run(
brokerPort = brokerPort,
controllerPort = controllerPort,
kafkaBrokerConfig = kafkaBrokerConfig
)
case _ => throw new MatchError(())
}
kafkaClient = new KafkaClient(kafkaAddress = kafkaServerWithDependencies.kafkaAddress, self.suiteName)
kafkaClient = new KafkaClient(kafkaAddress = kafkaServer.bootstrapServers, self.suiteName)
}

override protected def afterAll(): Unit = {
try {
kafkaClient.shutdown()
kafkaServerWithDependencies.shutdownKafkaServerAndDependencies()
kafkaServer.shutdownKafkaServer()
} finally {
super.afterAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ trait SchemaRegistryMixin extends AnyFunSuite with KafkaSpec with KafkaWithSchem
override protected def resolveConfig(config: Config): Config = {
super
.resolveConfig(config)
.withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServerWithDependencies.kafkaAddress))
.withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServer.bootstrapServers))
// schema.registry.url have to be defined even for MockSchemaRegistryClient
.withValue(KafkaConfigProperties.property("schema.registry.url"), fromAnyRef("not_used"))
// we turn off auto registration to do it on our own passing mocked schema registry client // meaningful only in Flink tests
Expand Down
3 changes: 0 additions & 3 deletions utils/test-utils/src/main/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
<logger name="com.spotify.docker.client.LoggingPullHandler" level="WARN"/>
<logger name="com.spotify.docker.client.LoggingBuildHandler" level="WARN"/>

<logger name="org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService" level="WARN"/>
<!-- in tests we don't have proper RocksDB memory config, so we disable warnings on that ... -->
<logger name="org.apache.flink.contrib.streaming.state.RocksDBOperationUtils" level="ERROR"/>

Expand All @@ -60,8 +59,6 @@
<!-- in tests we frequently encounter "Error while fetching metadata" WARNings when auto-creating topics -->
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
<logger name="org.apache.zookeeper.server.PrepRequestProcessor" level="WARN"/>
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR"/>

<logger name="org.flywaydb.core.internal.sqlscript.DefaultSqlScriptExecutor" level="ERROR"/>
<logger name="org.apache.flink.metrics.MetricGroup" level="ERROR"/>
Expand Down

0 comments on commit 1e8cf03

Please sign in to comment.