From 6ff4e5ca474e63e5339c7637ff6b12961b1f4d07 Mon Sep 17 00:00:00 2001 From: cumafo Date: Fri, 21 Jan 2022 09:14:31 +0100 Subject: [PATCH] Enabled URI parse/check of multiple kafka bootstrap hosts #265 * enabled URI parse/check of multiple bootstrap hosts * reading scheme from the URI instead of using KafkaContants.KAFKA_SCHEME * removed -import com.sun.tools.javac.util.StringUtils --- .../sender/KafkaGelfSenderProvider.java | 75 ++++++++++++++----- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java b/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java index 084892205..14a8de9a1 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java @@ -3,9 +3,11 @@ import java.net.URI; import java.util.Map; import java.util.Properties; +import java.util.Scanner; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter; import org.apache.kafka.common.serialization.ByteArraySerializer; import biz.paluch.logging.gelf.intern.GelfSender; @@ -65,28 +67,67 @@ public GelfSender create(GelfSenderConfiguration configuration) { return new KafkaGelfSender(kafkaProducer, kafkaLogTopic, configuration.getErrorReporter()); } + + private static String getBrokerServers(GelfSenderConfiguration configuration) { - URI uri = URI.create(configuration.getHost()); - String brokers; - - if (uri.getHost() != null) { - brokers = uri.getHost(); - int port; - if (uri.getPort() > 0) { - port = uri.getPort(); - } else if (configuration.getPort() > 0) { - port = configuration.getPort(); - } else { - port = BROKER_DEFAULT_PORT; + // extract the host part from uri and put in an array + // so each host can be validated using the URI + + // from https://docs.oracle.com/javase/7/docs/api/java/net/URI.html + // A hierarchical URI is subject to further parsing according to the syntax + // [scheme:][//authority][path][?query][#fragment] + String hconf = configuration.getHost(); + + // in order to arrive here, there have to be a kafka scheme + // get the scheme part + String scheme = URI.create(hconf).getScheme()+"://"; + // and then begining of host + String hostsPart = hconf.substring( scheme.length() ); + + // hostsPart ends with either # or ? + int pos; + for(pos=0; pos0) + hosts = hostsPart.split(","); + if (hosts.length>0) for(String host: hosts) { + String broker; + String tmp = scheme + host + suffix; + URI uri = URI.create(tmp); + if (uri.getHost() != null) { + broker = uri.getHost(); + int port; + if (uri.getPort() > 0) { + port = uri.getPort(); + } else if (configuration.getPort() > 0) { + port = configuration.getPort(); + } else { + port = BROKER_DEFAULT_PORT; + } + broker += ":" + port; - if (brokers == null || brokers.isEmpty()) { + } else { + broker = uri.getAuthority(); + } + if (brokers.length()>0) + brokers += ","; + brokers += broker; + } + if (brokers.isEmpty()) { throw new IllegalArgumentException("Kafka URI must specify bootstrap.servers."); }