Example installation setup: replaced zookeeper with kraft #7609

Merged: 2 commits, Mar 4, 2025
@@ -127,7 +127,7 @@ abstract class FlinkWithKafkaSuite
       .empty()
       .withValue(
         KafkaConfigProperties.bootstrapServersProperty("config"),
-        fromAnyRef(kafkaServerWithDependencies.kafkaAddress)
+        fromAnyRef(kafkaServer.bootstrapServers)
       )
       .withValue(
         KafkaConfigProperties.property("config", "auto.offset.reset"),
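Note: `fromAnyRef` above is Typesafe Config's `ConfigValueFactory.fromAnyRef`, and the suites build broker settings by layering `withValue` calls onto an empty `Config`. A minimal standalone sketch of that pattern, with literal property paths standing in for the `KafkaConfigProperties` helpers (the exact paths those helpers produce are an assumption):

```scala
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.ConfigValueFactory.fromAnyRef

object KafkaTestConfigSketch extends App {
  // stand-in for kafkaServer.bootstrapServers
  val bootstrapServers = "127.0.0.1:9092"

  // each withValue returns a new immutable Config with the given path set
  val config: Config = ConfigFactory
    .empty()
    .withValue("kafkaProperties.\"bootstrap.servers\"", fromAnyRef(bootstrapServers))
    .withValue("kafkaProperties.\"auto.offset.reset\"", fromAnyRef("earliest"))

  println(config.root().render())
}
```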
@@ -46,7 +46,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
 | ) WITH (
 |   'connector' = 'kafka',
 |   'topic' = '${inputTopicNameTest1.name}',
-|   'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+|   'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
 |   'properties.group.id' = 'someConsumerGroupId',
 |   'scan.startup.mode' = 'earliest-offset',
 |   'format' = 'json'
@@ -59,7 +59,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
 | ) WITH (
 |   'connector' = 'kafka',
 |   'topic' = '${inputTopicNameTest2.name}',
-|   'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+|   'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
 |   'properties.group.id' = 'someConsumerGroupId',
 |   'scan.startup.mode' = 'earliest-offset',
 |   'format' = 'json'
@@ -72,7 +72,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
 | ) WITH (
 |   'connector' = 'kafka',
 |   'topic' = '${outputTopicNameTest2.name}',
-|   'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+|   'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
 |   'properties.group.id' = 'someConsumerGroupId',
 |   'scan.startup.mode' = 'earliest-offset',
 |   'format' = 'json'
@@ -85,7 +85,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
 | ) WITH (
 |   'connector' = 'kafka',
 |   'topic' = '${inputTopicNameTest3.name}',
-|   'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+|   'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
 |   'properties.group.id' = 'someConsumerGroupId',
 |   'scan.startup.mode' = 'earliest-offset',
 |   'format' = 'json'
@@ -98,7 +98,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
 | ) WITH (
 |   'connector' = 'kafka',
 |   'topic' = '${outputTopicNameTest3.name}',
-|   'properties.bootstrap.servers' = '${kafkaServerWithDependencies.kafkaAddress}',
+|   'properties.bootstrap.servers' = '${kafkaServer.bootstrapServers}',
 |   'properties.group.id' = 'someConsumerGroupId',
 |   'scan.startup.mode' = 'earliest-offset',
 |   'format' = 'json'
@@ -39,16 +39,16 @@ class StreamingEmbeddedDeploymentManagerRestartTest extends BaseStreamingEmbedde
       fixture.deployScenario(scenario)
     }
 
-    kafkaServerWithDependencies.shutdownKafkaServer()
+    kafkaServer.shutdownKafkaServer()
 
     eventually {
       val jobStatuses = manager.getScenarioDeploymentsStatuses(name).futureValue.value
       jobStatuses.map(_.status) shouldBe List(SimpleStateStatus.Restarting)
     }
 
     // We have to recreate kafka server because after shutdown it is unusable anymore
-    kafkaServerWithDependencies.recreateKafkaServer()
-    kafkaServerWithDependencies.startupKafkaServer()
+    kafkaServer.recreateKafkaServer()
+    kafkaServer.startupKafkaServer()
 
     eventually {
       manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List(
examples/installation/docker-compose.yml (25 changes: 6 additions & 19 deletions)

@@ -127,30 +127,17 @@ services:
 
   ### KAFKA-related services:
 
-  zookeeper:
-    image: bitnami/zookeeper:3.9
-    restart: unless-stopped
-    environment:
-      ALLOW_ANONYMOUS_LOGIN: "yes"
-      ZOO_4LW_COMMANDS_WHITELIST: "ruok"
-    healthcheck:
-      test: [ "CMD-SHELL", 'echo "ruok" | nc -w 2 -q 2 localhost 2181 | grep imok' ]
-      interval: 5s
-      retries: 5
-    deploy:
-      resources:
-        limits:
-          memory: 256M
-
   kafka:
     image: bitnami/kafka:3.7.0
     restart: unless-stopped
     hostname: nu-kafka
     environment:
-      KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
-    depends_on:
-      zookeeper:
-        condition: service_healthy
+      KAFKA_CFG_NODE_ID: 0
+      KAFKA_CFG_PROCESS_ROLES: controller,broker
+      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
+      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
+      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
    healthcheck:
       test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list" ]
       interval: 10s
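With KRaft the single `kafka` service acts as both controller (quorum voter `0@kafka:9093`) and broker, so the ZooKeeper service and the `depends_on` ordering disappear entirely. A quick way to smoke-test such a setup from code (not part of this PR; it assumes the broker's PLAINTEXT listener is reachable on `localhost:9092`, as the healthcheck above does, and uses the standard Kafka admin client):

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object KraftSmokeTest extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val admin = AdminClient.create(props)
  try {
    // a healthy single-node KRaft cluster reports exactly one broker node
    val nodes = admin.describeCluster().nodes().get().asScala
    println(s"brokers: ${nodes.mkString(", ")}")
    println(s"topics: ${admin.listTopics().names().get().asScala.mkString(", ")}")
  } finally admin.close()
}
```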
@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.kafka
 
 import com.typesafe.scalalogging.LazyLogging
 import kafka.server
-import kafka.server.{KafkaRaftServer, KafkaServer, Server}
+import kafka.server.{KafkaRaftServer, Server}
 import kafka.tools.StorageTool
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.NullOutputStream
@@ -12,53 +12,37 @@ import org.apache.kafka.common.{IsolationLevel, Uuid}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import java.io.{File, PrintStream}
-import java.net.InetSocketAddress
 import java.nio.file.Files
 import java.util.{Locale, Properties}
 import scala.language.implicitConversions
 import scala.util.control.NonFatal
 
 // We should consider switching to KafkaClusterTestKit (https://github.com/apache/kafka/blob/3.6/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java),
 // it's used by spring-kafka (https://github.com/spring-projects/spring-kafka/blob/3.1.x/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java).
-object EmbeddedKafkaServerWithDependencies {
-
-  // TODO: Remove supprot for legacy zk setup
-  private val kRaftEnabled: Boolean = true
+object EmbeddedKafkaKraftServer {
 
   private val localhost: String = "127.0.0.1"
 
   def run(
       brokerPort: Int,
       controllerPort: Int,
       kafkaBrokerConfig: Map[String, String]
-  ): EmbeddedKafkaServerWithDependencies = {
+  ): EmbeddedKafkaKraftServer = {
     val kafkaServerLogDir = Files.createTempDirectory("embeddedKafka").toFile
     val clusterId = Uuid.randomUuid()
     val kafkaConfig =
       prepareKafkaServerConfig(brokerPort, controllerPort, kafkaServerLogDir, kafkaBrokerConfig, clusterId)
-    val kafkaServer = if (kRaftEnabled) {
-      new EmbeddedKafkaServerWithDependencies(
-        None,
-        () => {
-          prepareRaftStorage(kafkaServerLogDir, kafkaConfig, clusterId)
-          new KafkaRaftServer(kafkaConfig, time = Time.SYSTEM)
-        },
-        s"$localhost:$brokerPort",
-        kafkaServerLogDir
-      )
-    } else {
-      val zk = createZookeeperServer(controllerPort)
-      new EmbeddedKafkaServerWithDependencies(
-        Some(zk),
-        () => new KafkaServer(kafkaConfig, time = Time.SYSTEM),
-        s"$localhost:$brokerPort",
-        kafkaServerLogDir
-      )
-    }
-    kafkaServer.startupKafkaServerAndDependencies()
+    val kafkaServer = new EmbeddedKafkaKraftServer(
+      () => {
+        prepareRaftStorage(kafkaServerLogDir, kafkaConfig, clusterId)
+        new KafkaRaftServer(kafkaConfig, time = Time.SYSTEM)
+      },
+      s"$localhost:$brokerPort",
+      kafkaServerLogDir
+    )
+    kafkaServer.startupKafkaServer()
     kafkaServer
   }

@@ -70,20 +54,14 @@ object EmbeddedKafkaServerWithDependencies {
       clusterId: Uuid
   ) = {
     val properties = new Properties()
-    if (kRaftEnabled) {
-      properties.setProperty("node.id", "0")
-      properties.setProperty("process.roles", "broker,controller")
-      properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort,CONTROLLER://$localhost:$controllerPort")
-      properties.setProperty("listener.security.protocol.map", s"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
-      properties.setProperty("controller.listener.names", s"CONTROLLER")
-      properties.setProperty("inter.broker.listener.name", "PLAINTEXT")
-      properties.setProperty("controller.quorum.voters", s"0@$localhost:$controllerPort")
-      properties.setProperty("cluster.id", clusterId.toString)
-    } else {
-      properties.setProperty("broker.id", "0")
-      properties.setProperty("zookeeper.connect", s"$localhost:$controllerPort")
-      properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort")
-    }
+    properties.setProperty("node.id", "0")
+    properties.setProperty("process.roles", "broker,controller")
+    properties.setProperty("listeners", s"PLAINTEXT://$localhost:$brokerPort,CONTROLLER://$localhost:$controllerPort")
+    properties.setProperty("listener.security.protocol.map", s"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+    properties.setProperty("controller.listener.names", s"CONTROLLER")
+    properties.setProperty("inter.broker.listener.name", "PLAINTEXT")
+    properties.setProperty("controller.quorum.voters", s"0@$localhost:$controllerPort")
+    properties.setProperty("cluster.id", clusterId.toString)
     properties.setProperty("num.partitions", "1")
     properties.setProperty("group.initial.rebalance.delay.ms", "0")
     properties.setProperty("offsets.topic.num.partitions", "1")
@@ -112,20 +90,11 @@
     )
   }
 
-  private def createZookeeperServer(zkPort: Int) = {
-    val factory = new NIOServerCnxnFactory()
-    factory.configure(new InetSocketAddress(localhost, zkPort), 1024)
-    val tempDir = Files.createTempDirectory("embeddedKafkaZk").toFile
-    val zkServer = new ZooKeeperServer(tempDir, tempDir, ZooKeeperServer.DEFAULT_TICK_TIME)
-    (factory, zkServer, tempDir)
-  }
-
 }
 
-class EmbeddedKafkaServerWithDependencies(
-    zooKeeperServerOpt: Option[(NIOServerCnxnFactory, ZooKeeperServer, File)],
+class EmbeddedKafkaKraftServer(
     createKafkaServer: () => Server,
-    val kafkaAddress: String,
+    val bootstrapServers: String,
     kafkaServerLogDir: File
 ) extends LazyLogging {
 
@@ -136,25 +105,10 @@ class EmbeddedKafkaServerWithDependencies(
     kafkaServer = createKafkaServer()
   }
 
-  def startupKafkaServerAndDependencies(): Unit = {
-    zooKeeperServerOpt.foreach(t => t._1.startup(t._2))
-    startupKafkaServer()
-  }
-
   def startupKafkaServer(): Unit = {
     kafkaServer.startup()
   }
 
-  def shutdownKafkaServerAndDependencies(): Unit = {
-    shutdownKafkaServer()
-    zooKeeperServerOpt.foreach { case (cnxnFactory, zkServer, zkTempDir) =>
-      cnxnFactory.shutdown()
-      // factory shutdown doesn't pass 'fullyShutDown' flag to ZkServer.shutdown, we need to explicitly close database
-      zkServer.getZKDatabase.close()
-      cleanDirectorySilently(zkTempDir)
-    }
-  }
-
   def shutdownKafkaServer(): Unit = {
     kafkaServer.shutdown()
     kafkaServer.awaitShutdown()
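The renamed class now has a single lifecycle instead of two. A usage sketch based only on the API visible in this diff (`run`, `bootstrapServers`, `shutdownKafkaServer`, `recreateKafkaServer`, `startupKafkaServer`); the ports are arbitrary free ports and the snippet assumes it compiles in the same package:

```scala
val server = EmbeddedKafkaKraftServer.run(
  brokerPort = 9092,
  controllerPort = 9093,
  kafkaBrokerConfig = Map.empty
) // run() formats the KRaft storage directory and starts the broker

val bootstrap = server.bootstrapServers // "127.0.0.1:9092"

server.shutdownKafkaServer()
// a KafkaRaftServer cannot be started again once shut down, so tests that
// simulate a broker outage rebuild the underlying server before restarting:
server.recreateKafkaServer()
server.startupKafkaServer()
server.shutdownKafkaServer()
```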
@@ -7,35 +7,35 @@ import pl.touk.nussknacker.test.{AvailablePortFinder, KafkaConfigProperties, Wit
 
 trait KafkaSpec extends BeforeAndAfterAll with WithConfig { self: Suite =>
 
-  var kafkaServerWithDependencies: EmbeddedKafkaServerWithDependencies = _
-  var kafkaClient: KafkaClient = _
-  val kafkaBrokerConfig = Map.empty[String, String]
+  var kafkaServer: EmbeddedKafkaKraftServer = _
+  var kafkaClient: KafkaClient = _
+  val kafkaBrokerConfig = Map.empty[String, String]
 
   override protected def resolveConfig(config: Config): Config =
     super
       .resolveConfig(config)
-      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServerWithDependencies.kafkaAddress))
+      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServer.bootstrapServers))
       // For tests we want to read from the beginning...
       .withValue(KafkaConfigProperties.property("auto.offset.reset"), fromAnyRef("earliest"))
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     AvailablePortFinder.withAvailablePortsBlocked(2) {
       case List(controllerPort, brokerPort) =>
-        kafkaServerWithDependencies = EmbeddedKafkaServerWithDependencies.run(
+        kafkaServer = EmbeddedKafkaKraftServer.run(
           brokerPort = brokerPort,
           controllerPort = controllerPort,
           kafkaBrokerConfig = kafkaBrokerConfig
         )
       case _ => throw new MatchError(())
     }
-    kafkaClient = new KafkaClient(kafkaAddress = kafkaServerWithDependencies.kafkaAddress, self.suiteName)
+    kafkaClient = new KafkaClient(kafkaAddress = kafkaServer.bootstrapServers, self.suiteName)
   }
 
   override protected def afterAll(): Unit = {
     try {
       kafkaClient.shutdown()
-      kafkaServerWithDependencies.shutdownKafkaServerAndDependencies()
+      kafkaServer.shutdownKafkaServer()
     } finally {
       super.afterAll()
     }
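Any suite mixing in `KafkaSpec` gets a fresh single-node KRaft broker per suite, reachable via `kafkaServer.bootstrapServers`. A sketch of a consuming test (ScalaTest plus the stock Kafka producer; it assumes `WithConfig` needs nothing beyond its defaults, and the topic name is arbitrary):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.scalatest.funsuite.AnyFunSuite

class KafkaSmokeSpec extends AnyFunSuite with KafkaSpec {

  test("embedded KRaft broker accepts a record") {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer.bootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      // get() blocks until the broker acknowledges the write
      val metadata = producer.send(new ProducerRecord("smoke-topic", "k", "v")).get()
      assert(metadata.topic() == "smoke-topic")
    } finally producer.close()
  }

}
```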
@@ -12,7 +12,7 @@ trait SchemaRegistryMixin extends AnyFunSuite with KafkaSpec with KafkaWithSchem
   override protected def resolveConfig(config: Config): Config = {
     super
       .resolveConfig(config)
-      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServerWithDependencies.kafkaAddress))
+      .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef(kafkaServer.bootstrapServers))
       // schema.registry.url have to be defined even for MockSchemaRegistryClient
       .withValue(KafkaConfigProperties.property("schema.registry.url"), fromAnyRef("not_used"))
       // we turn off auto registration to do it on our own passing mocked schema registry client // meaningful only in Flink tests
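`SchemaRegistryMixin` and `KafkaSpec` both funnel their settings through `super.resolveConfig`, so every trait in the linearization stacks its values on top of the previous one. A stripped-down sketch of that stackable-trait pattern (the one-method `WithConfig` base here is an assumption; the real trait lives in `pl.touk.nussknacker.test`):

```scala
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.ConfigValueFactory.fromAnyRef

trait WithConfig {
  protected def resolveConfig(config: Config): Config = config // assumed identity base
}

trait KafkaPart extends WithConfig {
  def bootstrapServers: String
  override protected def resolveConfig(config: Config): Config =
    super.resolveConfig(config).withValue("kafka.\"bootstrap.servers\"", fromAnyRef(bootstrapServers))
}

trait SchemaRegistryPart extends KafkaPart {
  override protected def resolveConfig(config: Config): Config =
    super.resolveConfig(config).withValue("kafka.\"schema.registry.url\"", fromAnyRef("not_used"))
}

object Demo extends App with SchemaRegistryPart {
  val bootstrapServers = "127.0.0.1:9092"
  // linearization applies KafkaPart first, then SchemaRegistryPart on top
  println(resolveConfig(ConfigFactory.empty()).root().render())
}
```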
utils/test-utils/src/main/resources/logback-test.xml (3 changes: 0 additions & 3 deletions)

@@ -43,7 +43,6 @@
     <logger name="com.spotify.docker.client.LoggingPullHandler" level="WARN"/>
     <logger name="com.spotify.docker.client.LoggingBuildHandler" level="WARN"/>
 
-    <logger name="org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService" level="WARN"/>
     <!-- in tests we don't have proper RocksDB memory config, so we disable warnings on that ... -->
     <logger name="org.apache.flink.contrib.streaming.state.RocksDBOperationUtils" level="ERROR"/>
 
@@ -60,8 +59,6 @@
     <!-- in tests we frequently encounter "Error while fetching metadata" WARNings when auto-creating topics -->
     <logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>
     <logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
-    <logger name="org.apache.zookeeper.server.PrepRequestProcessor" level="WARN"/>
-    <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR"/>
 
     <logger name="org.flywaydb.core.internal.sqlscript.DefaultSqlScriptExecutor" level="ERROR"/>
     <logger name="org.apache.flink.metrics.MetricGroup" level="ERROR"/>