From ebbce8c141fbfbde5f09c5af81ddb57c3edc501a Mon Sep 17 00:00:00 2001 From: Quinten Parker Date: Tue, 14 Jan 2025 10:47:42 -0800 Subject: [PATCH] Fix error handling and partition config bugs in KafkaMessagingProvider.ensureTopic --- .../connector/kafka/KafkaMessagingProvider.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala index f61f6ec7a6e..dd388952708 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala @@ -41,6 +41,8 @@ case class KafkaConfig(replicationFactor: Short, consumerLagCheckInterval: Finit object KafkaMessagingProvider extends MessagingProvider { import KafkaConfiguration._ + private val topicPartitionsConfigKey = "partitions" + def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)( implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer = @@ -64,12 +66,13 @@ object KafkaMessagingProvider extends MessagingProvider { Try(AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts))) .flatMap(client => { - val partitions = topicConfig.getOrElse("partitions", "1").toInt - val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava) + val partitions = topicConfig.getOrElse(topicPartitionsConfigKey, "1").toInt + val safeTopicConfig = topicConfig - topicPartitionsConfigKey + val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(safeTopicConfig.asJava) def createTopic(retries: Int = 5): Try[Unit] = { Try(client.listTopics().names().get()) - .map(topics => + .flatMap(topics => if (topics.contains(topic)) { Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation.")) } else {