From d52df57300b62ff35bf38fbf60e77010b2ffe77d Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 21 Jan 2022 09:16:13 +0100 Subject: [PATCH] Polishing #265 --- .../sender/KafkaGelfSenderProvider.java | 84 ++++++++++--------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java b/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java index 14a8de9a..73dcb252 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java @@ -3,11 +3,9 @@ import java.net.URI; import java.util.Map; import java.util.Properties; -import java.util.Scanner; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter; import org.apache.kafka.common.serialization.ByteArraySerializer; import biz.paluch.logging.gelf.intern.GelfSender; @@ -67,8 +65,6 @@ public GelfSender create(GelfSenderConfiguration configuration) { return new KafkaGelfSender(kafkaProducer, kafkaLogTopic, configuration.getErrorReporter()); } - - private static String getBrokerServers(GelfSenderConfiguration configuration) { // extract the host part from uri and put in an array @@ -81,52 +77,44 @@ private static String getBrokerServers(GelfSenderConfiguration configuration) { // in order to arrive here, there have to be a kafka scheme // get the scheme part - String scheme = URI.create(hconf).getScheme()+"://"; + String scheme = URI.create(hconf).getScheme() + "://"; // and then begining of host - String hostsPart = hconf.substring( scheme.length() ); + String hostsPart = hconf.substring(scheme.length()); // hostsPart ends with either # or ? - int pos; - for(pos=0; pos0) + if (hostsPart.length() > 0) hosts = hostsPart.split(","); - if (hosts.length>0) for(String host: hosts) { - String broker; - String tmp = scheme + host + suffix; - URI uri = URI.create(tmp); - if (uri.getHost() != null) { - broker = uri.getHost(); - int port; - if (uri.getPort() > 0) { - port = uri.getPort(); - } else if (configuration.getPort() > 0) { - port = configuration.getPort(); + if (hosts.length > 0) + for (String host : hosts) { + String broker; + String tmp = scheme + host + suffix; + URI uri = URI.create(tmp); + if (uri.getHost() != null) { + broker = uri.getHost(); + int port; + if (uri.getPort() > 0) { + port = uri.getPort(); + } else if (configuration.getPort() > 0) { + port = configuration.getPort(); + } else { + port = BROKER_DEFAULT_PORT; + } + broker += ":" + port; + } else { - port = BROKER_DEFAULT_PORT; + broker = uri.getAuthority(); } - broker += ":" + port; - - } else { - broker = uri.getAuthority(); + if (brokers.length() > 0) + brokers += ","; + brokers += broker; } - if (brokers.length()>0) - brokers += ","; - brokers += broker; - } if (brokers.isEmpty()) { throw new IllegalArgumentException("Kafka URI must specify bootstrap.servers."); } @@ -134,6 +122,21 @@ private static String getBrokerServers(GelfSenderConfiguration configuration) { return brokers; } + private static int findHostPartEnd(String hostsPart) { + for (int pos = 0; pos < hostsPart.length(); pos++) { + switch (hostsPart.charAt(pos)) { + case '#': + case '?': + break; + default: + continue; + } + return pos; + } + + return hostsPart.length(); + } + private static String getTopic(URI uri) { String fragment = uri.getFragment(); @@ -144,4 +147,5 @@ private static String getTopic(URI uri) { return fragment; } + }