Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Streams fire event after created and before scheduling the start #38705

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
Expand All @@ -44,10 +43,9 @@
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.Unremovable;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Identifier;

/**
Expand All @@ -64,12 +62,16 @@ public class KafkaStreamsProducer {
private static volatile boolean shutdown = false;

private final ExecutorService executorService;
private final StreamsConfig streamsConfig;
private final KafkaStreams kafkaStreams;
private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
private final Admin kafkaAdminClient;
private final Duration topicsTimeout;
private final List<String> trimmedTopics;

@Inject
public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig runtimeConfig,
ExecutorService executorService,
Instance<Topology> topology, Instance<KafkaClientSupplier> kafkaClientSupplier,
@Identifier("default-kafka-broker") Instance<Map<String, Object>> defaultConfiguration,
Instance<StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener,
Expand All @@ -79,9 +81,12 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
if (topology.isUnsatisfied()) {
LOGGER.warn("No Topology producer; Kafka Streams will not be started");
this.executorService = null;
this.streamsConfig = null;
this.kafkaStreams = null;
this.kafkaStreamsTopologyManager = null;
this.kafkaAdminClient = null;
this.topicsTimeout = null;
this.trimmedTopics = null;
return;
}

Expand All @@ -101,33 +106,53 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
runtimeConfig);
this.kafkaAdminClient = Admin.create(getAdminClientConfig(kafkaStreamsProperties));

this.executorService = Executors.newSingleThreadExecutor();
this.executorService = executorService;

this.kafkaStreams = initializeKafkaStreams(kafkaStreamsProperties, runtimeConfig, kafkaAdminClient, topology.get(),
kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener,
executorService);
this.topicsTimeout = runtimeConfig.topicsTimeout;
this.trimmedTopics = runtimeConfig.getTrimmedTopics();
this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener);
this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient);
}

@PostConstruct
public void postConstruct() {
public void onStartup(@Observes StartupEvent event, Event<KafkaStreams> kafkaStreamsEvent) {
if (kafkaStreams != null) {
Arc.container().beanManager().getEvent().select(KafkaStreams.class).fire(kafkaStreams);
kafkaStreamsEvent.fire(kafkaStreams);
executorService.execute(() -> {
if (topicsTimeout.compareTo(Duration.ZERO) > 0) {
try {
waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
if (!shutdown) {
LOGGER.debug("Starting Kafka Streams pipeline");
kafkaStreams.start();
}
});
}
}

@Produces
@Singleton
@Unremovable
@Startup
public KafkaStreams getKafkaStreams() {
return kafkaStreams;
}

@Produces
@Singleton
@Unremovable
@Startup
public StreamsConfig getStreamsConfig() {
return streamsConfig;
}

@Produces
@Singleton
@Unremovable
public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
return kafkaStreamsTopologyManager;
}
Expand All @@ -146,16 +171,15 @@ void onStop(@Observes ShutdownEvent event) {
}
}

private static KafkaStreams initializeKafkaStreams(Properties kafkaStreamsProperties,
KafkaStreamsRuntimeConfig runtimeConfig, Admin adminClient, Topology topology,
private static KafkaStreams initializeKafkaStreams(StreamsConfig streamsConfig, Topology topology,
Instance<KafkaClientSupplier> kafkaClientSupplier,
Instance<StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener,
Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener, ExecutorService executorService) {
Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener) {
KafkaStreams kafkaStreams;
if (kafkaClientSupplier.isUnsatisfied()) {
kafkaStreams = new KafkaStreams(topology, kafkaStreamsProperties);
kafkaStreams = new KafkaStreams(topology, streamsConfig);
} else {
kafkaStreams = new KafkaStreams(topology, kafkaStreamsProperties, kafkaClientSupplier.get());
kafkaStreams = new KafkaStreams(topology, streamsConfig, kafkaClientSupplier.get());
}

if (!stateListener.isUnsatisfied()) {
Expand All @@ -168,21 +192,6 @@ private static KafkaStreams initializeKafkaStreams(Properties kafkaStreamsProper
kafkaStreams.setUncaughtExceptionHandler(uncaughtExceptionHandlerListener.get());
}

executorService.execute(() -> {
if (runtimeConfig.topicsTimeout.compareTo(Duration.ZERO) > 0) {
try {
waitForTopicsToBeCreated(adminClient, runtimeConfig.getTrimmedTopics(), runtimeConfig.topicsTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
if (!shutdown) {
LOGGER.debug("Starting Kafka Streams pipeline");
kafkaStreams.start();
}
});

return kafkaStreams;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class KafkaStreamsRuntimeConfig {
/**
* Default Kafka bootstrap server.
*/
public static final String DEFAULT_KAFKA_BROKER = "localhost:9012";
public static final String DEFAULT_KAFKA_BROKER = "localhost:9092";

/**
* A unique identifier for this Kafka Streams application.
Expand All @@ -27,7 +27,7 @@ public class KafkaStreamsRuntimeConfig {

/**
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s).
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9012}.
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9092}.
*/
@ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER)
public List<InetSocketAddress> bootstrapServers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
import jakarta.enterprise.event.Observes;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

@ApplicationScoped
public class KafkaStreamsEventCounter {

LongAdder eventCount = new LongAdder();

void onKafkaStreamsEvent(@Observes KafkaStreams kafkaStreams) {
void onKafkaStreamsEvent(@Observes KafkaStreams kafkaStreams, StreamsConfig streamsConfig) {
assert kafkaStreams.state() == KafkaStreams.State.CREATED;
assert streamsConfig != null;
eventCount.increment();
}

Expand Down
Loading