Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Enabled URI parse/check of multiple kafka bootstrap hosts #265
Browse files Browse the repository at this point in the history
* enabled URI parse/check of multiple bootstrap hosts

* reading scheme from the URI instead of using KafkaContants.KAFKA_SCHEME

* removed -import com.sun.tools.javac.util.StringUtils
  • Loading branch information
cumafo authored Jan 21, 2022
1 parent 779ddf0 commit 6ff4e5c
Showing 1 changed file with 58 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import biz.paluch.logging.gelf.intern.GelfSender;
Expand Down Expand Up @@ -65,28 +67,67 @@ public GelfSender create(GelfSenderConfiguration configuration) {
return new KafkaGelfSender(kafkaProducer, kafkaLogTopic, configuration.getErrorReporter());
}



private static String getBrokerServers(GelfSenderConfiguration configuration) {

URI uri = URI.create(configuration.getHost());
String brokers;

if (uri.getHost() != null) {
brokers = uri.getHost();
int port;
if (uri.getPort() > 0) {
port = uri.getPort();
} else if (configuration.getPort() > 0) {
port = configuration.getPort();
} else {
port = BROKER_DEFAULT_PORT;
// extract the host part from uri and put in an array
// so each host can be validated using the URI

// from https://docs.oracle.com/javase/7/docs/api/java/net/URI.html
// A hierarchical URI is subject to further parsing according to the syntax
// [scheme:][//authority][path][?query][#fragment]
String hconf = configuration.getHost();

// in order to arrive here, there have to be a kafka scheme
// get the scheme part
String scheme = URI.create(hconf).getScheme()+"://";
// and then begining of host
String hostsPart = hconf.substring( scheme.length() );

// hostsPart ends with either # or ?
int pos;
for(pos=0; pos<hostsPart.length(); pos++) {
switch(hostsPart.charAt(pos)) {
case '#':
case '?':
break;
default:
continue;
}
brokers += ":" + port;

} else {
brokers = uri.getAuthority();
break;
}
String suffix=hostsPart.substring(pos);
hostsPart=hostsPart.substring(0, pos);

String brokers = "";
String[] hosts = new String[0];
if (hostsPart.length()>0)
hosts = hostsPart.split(",");
if (hosts.length>0) for(String host: hosts) {
String broker;
String tmp = scheme + host + suffix;
URI uri = URI.create(tmp);
if (uri.getHost() != null) {
broker = uri.getHost();
int port;
if (uri.getPort() > 0) {
port = uri.getPort();
} else if (configuration.getPort() > 0) {
port = configuration.getPort();
} else {
port = BROKER_DEFAULT_PORT;
}
broker += ":" + port;

if (brokers == null || brokers.isEmpty()) {
} else {
broker = uri.getAuthority();
}
if (brokers.length()>0)
brokers += ",";
brokers += broker;
}
if (brokers.isEmpty()) {
throw new IllegalArgumentException("Kafka URI must specify bootstrap.servers.");
}

Expand Down

0 comments on commit 6ff4e5c

Please sign in to comment.