Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Polishing #265
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jan 21, 2022
1 parent fa6b152 commit d52df57
Showing 1 changed file with 44 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import biz.paluch.logging.gelf.intern.GelfSender;
Expand Down Expand Up @@ -67,8 +65,6 @@ public GelfSender create(GelfSenderConfiguration configuration) {
return new KafkaGelfSender(kafkaProducer, kafkaLogTopic, configuration.getErrorReporter());
}



private static String getBrokerServers(GelfSenderConfiguration configuration) {

// extract the host part from uri and put in an array
Expand All @@ -81,59 +77,66 @@ private static String getBrokerServers(GelfSenderConfiguration configuration) {

// in order to arrive here, there have to be a kafka scheme
// get the scheme part
String scheme = URI.create(hconf).getScheme()+"://";
String scheme = URI.create(hconf).getScheme() + "://";
// and then begining of host
String hostsPart = hconf.substring( scheme.length() );
String hostsPart = hconf.substring(scheme.length());

// hostsPart ends with either # or ?
int pos;
for(pos=0; pos<hostsPart.length(); pos++) {
switch(hostsPart.charAt(pos)) {
case '#':
case '?':
break;
default:
continue;
}
break;
}
String suffix=hostsPart.substring(pos);
hostsPart=hostsPart.substring(0, pos);
int pos = findHostPartEnd(hostsPart);

String suffix = hostsPart.substring(pos);
hostsPart = hostsPart.substring(0, pos);

String brokers = "";
String[] hosts = new String[0];
if (hostsPart.length()>0)
if (hostsPart.length() > 0)
hosts = hostsPart.split(",");
if (hosts.length>0) for(String host: hosts) {
String broker;
String tmp = scheme + host + suffix;
URI uri = URI.create(tmp);
if (uri.getHost() != null) {
broker = uri.getHost();
int port;
if (uri.getPort() > 0) {
port = uri.getPort();
} else if (configuration.getPort() > 0) {
port = configuration.getPort();
if (hosts.length > 0)
for (String host : hosts) {
String broker;
String tmp = scheme + host + suffix;
URI uri = URI.create(tmp);
if (uri.getHost() != null) {
broker = uri.getHost();
int port;
if (uri.getPort() > 0) {
port = uri.getPort();
} else if (configuration.getPort() > 0) {
port = configuration.getPort();
} else {
port = BROKER_DEFAULT_PORT;
}
broker += ":" + port;

} else {
port = BROKER_DEFAULT_PORT;
broker = uri.getAuthority();
}
broker += ":" + port;

} else {
broker = uri.getAuthority();
if (brokers.length() > 0)
brokers += ",";
brokers += broker;
}
if (brokers.length()>0)
brokers += ",";
brokers += broker;
}
if (brokers.isEmpty()) {
throw new IllegalArgumentException("Kafka URI must specify bootstrap.servers.");
}

return brokers;
}

private static int findHostPartEnd(String hostsPart) {
for (int pos = 0; pos < hostsPart.length(); pos++) {
switch (hostsPart.charAt(pos)) {
case '#':
case '?':
break;
default:
continue;
}
return pos;
}

return hostsPart.length();
}

private static String getTopic(URI uri) {

String fragment = uri.getFragment();
Expand All @@ -144,4 +147,5 @@ private static String getTopic(URI uri) {

return fragment;
}

}

0 comments on commit d52df57

Please sign in to comment.