Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1. Kafka: Initial integration changes #1492

Open
wants to merge 3 commits into
base: mvp_demo
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
420 changes: 420 additions & 0 deletions design/KafkaDesign.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ data:
"experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment",
"experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%",
"bulkapilimit" : 1000,
"isKafkaEnabled" : "true",
"kafkaBootstrapServers" : "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to set this value via an environment variable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create issue to address following
Client Validation Flow:
Authentication: Broker checks client credentials (SASL or TLS certs).
Authorization: Broker verifies the client has the necessary ACLs.
Data Integrity: TLS encryption protects the messages during transmission.

Please verify that this configuration matches the corresponding ROS configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #1497 to track this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@khansaad Are you addressing setting the kafka bootstrap server using env in this PR?

"hibernate": {
"dialect": "org.hibernate.dialect.PostgreSQLDialect",
"driver": "org.postgresql.Driver",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ data:
"experimentsURL" : "http://kruize.openshift-tuning.svc.cluster.local:8080/createExperiment",
"experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%",
"bulkapilimit" : 1000,
"isKafkaEnabled" : "true",
"kafkaBootstrapServers" : "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
"hibernate": {
"dialect": "org.hibernate.dialect.PostgreSQLDialect",
"driver": "org.postgresql.Driver",
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${micrometer-version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>

</dependencies>
<build>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/autotune/operator/KruizeDeploymentInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public class KruizeDeploymentInfo {
private static KubeEventLogger kubeEventLogger;
public static Boolean is_ros_enabled = false;
public static String datasource_via_env = null;
// Whether the Kafka integration is active; presumably populated from the
// "isKafkaEnabled" config entry seen in the deployment manifests — TODO confirm wiring
public static Boolean is_kafka_enabled = false;
// Kafka bootstrap server address; presumably populated from the
// "kafkaBootstrapServers" config entry — TODO confirm wiring
public static String kafka_bootstrap_servers = null;
// Inbound topic name, read from the INGRESS_KAFKA_TOPIC environment variable at class load
public static String kafka_topic_inbound = System.getenv("INGRESS_KAFKA_TOPIC");
// Consumer group id, read from the KAFKA_CONSUMER_GROUP_ID environment variable at class load
public static String kafka_group_id = System.getenv("KAFKA_CONSUMER_GROUP_ID");


private KruizeDeploymentInfo() {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/autotune/utils/KruizeConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -947,4 +947,14 @@ private MetadataProfileErrorMsgs() {
}

}

/**
 * Constants for the Kafka integration: outbound topic names and the
 * section keys used when assembling message payloads.
 */
public static final class KAFKA_CONSTANTS {
    // Topic names published to by KruizeKafkaProducer
    public static final String RECOMMENDATIONS_TOPIC = "recommendations-topic";
    public static final String ERROR_TOPIC = "error-topic";
    public static final String SUMMARY_TOPIC = "summary-topic";

    // Payload section keys
    public static final String SUMMARY = "summary";
    public static final String EXPERIMENTS = "experiments";
    public static final String RECOMMENDATIONS = "recommendations";

    // Utility holder — not instantiable (matches MetadataProfileErrorMsgs style)
    private KAFKA_CONSTANTS() {
    }
}
}
64 changes: 64 additions & 0 deletions src/main/java/com/autotune/utils/kafka/KruizeKafkaConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.autotune.utils.kafka;

import com.autotune.operator.KruizeDeploymentInfo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Scanner;

//TODO: This class is not being used for now, will be updated later
//TODO: This class is not being used for now, will be updated later
/**
 * Kafka consumer that subscribes to the inbound topic configured in
 * {@link KruizeDeploymentInfo#kafka_topic_inbound} and logs every record
 * received. The poll loop keeps running until a termination signal is
 * detected on stdin.
 */
public class KruizeKafkaConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KruizeKafkaConsumer.class);
    // Shared consumer instance; also referenced by the shutdown hook below.
    private static KafkaConsumer<String, String> consumer;

    @Override
    public void run() {

        // Flag to control the loop and terminate when needed
        boolean continueListening = true;

        try {
            consumer = getKafkaConsumerConfig();
            try {
                consumer.subscribe(java.util.Collections.singletonList(KruizeDeploymentInfo.kafka_topic_inbound));
                while (continueListening) {
                    consumer.poll(java.time.Duration.ofMillis(100)).forEach(record ->
                            LOGGER.info("Received Recommendation: JobID={}, Value={}, Partition={}, Offset={}",
                                    record.key(), record.value(), record.partition(), record.offset()));
                    if (isTerminationSignalReceived()) {
                        continueListening = false;
                    }
                }
            } finally {
                // Always release the consumer's network/heartbeat resources,
                // even when the loop exits via an exception (was never closed before).
                consumer.close();
            }
        } catch (Exception e) {
            // Log through SLF4J instead of e.printStackTrace()
            LOGGER.error("Kafka consumer failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Builds the consumer configuration from the deployment-level settings.
     *
     * @return a new {@link KafkaConsumer} for String keys and values
     */
    private KafkaConsumer<String, String> getKafkaConsumerConfig() {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KruizeDeploymentInfo.kafka_bootstrap_servers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, KruizeDeploymentInfo.kafka_group_id);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        return new KafkaConsumer<>(consumerProps);
    }

    // Shutdown hook for the consumer — NOTE(review): currently never registered by any caller
    private static void addConsumerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (consumer != null) {
                consumer.close();
            }
        }));
    }

    /**
     * Non-blocking check for a termination signal on stdin.
     *
     * <p>The previous implementation used {@code new Scanner(System.in).hasNext()},
     * which blocks until input (or EOF) arrives and therefore stalled the poll
     * loop on every iteration; it also leaked the Scanner.</p>
     *
     * @return true when there is pending input on stdin
     */
    private static boolean isTerminationSignalReceived() {
        try {
            return System.in.available() > 0;
        } catch (java.io.IOException e) {
            LOGGER.warn("Unable to check stdin for termination signal: {}", e.getMessage());
            return false;
        }
    }
}
97 changes: 97 additions & 0 deletions src/main/java/com/autotune/utils/kafka/KruizeKafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.autotune.utils.kafka;

import com.autotune.operator.KruizeDeploymentInfo;
import com.autotune.utils.KruizeConstants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Kafka producer utility for Kruize. Publishes recommendation, error and
 * summary payloads to the topics declared in
 * {@link KruizeConstants.KAFKA_CONSTANTS}. The underlying
 * {@link KafkaProducer} is created lazily and shared by all senders.
 */
public class KruizeKafkaProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KruizeKafkaProducer.class);

    // Singleton Kafka producer instance, created on first use.
    // NOTE: the previous `static final` eager initialization ran at class load
    // and threw ExceptionInInitializerError whenever kafka_bootstrap_servers
    // was not configured (e.g. Kafka disabled), even if no message was sent.
    private static volatile KafkaProducer<String, String> producer;

    /**
     * Returns the shared producer, creating it on first call
     * (double-checked locking on the class monitor).
     */
    private static KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            synchronized (KruizeKafkaProducer.class) {
                if (producer == null) {
                    producer = new KafkaProducer<>(getProducerProperties());
                }
            }
        }
        return producer;
    }

    // Get Kafka producer properties from the deployment-level settings
    private static Properties getProducerProperties() {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KruizeDeploymentInfo.kafka_bootstrap_servers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for the full ISR to acknowledge each record
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        return producerProps;
    }

    /**
     * Sends one record to the given topic, blocking up to 5 seconds for the
     * broker acknowledgement. Failures are logged, not rethrown.
     *
     * @param topic   destination topic name
     * @param payload message value (the record key is left null)
     */
    private static void sendMessage(String topic, String payload) {
        try {
            RecordMetadata metadata = getProducer().send(new ProducerRecord<>(topic, payload))
                    .get(5, TimeUnit.SECONDS); //todo : set the timeout value via ENV
            // todo: get the status of message whether its delivered or failed
            LOGGER.debug("Message sent successfully to topic {} at partition {} and offset {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (TimeoutException te) {
            LOGGER.error("Kafka timeout while sending message to topic {}: {}", topic, te.getMessage());
        } catch (InterruptedException ie) {
            // Preserve the interrupt status for callers higher up the stack
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while sending message to Kafka topic {}: {}", topic, ie.getMessage(), ie);
        } catch (Exception e) {
            LOGGER.error("Error sending message to Kafka topic {}: {}", topic, e.getMessage(), e);
        }
    }

    // Send valid recommendation messages
    public static class ValidRecommendationMessageProducer implements Runnable {
        private final String payload;

        public ValidRecommendationMessageProducer(String payload) {
            this.payload = payload;
        }

        @Override
        public void run() {
            sendMessage(KruizeConstants.KAFKA_CONSTANTS.RECOMMENDATIONS_TOPIC, payload);
        }
    }

    // Send error messages
    public static class ErrorMessageProducer implements Runnable {
        private final String errorDetails;

        public ErrorMessageProducer(String errorDetails) {
            this.errorDetails = errorDetails;
        }

        @Override
        public void run() {
            sendMessage(KruizeConstants.KAFKA_CONSTANTS.ERROR_TOPIC, errorDetails);
        }
    }

    // Send summary messages
    public static class SummaryResponseMessageProducer implements Runnable {
        private final String payload;

        public SummaryResponseMessageProducer(String payload) {
            this.payload = payload;
        }

        @Override
        public void run() {
            sendMessage(KruizeConstants.KAFKA_CONSTANTS.SUMMARY_TOPIC, payload);
        }
    }

    // Close the Kafka producer (no-op if it was never created)
    public static void close() {
        if (producer != null) {
            producer.close();
            LOGGER.info("Kafka producer closed.");
        }
    }
}