From 381e1ac6742c5d6ed3f6cd490d45bbcaf003d4fe Mon Sep 17 00:00:00 2001 From: Saad Khan Date: Tue, 4 Feb 2025 10:29:29 +0530 Subject: [PATCH 1/4] initial kafka integration changes Signed-off-by: Saad Khan --- .../minikube/kruize-crc-minikube.yaml | 2 + .../openshift/kruize-crc-openshift.yaml | 2 + pom.xml | 6 ++ .../operator/KruizeDeploymentInfo.java | 4 + .../com/autotune/utils/KruizeConstants.java | 10 ++ .../utils/kafka/KruizeKafkaConsumer.java | 64 ++++++++++++ .../utils/kafka/KruizeKafkaProducer.java | 97 +++++++++++++++++++ 7 files changed, 185 insertions(+) create mode 100644 src/main/java/com/autotune/utils/kafka/KruizeKafkaConsumer.java create mode 100644 src/main/java/com/autotune/utils/kafka/KruizeKafkaProducer.java diff --git a/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml b/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml index 039520d1d..0ae3a371a 100644 --- a/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml +++ b/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml @@ -216,6 +216,8 @@ data: "experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment", "experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%", "bulkapilimit" : 1000, + "isKafkaEnabled" : "true", + "kafkaBootstrapServers" : "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", "hibernate": { "dialect": "org.hibernate.dialect.PostgreSQLDialect", "driver": "org.postgresql.Driver", diff --git a/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml b/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml index 6aede9a10..7d21bdfd8 100644 --- a/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml +++ b/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml @@ 
-210,6 +210,8 @@ data: "experimentsURL" : "http://kruize.openshift-tuning.svc.cluster.local:8080/createExperiment", "experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%", "bulkapilimit" : 1000, + "isKafkaEnabled" : "true", + "kafkaBootstrapServers" : "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", "hibernate": { "dialect": "org.hibernate.dialect.PostgreSQLDialect", "driver": "org.postgresql.Driver", diff --git a/pom.xml b/pom.xml index bd63d88f9..d13125295 100644 --- a/pom.xml +++ b/pom.xml @@ -210,6 +210,12 @@ micrometer-registry-prometheus ${micrometer-version} + + + org.apache.kafka + kafka-clients + 3.9.0 + diff --git a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java index 3866724e8..b42824fb9 100644 --- a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java +++ b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java @@ -91,6 +91,10 @@ public class KruizeDeploymentInfo { private static KubeEventLogger kubeEventLogger; public static Boolean is_ros_enabled = false; public static String datasource_via_env = null; + public static Boolean is_kafka_enabled = false; + public static String kafka_bootstrap_servers = null; + public static String kafka_topic_inbound = System.getenv("INGRESS_KAFKA_TOPIC"); + public static String kafka_group_id = System.getenv("KAFKA_CONSUMER_GROUP_ID"); private KruizeDeploymentInfo() { diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index dc9d3f256..f4fcabe9d 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -947,4 +947,14 @@ private MetadataProfileErrorMsgs() { } } + + public static final class KAFKA_CONSTANTS { + public static final String RECOMMENDATIONS_TOPIC = "recommendations-topic"; + public static final 
String ERROR_TOPIC = "error-topic"; + public static final String SUMMARY_TOPIC = "summary-topic"; + + public static final String SUMMARY = "summary"; + public static final String EXPERIMENTS = "experiments"; + public static final String RECOMMENDATIONS = "recommendations"; + } } diff --git a/src/main/java/com/autotune/utils/kafka/KruizeKafkaConsumer.java b/src/main/java/com/autotune/utils/kafka/KruizeKafkaConsumer.java new file mode 100644 index 000000000..44052eb2f --- /dev/null +++ b/src/main/java/com/autotune/utils/kafka/KruizeKafkaConsumer.java @@ -0,0 +1,64 @@ +package com.autotune.utils.kafka; + +import com.autotune.operator.KruizeDeploymentInfo; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.Scanner; + +//TODO: This class is not being used for now, will be updated later +public class KruizeKafkaConsumer implements Runnable { + private static KafkaConsumer consumer; + private static final Logger LOGGER = LoggerFactory.getLogger(KruizeKafkaConsumer.class); + + @Override + public void run() { + + // Flag to control the loop and terminate when needed + boolean continueListening = true; + + try { + consumer = getKafkaConsumerConfig(); + consumer.subscribe(java.util.Collections.singletonList(KruizeDeploymentInfo.kafka_topic_inbound)); + while (continueListening) { + consumer.poll(java.time.Duration.ofMillis(100)).forEach(record -> { + LOGGER.info("Received Recommendation: JobID={}, Value={}, Partition={}, Offset={}", + record.key(), record.value(), record.partition(), record.offset()); + }); + if (isTerminationSignalReceived()) { + continueListening = false; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private KafkaConsumer getKafkaConsumerConfig() { + Properties consumerProps = new Properties(); + 
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KruizeDeploymentInfo.kafka_bootstrap_servers); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, KruizeDeploymentInfo.kafka_group_id); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + return new KafkaConsumer<>(consumerProps); + } + + // Shutdown hook for the consumer + private static void addConsumerShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (consumer != null) { + consumer.close(); + } + })); + } + + private static boolean isTerminationSignalReceived() { + Scanner scanner = new Scanner(System.in); + return scanner.hasNext(); // This will return true if any input arrives + } +} diff --git a/src/main/java/com/autotune/utils/kafka/KruizeKafkaProducer.java b/src/main/java/com/autotune/utils/kafka/KruizeKafkaProducer.java new file mode 100644 index 000000000..05d0b71ca --- /dev/null +++ b/src/main/java/com/autotune/utils/kafka/KruizeKafkaProducer.java @@ -0,0 +1,97 @@ +package com.autotune.utils.kafka; + +import com.autotune.operator.KruizeDeploymentInfo; +import com.autotune.utils.KruizeConstants; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class KruizeKafkaProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(KruizeKafkaProducer.class); + + // Singleton Kafka Producer Instance + private static final KafkaProducer producer = new 
KafkaProducer<>(getProducerProperties()); + + // Get Kafka producer properties + private static Properties getProducerProperties() { + Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KruizeDeploymentInfo.kafka_bootstrap_servers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + return producerProps; + } + + // Kafka message producer + private static void sendMessage(String topic, String payload) { + try { + RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, payload)) + .get(5, TimeUnit.SECONDS); //todo : set the timeout value via ENV + // todo: get the status of message whether its delivered or failed + LOGGER.debug("Message sent successfully to topic {} at partition {} and offset {}", + metadata.topic(), metadata.partition(), metadata.offset()); + } catch (TimeoutException te) { + LOGGER.error("Kafka timeout while sending message to topic {}: {}", topic, te.getMessage()); + } catch (Exception e) { + LOGGER.error("Error sending message to Kafka topic {}: {}", topic, e.getMessage(), e); + } + } + + // Send valid recommendation messages + public static class ValidRecommendationMessageProducer implements Runnable { + private final String payload; + + public ValidRecommendationMessageProducer(String payload) { + this.payload = payload; + } + + @Override + public void run() { + sendMessage(KruizeConstants.KAFKA_CONSTANTS.RECOMMENDATIONS_TOPIC, payload); + } + } + + // Send error messages + public static class ErrorMessageProducer implements Runnable { + private final String errorDetails; + + public ErrorMessageProducer(String errorDetails) { + this.errorDetails = errorDetails; + } + + @Override + public void run() { + sendMessage(KruizeConstants.KAFKA_CONSTANTS.ERROR_TOPIC, errorDetails); + 
}
+    }
+
+    // Send summary messages
+    public static class SummaryResponseMessageProducer implements Runnable {
+        private final String payload;
+
+        public SummaryResponseMessageProducer(String payload) {
+            this.payload = payload;
+        }
+
+        @Override
+        public void run() {
+            sendMessage(KruizeConstants.KAFKA_CONSTANTS.SUMMARY_TOPIC, payload);
+        }
+    }
+
+    // Close the Kafka producer
+    public static void close() {
+        if (producer != null) {
+            producer.close();
+            LOGGER.info("Kafka producer closed.");
+        }
+    }
+}

From ba3f5cf8db93d815356ee38d0014362aace9ae60 Mon Sep 17 00:00:00 2001
From: Saad Khan
Date: Wed, 5 Feb 2025 11:06:49 +0530
Subject: [PATCH 2/4] add design doc

Signed-off-by: Saad Khan
---
 design/KafkaDesign.md | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)
 create mode 100644 design/KafkaDesign.md

diff --git a/design/KafkaDesign.md b/design/KafkaDesign.md
new file mode 100644
index 000000000..bdd93fb1e
--- /dev/null
+++ b/design/KafkaDesign.md
@@ -0,0 +1,27 @@
+# Kafka Documentation
+
+Kruize has a BulkService feature which is designed to streamline the detection, management, and optimization of containerized environments. It relies on the REST APIs without a message broker, which introduces latency when handling multiple parallel requests.
+Kafka replaces this existing REST API service for faster, asynchronous communication and to ensure seamless handling of recommendations at scale.
+
+Kruize Kafka Producer internally uses the BulkService and publishes recommendations in the form of messages to three topics, namely `recommendations-topic`, `error-topic` and `summary-topic`.
+
+## Kafka Flow
+
+1. Currently, Kruize Kafka Module works as a Producer only, i.e., to invoke the Kafka Service user needs to hit a REST API POST request with the same input as the one for the BulkService.
+2. On receiving the request, BulkService will return the `job_id` back and in the background starts the following tasks:
+   - First, does a handshake with the datasource.
+   - Using queries, it fetches the list of namespaces, workloads, containers of the connected datasource.
+   - Creates experiments, one for each container (alpha release).
+   - Triggers `generateRecommendations` for each container.
+   - Once the above step returns a success response, Kafka Producer is invoked and the recommendations are pushed into the `recommendations-topic`.
+   - If the service fails at any of the above steps — be it while fetching metadata, creating an experiment, or generating recommendations — Kafka Producer is invoked to publish the error response in the `error-topic`.
+   - Once all experiments are created and recommendations are generated, the system marks the `job_id` as "COMPLETED".
+   - Once the job is completed, Kafka Producer pushes the summary of all the experiments into the `summary-topic`.
+
+## Specifications
+ - User needs to hit a REST API POST request with a Payload as mentioned in the BulkAPI doc. For details, kindly refer to the [BulkAPI](BulkAPI.md) design doc.
+ - Kafka needs to be installed locally or in a cluster, and it's corresponding Bootstrap server URL should be added in the crc file based on your cluster.
+Example:
+  - `"kafkaBootstrapServers" : "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"`
+ - Consumer needs to be subscribed to the `recommendations-topic` to get the recommendations.
+ - Subscribing to the `error-topic` and the `summary-topic` is optional

From c721a3c23b4e4b253fb83e7eac34f7d351d973fb Mon Sep 17 00:00:00 2001
From: Saad Khan
Date: Thu, 6 Feb 2025 11:46:49 +0530
Subject: [PATCH 3/4] add examples in the design doc

Signed-off-by: Saad Khan
---
 design/KafkaDesign.md | 395 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 394 insertions(+), 1 deletion(-)

diff --git a/design/KafkaDesign.md b/design/KafkaDesign.md
index bdd93fb1e..fa6547d31 100644
--- a/design/KafkaDesign.md
+++ b/design/KafkaDesign.md
@@ -22,6 +22,399 @@ Kruize Kafka Producer internally uses the BulkService and publishes recommendati
  - User needs to hit a REST API POST request with a Payload as mentioned in the BulkAPI doc. For details, kindly refer to the [BulkAPI](BulkAPI.md) design doc.
  - Kafka needs to be installed locally or in a cluster, and it's corresponding Bootstrap server URL should be added in the crc file based on your cluster.
 Example:
-  - `"kafkaBootstrapServers" : "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"`
+  - `"kafkaBootstrapServers" : "<kafka-cluster-name>-kafka-bootstrap.<namespace>.svc.cluster.local:9092"`
  - Consumer needs to be subscribed to the `recommendations-topic` to get the recommendations.
- Subscribing to the `error-topic` and the `summary-topic` is optional + +## Examples + +**Request Payload (JSON):** + +```json +{ + "filter": { + "exclude": { + "namespace": [ + "openshift-.*" + ], + "workload": [], + "containers": [], + "labels": { + "org_id": "ABCOrga", + "source_id": "ZZZ", + "cluster_id": "ABG" + } + }, + "include": { + "namespace": [ + "openshift-tuning" + ], + "workload": [], + "containers": [], + "labels": { + "org_id": "ABCOrga", + "source_id": "ZZZ", + "cluster_id": "ABG" + } + } + }, + "time_range": { + "start": "2024-11-01T00:00:00.000Z", + "end": "2024-11-15T23:59:59.000Z" + }, + "datasource": "Cbank1Xyz", + "experiment_types": [ + "container", + "namespace" + ] +} +``` + +**recommendations-topic:** + +```json +{ + "summary": { + "status": "COMPLETED", + "job_id": "65603c30-64ee-4bd2-85db-8328425c3b09" + }, + "experiments": { + "prometheus-1|default|cadvisor|cadvisor(daemonset)|cadvisor": { + "name": "prometheus-1|default|cadvisor|cadvisor(daemonset)|cadvisor", + "status": "PROCESSED", + "apis": { + "recommendations": { + "response": [ + { + "cluster_name": "default", + "experiment_type": "container", + "kubernetes_objects": [ + { + "type": "daemonset", + "name": "cadvisor", + "namespace": "cadvisor", + "containers": [ + { + "container_image_name": "gcr.io/cadvisor/cadvisor:v0.45.0", + "container_name": "cadvisor", + "recommendations": { + "version": "1.0", + "notifications": { + "111000": { + "type": "info", + "message": "Recommendations Are Available", + "code": 111000 + } + }, + "data": { + "2025-01-27T11:28:24.000Z": { + "notifications": { }, + "monitoring_end_time": "2025-01-27T11:28:24.000Z", + "current": {}, + "recommendation_terms": { + "short_term": { + "duration_in_hours": 24, + "notifications": { + "112101": { + "type": "info", + "message": "Cost Recommendations Available", + "code": 112101 + }, + "112102": { + "type": "info", + "message": "Performance Recommendations Available", + "code": 112102 + } + }, + 
"monitoring_start_time": "2025-01-26T11:28:24.000Z", + "recommendation_engines": { + "cost": { + "pods_count": 1, + "confidence_level": 0, + "config": { + "limits": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + }, + "requests": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + } + }, + "variation": { + "limits": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + }, + "requests": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + } + }, + "notifications": {} + }, + "performance": { + "pods_count": 1, + "confidence_level": 0, + "config": { + "limits": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + }, + "requests": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + } + }, + "variation": { + "limits": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + }, + "requests": { + "memory": { + "amount": 136651161.6, + "format": "bytes" + }, + "cpu": { + "amount": 0.22053964085707395, + "format": "cores" + } + } + }, + "notifications": {} + } + }, + "plots": { + "datapoints": 4, + "plots_data": { + "2025-01-27T11:28:24.000Z": { + "cpuUsage": { + "min": 0.08396913905943296, + "q1": 0.18669564827597168, + "median": 0.19308384827591732, + "q3": 0.2056331399785098, + "max": 0.22053964085707395, + "format": "cores" + }, + "memoryUsage": { + "min": 104177664, + "q1": 116555776, + "median": 121192448, + "q3": 122347520, + "max": 123654144, + "format": "bytes" + } + }, + 
"2025-01-26T23:28:24.000Z": {}, + "2025-01-26T17:28:24.000Z": {}, + "2025-01-27T05:28:24.000Z": { + "cpuUsage": { + "min": 0.09123101016771845, + "q1": 0.1949451069246897, + "median": 0.19694082169152047, + "q3": 0.19694082169152047, + "max": 0.21236305529661514, + "format": "cores" + }, + "memoryUsage": { + "min": 111341568, + "q1": 121679872, + "median": 121905152, + "q3": 121905152, + "max": 123506688, + "format": "bytes" + } + } + } + } + }, + "medium_term": { + "duration_in_hours": 168, + "notifications": { + "120001": { + "type": "info", + "message": "There is not enough data available to generate a recommendation.", + "code": 120001 + } + } + }, + "long_term": { + "duration_in_hours": 360, + "notifications": { + "120001": { + "type": "info", + "message": "There is not enough data available to generate a recommendation.", + "code": 120001 + } + } + } + } + } + } + } + } + ] + } + ], + "version": "v2.0", + "experiment_name": "prometheus-1|default|cadvisor|cadvisor(daemonset)|cadvisor" + } + ] + } + }, + "status_history": [ + { + "status": "UNPROCESSED", + "timestamp": "2025-01-27T11:18:24.056Z" + }, + { + "status": "PROCESSED", + "timestamp": "2025-01-27T11:18:27.012Z" + } + ] + } + } +} + +``` + +**error-topic:** + +```json +{ + "summary": { + "status": "IN_PROGRESS", + "job_id": "c3491f51-2f22-4c3d-af17-3636fac185eb" + }, + "experiments": { + "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28975950(job)|collect-profiles": { + "name": "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28975950(job)|collect-profiles", + "status": "FAILED", + "apis": { + "create": { + "response": { + "message": "Not Found: performance_profile does not exist: resource-optimization-local-monitoring", + "httpcode": 400, + "documentationLink": "", + "status": "ERROR" + }, + "request": { + "apiVersion": "v2.0", + "experimentName": 
"prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28975950(job)|collect-profiles", + "clusterName": "default", + "performanceProfile": "resource-optimization-local-monitoring", + "sloInfo": null, + "mode": "monitor", + "targetCluster": "local", + "trialSettings": { + "measurement_durationMinutes": "15min", + "measurement_durationMinutes_inDouble": 15 + }, + "recommendationSettings": { + "threshold": 0.1 + }, + "datasource": "prometheus-1", + "experimentType": "CONTAINER", + "status": "IN_PROGRESS", + "experiment_id": "acbc6d44b32e94e2ba55968478c31fce3ee62bdb975f616cc3df149d733917ce", + "validationData": null, + "kubernetesObjects": [ + { + "type": "job", + "name": "collect-profiles-28975950", + "namespace": "openshift-operator-lifecycle-manager", + "containers": [ + { + "container_image_name": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:82ff155c5e7118a86952f86cba21da8e249f74f0a8f1ac0f2161e2bc1e3b3dbf", + "container_name": "collect-profiles", + "metrics": null, + "recommendations": null + } + ], + "namespaces": null + } + ], + "namespaceExperiment": false, + "containerExperiment": true + } + }, + "recommendations": { + "response": null + } + }, + "status_history": [ + { + "status": "UNPROCESSED", + "timestamp": "2025-02-03T04:32:43.588Z" + }, + { + "status": "FAILED", + "timestamp": "2025-02-03T04:32:43.593Z" + } + ] + } + } +} +``` + +**summary-topic:** + +```json +{ + "summary": { + "status": "COMPLETED", + "total_experiments": 23, + "processed_experiments": 23, + "job_id": "54905959-77d4-42ba-8e06-90bb97b823b9", + "job_start_time": "2024-10-10T06:07:09.066Z", + "job_end_time": "2024-10-10T06:07:17.471Z" + } +} + +``` + + + From 87b544c7e07deb22f683d6c124a0188f7b06d836 Mon Sep 17 00:00:00 2001 From: Saad Khan Date: Mon, 10 Feb 2025 11:56:34 +0530 Subject: [PATCH 4/4] move kafka server value to env and corresponding code changes Signed-off-by: Saad Khan --- .../minikube/kruize-crc-minikube.yaml | 3 ++- 
.../openshift/kruize-crc-openshift.yaml | 3 ++- src/main/java/com/autotune/Autotune.java | 5 +++++ .../java/com/autotune/operator/KruizeDeploymentInfo.java | 2 +- src/main/java/com/autotune/utils/KruizeConstants.java | 2 ++ 5 files changed, 12 insertions(+), 3 deletions(-) diff --git a/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml b/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml index 0ae3a371a..00a1bfada 100644 --- a/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml +++ b/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml @@ -217,7 +217,6 @@ data: "experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%", "bulkapilimit" : 1000, "isKafkaEnabled" : "true", - "kafkaBootstrapServers" : "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", "hibernate": { "dialect": "org.hibernate.dialect.PostgreSQLDialect", "driver": "org.postgresql.Driver", @@ -298,6 +297,8 @@ spec: value: "/etc/config/kruizeconfigjson" - name: JAVA_TOOL_OPTIONS value: "-XX:MaxRAMPercentage=80" + - name: KAFKA_BOOTSTRAP_SERVERS + value: "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092" ports: - name: kruize-port containerPort: 8080 diff --git a/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml b/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml index 7d21bdfd8..9754d7975 100644 --- a/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml +++ b/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml @@ -211,7 +211,6 @@ data: "experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%", "bulkapilimit" : 1000, "isKafkaEnabled" : "true", - "kafkaBootstrapServers" : 
"kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", "hibernate": { "dialect": "org.hibernate.dialect.PostgreSQLDialect", "driver": "org.postgresql.Driver", @@ -363,6 +362,8 @@ spec: value: "/etc/config/kruizeconfigjson" - name: JAVA_TOOL_OPTIONS value: "-XX:MaxRAMPercentage=80" + - name: KAFKA_BOOTSTRAP_SERVERS + value: "kruize-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092" resources: requests: memory: "350Mi" diff --git a/src/main/java/com/autotune/Autotune.java b/src/main/java/com/autotune/Autotune.java index 5701663f9..fc23839cf 100644 --- a/src/main/java/com/autotune/Autotune.java +++ b/src/main/java/com/autotune/Autotune.java @@ -181,6 +181,11 @@ public static void main(String[] args) { AutoscalingSettings.getInstance().initialiseAutoscalingSettings(); try { + // check if kafka flag is enabled and the corresponding server details are added + if (KruizeDeploymentInfo.is_kafka_enabled && KruizeDeploymentInfo.kafka_bootstrap_servers == null) { + LOGGER.error(KruizeConstants.KAFKA_CONSTANTS.BOOTSTRAP_SERVER_MISSING); + throw new IllegalStateException(KruizeConstants.KAFKA_CONSTANTS.BOOTSTRAP_SERVER_MISSING); + } String startAutotune = System.getenv("START_AUTOTUNE"); if (startAutotune == null || startAutotune.equalsIgnoreCase("true")) { server.start(); diff --git a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java index b42824fb9..0eb358f48 100644 --- a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java +++ b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java @@ -92,7 +92,7 @@ public class KruizeDeploymentInfo { public static Boolean is_ros_enabled = false; public static String datasource_via_env = null; public static Boolean is_kafka_enabled = false; - public static String kafka_bootstrap_servers = null; + public static String kafka_bootstrap_servers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");; public static String kafka_topic_inbound = 
System.getenv("INGRESS_KAFKA_TOPIC"); public static String kafka_group_id = System.getenv("KAFKA_CONSUMER_GROUP_ID"); diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index 81aba360a..049bdd12d 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -966,5 +966,7 @@ public static final class KAFKA_CONSTANTS { public static final String SUMMARY = "summary"; public static final String EXPERIMENTS = "experiments"; public static final String RECOMMENDATIONS = "recommendations"; + + public static final String BOOTSTRAP_SERVER_MISSING = "Missing required environment variable: KAFKA_BOOTSTRAP_SERVERS"; } }