Skip to content

Commit

Permalink
Merge branch 'apache:master' into fix-test
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong authored Dec 6, 2022
2 parents 2240a75 + 90f6758 commit 976a3c4
Show file tree
Hide file tree
Showing 21 changed files with 202 additions and 4,590 deletions.
18 changes: 12 additions & 6 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ webServicePortTls=

# Specify the tls protocols the broker's web service will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
# Examples:
# webServiceTlsProtocols=TLSv1.3,TLSv1.2
webServiceTlsProtocols=

# Specify the tls cipher the broker will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
# Examples:
# webServiceTlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
webServiceTlsCiphers=

# Hostname or IP address the service binds on, default is 0.0.0.0.
Expand Down Expand Up @@ -680,12 +682,14 @@ tlsAllowInsecureConnection=false

# Specify the tls protocols the broker will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
# Examples:
# tlsProtocols=TLSv1.3,TLSv1.2
tlsProtocols=

# Specify the tls cipher the broker will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
# Examples:
# tlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
tlsCiphers=

# Trusted client certificates are required for to connect TLS
Expand Down Expand Up @@ -773,14 +777,16 @@ brokerClientTlsTrustStorePassword=

# Specify the tls cipher the internal client will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers)
# e.g. [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
# used by the internal client to authenticate with Pulsar brokers
# Examples:
# brokerClientTlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
brokerClientTlsCiphers=

# Specify the tls protocols the broker will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# e.g. [TLSv1.3, TLSv1.2]
# used by the internal client to authenticate with Pulsar brokers
# Example:
# brokerClientTlsProtocols=TLSv1.3,TLSv1.2
brokerClientTlsProtocols=

# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
Expand Down
12 changes: 8 additions & 4 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ tlsTrustStorePassword=

# Specify the tls protocols the proxy's web service will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
# Examples:
# webServiceTlsProtocols=TLSv1.3,TLSv1.2
webServiceTlsProtocols=

# Specify the tls cipher the proxy will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
# Examples:
# webServiceTlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
webServiceTlsCiphers=

# Allowed broker target ports
Expand Down Expand Up @@ -227,12 +229,14 @@ tlsHostnameVerificationEnabled=false

# Specify the tls protocols the broker will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
# Examples:
# tlsProtocols=TLSv1.3,TLSv1.2
tlsProtocols=

# Specify the tls cipher the broker will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
# Examples:
# tlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
tlsCiphers=

# Whether client certificates are required for TLS. Connections are rejected if the client
Expand Down
12 changes: 8 additions & 4 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,14 @@ tlsAllowInsecureConnection=false

# Specify the tls protocols the broker will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
# Examples:
# tlsProtocols=TLSv1.3,TLSv1.2
tlsProtocols=

# Specify the tls cipher the broker will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
# Examples:
# tlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
tlsCiphers=

# Trusted client certificates are required for to connect TLS
Expand Down Expand Up @@ -469,14 +471,16 @@ brokerClientTlsTrustStorePassword=

# Specify the tls cipher the internal client will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers)
# e.g. [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
# used by the internal client to authenticate with Pulsar brokers
# Examples:
# brokerClientTlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
brokerClientTlsCiphers=

# Specify the tls protocols the broker will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# e.g. [TLSv1.3, TLSv1.2]
# used by the internal client to authenticate with Pulsar brokers
# Examples:
# brokerClientTlsProtocols=TLSv1.3,TLSv1.2
brokerClientTlsProtocols=

# Enable or disable system topic
Expand Down
6 changes: 4 additions & 2 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ tlsTrustStorePassword=

# Specify the tls protocols the proxy's web service will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
# Examples:
# webServiceTlsProtocols=TLSv1.3,TLSv1.2
webServiceTlsProtocols=

# Specify the tls cipher the proxy will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
# Examples:
# webServiceTlsCiphers=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
webServiceTlsCiphers=

### --- Deprecated config variables --- ###
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se

@FieldContext(
category = CATEGORY_POLICIES,
minValue = 1,
doc = "How often to check for topics that have reached the quota."
+ " It only takes effects when `backlogQuotaCheckEnabled` is true"
)
Expand Down Expand Up @@ -616,6 +617,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled = false;
@FieldContext(
category = CATEGORY_POLICIES,
minValue = 1,
dynamic = true,
doc = "How often to check for inactive topics"
)
Expand Down Expand Up @@ -665,6 +667,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se

@FieldContext(
category = CATEGORY_POLICIES,
minValue = 1,
doc = "How frequently to proactively check and purge expired messages"
)
private int messageExpiryCheckIntervalInMinutes = 5;
Expand Down Expand Up @@ -768,6 +771,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se

@FieldContext(
category = CATEGORY_POLICIES,
minValue = 1,
doc = "Time of inactivity after which the broker will discard the deduplication information"
+ " relative to a disconnected producer. Default is 6 hours.")
private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;
Expand Down Expand Up @@ -2706,6 +2710,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
private boolean exposePublisherStats = true;
@FieldContext(
category = CATEGORY_METRICS,
minValue = 1,
doc = "Stats update frequency in seconds"
)
private int statsUpdateFrequencyInSecs = 60;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.nio.file.Path;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
Expand Down Expand Up @@ -79,8 +79,7 @@ private static ServiceConfiguration loadConfig(String configFile) throws Excepti
@Parameters(commandDescription = "Options")
private static class StarterArguments {
@Parameter(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker")
private String brokerConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() + "/conf/broker.conf";
private String brokerConfigFile = "conf/broker.conf";

@Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with Broker")
private boolean runBookie = false;
Expand All @@ -90,15 +89,13 @@ private static class StarterArguments {
private boolean runBookieAutoRecovery = false;

@Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
private String bookieConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
private String bookieConfigFile = "conf/bookkeeper.conf";

@Parameter(names = {"-rfw", "--run-functions-worker"}, description = "Run functions worker with Broker")
private boolean runFunctionsWorker = false;

@Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
private String fnWorkerConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
private String fnWorkerConfigFile = "conf/functions_worker.yml";

@Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
Expand Down Expand Up @@ -152,22 +149,24 @@ private static class BrokerStarter {
jcommander.parse(args);
if (starterArguments.help) {
jcommander.usage();
System.exit(-1);
System.exit(0);
}

if (starterArguments.generateDocs) {
CmdGenerateDocs cmd = new CmdGenerateDocs("pulsar");
cmd.addCommand("broker", starterArguments);
cmd.run(null);
System.exit(-1);
System.exit(0);
}

// init broker config
if (isBlank(starterArguments.brokerConfigFile)) {
jcommander.usage();
throw new IllegalArgumentException("Need to specify a configuration file for broker");
} else {
brokerConfig = loadConfig(starterArguments.brokerConfigFile);
final String filepath = Path.of(starterArguments.brokerConfigFile)
.toAbsolutePath().normalize().toString();
brokerConfig = loadConfig(filepath);
}

int maxFrameSize = brokerConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING;
Expand All @@ -190,9 +189,9 @@ private static class BrokerStarter {

// init functions worker
if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) {
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(
brokerConfig, starterArguments.fnWorkerConfigFile
);
final String filepath = Path.of(starterArguments.fnWorkerConfigFile)
.toAbsolutePath().normalize().toString();
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(brokerConfig, filepath);
functionsWorkerService = WorkerServiceLoader.load(workerConfig);
} else {
workerConfig = null;
Expand Down Expand Up @@ -231,7 +230,9 @@ && isBlank(starterArguments.bookieConfigFile)) {
if (starterArguments.runBookie || starterArguments.runBookieAutoRecovery) {
checkState(isNotBlank(starterArguments.bookieConfigFile),
"No configuration file for Bookie");
bookieConfig = readBookieConfFile(starterArguments.bookieConfigFile);
final String filepath = Path.of(starterArguments.bookieConfigFile)
.toAbsolutePath().normalize().toString();
bookieConfig = readBookieConfFile(filepath);
Class<? extends StatsProvider> statsProviderClass = bookieConfig.getStatsProviderClass();
bookieStatsProvider = ReflectionUtils.newInstance(statsProviderClass);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Sets;
import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -251,8 +252,7 @@ public boolean isHelp() {
private boolean noFunctionsWorker = false;

@Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
private String fnWorkerConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
private String fnWorkerConfigFile = "conf/functions_worker.yml";

@Parameter(names = {"-nss", "--no-stream-storage"}, description = "Disable stream storage")
private boolean noStreamStorage = false;
Expand Down Expand Up @@ -305,8 +305,8 @@ public void start() throws Exception {

// initialize the functions worker
if (!this.isNoFunctionsWorker()) {
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(
config, this.getFnWorkerConfigFile());
final String filepath = Path.of(getFnWorkerConfigFile()).toAbsolutePath().normalize().toString();
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(config, filepath);
if (usingNewDefaultsPIP117) {
workerConfig.setStateStorageProviderImplementation(
PulsarMetadataStateStoreProviderImpl.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,7 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
conf.setTlsTrustCertsFilePath(
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
? this.getConfiguration().getBrokerClientTrustCertsFilePath()
: this.getConfiguration().getTlsCertificateFilePath());
: this.getConfiguration().getTlsTrustCertsFilePath());
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -918,10 +919,14 @@ public void splitNamespaceBundle(
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitBoundaries") @DefaultValue("") List<Long> splitBoundaries) {
@QueryParam("splitAlgorithmName") String splitAlgorithmName,
@ApiParam("splitBoundaries") List<Long> splitBoundaries) {
validateNamespaceName(property, cluster, namespace);
if (StringUtils.isEmpty(splitAlgorithmName)) {
splitAlgorithmName = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME;
}
internalSplitNamespaceBundleAsync(bundleRange,
authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries)
authoritative, unload, splitAlgorithmName, splitBoundaries)
.thenAccept(__ -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), bundleRange);
asyncResponse.resume(Response.noContent().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,22 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS

public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
Position position) {
if (isShadowTopic && position == null || !isShadowTopic && position != null) {
if (!isShadowTopic && position != null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
"Only shadow topic supports sending messages with messageId");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return false;
}
if (isShadowTopic && position == null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
"Cannot send messages to a shadow topic");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return false;
}
if (isClosed) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError,
Expand Down
Loading

0 comments on commit 976a3c4

Please sign in to comment.