Skip to content

Commit

Permalink
[PIP 95][Issue 12040][broker] Improved multi-listener in standalone m…
Browse files Browse the repository at this point in the history
…ode (#12066)
  • Loading branch information
EronWright authored Oct 12, 2021
1 parent fbf073b commit 2c09593
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -299,11 +300,12 @@ public void start() throws Exception {
broker.start();

final String cluster = config.getClusterName();

if (!config.isTlsEnabled()) {
URL webServiceUrl = new URL(
String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort().get()));
String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(),
URL webServiceUrl = new URL(String.format("http://%s:%d",
ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
config.getWebServicePort().get()));
String brokerServiceUrl = String.format("pulsar://%s:%d",
ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
config.getBrokerServicePort().get());
admin = PulsarAdmin.builder().serviceHttpUrl(
webServiceUrl.toString()).authentication(
Expand All @@ -317,9 +319,11 @@ public void start() throws Exception {
} else {
checkArgument(config.getWebServicePortTls().isPresent(), "webServicePortTls must be present");
checkArgument(config.getBrokerServicePortTls().isPresent(), "brokerServicePortTls must be present");
URL webServiceUrlTls = new URL(
String.format("https://%s:%d", config.getAdvertisedAddress(), config.getWebServicePortTls().get()));
String brokerServiceUrlTls = String.format("pulsar+ssl://%s:%d", config.getAdvertisedAddress(),
URL webServiceUrlTls = new URL(String.format("https://%s:%d",
ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
config.getWebServicePortTls().get()));
String brokerServiceUrlTls = String.format("pulsar+ssl://%s:%d",
ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
config.getBrokerServicePortTls().get());
PulsarAdminBuilder builder = PulsarAdmin.builder()
.serviceHttpUrl(webServiceUrlTls.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void start() throws PulsarServerException {

LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.protocol.Commands.newLookupResponse;
import io.netty.buffer.ByteBuf;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
Expand All @@ -33,6 +34,7 @@
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.web.PulsarWebResource;
Expand Down Expand Up @@ -293,14 +295,10 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
newAuthoritative, LookupType.Redirect, requestId, false));
} else {
// When running in standalone mode we want to redirect the client through the service
// url, so that the advertised address configuration is not relevant anymore.
boolean redirectThroughServiceUrl = pulsarService.getConfiguration()
.isRunningStandalone();

ServiceConfiguration conf = pulsarService.getConfiguration();
lookupfuture.complete(newLookupResponse(lookupData.getBrokerUrl(),
lookupData.getBrokerUrlTls(), true /* authoritative */, LookupType.Connect,
requestId, redirectThroughServiceUrl));
requestId, shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
Expand Down Expand Up @@ -354,5 +352,32 @@ protected TopicName getTopicName(String topicDomain, String tenant, String names
return TopicName.get(TopicDomain.getEnum(topicDomain).value(), tenant, namespace, decodedName);
}

private static boolean shouldRedirectThroughServiceUrl(ServiceConfiguration conf, LookupData lookupData) {
// When running in standalone mode we want to redirect the client through the service URL,
// if the advertised address is a loopback address (see PulsarStandaloneStarter).
if (!conf.isRunningStandalone()) {
return false;
}
if (!StringUtils.isEmpty(lookupData.getBrokerUrl())) {
try {
URI host = URI.create(lookupData.getBrokerUrl());
return InetAddress.getByName(host.getHost()).isLoopbackAddress();
} catch (Exception e) {
log.info("Failed to resolve advertised address {}: {}", lookupData.getBrokerUrl(), e.getMessage());
return false;
}
}
if (!StringUtils.isEmpty(lookupData.getBrokerUrlTls())) {
try {
URI host = URI.create(lookupData.getBrokerUrlTls());
return InetAddress.getByName(host.getHost()).isLoopbackAddress();
} catch (Exception e) {
log.info("Failed to resolve advertised address {}: {}", lookupData.getBrokerUrlTls(), e.getMessage());
return false;
}
}
return false;
}

private static final Logger log = LoggerFactory.getLogger(TopicLookupBase.class);
}

0 comments on commit 2c09593

Please sign in to comment.