Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[testclient] Make --payload-file take effect in PerformanceClient #12187

Merged
merged 2 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,30 @@
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.testclient.PositiveNumberParameterValidator;
import org.apache.pulsar.testclient.IMessageFormatter;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
Expand All @@ -69,6 +76,7 @@ public class PerformanceClient {
private static final LongAdder bytesSent = new LongAdder();
private static final LongAdder totalMessagesSent = new LongAdder();
private static final LongAdder totalBytesSent = new LongAdder();
private static IMessageFormatter messageFormatter = null;
private JCommander jc;

@Parameters(commandDescription = "Test pulsar websocket producer performance.")
Expand Down Expand Up @@ -112,6 +120,17 @@ static class Arguments {
@Parameter(names = { "-f", "--payload-file" }, description = "Use payload from a file instead of empty buffer")
public String payloadFilename = null;

@Parameter(names = { "-e", "--payload-delimiter" },
description = "The delimiter used to split lines when using payload from a file")
public String payloadDelimiter = "\\n"; // here escaping \n since default value will be printed with the help text

@Parameter(names = { "-fp", "--format-payload" },
description = "Format %i as a message index in the stream from producer and/or %t as the timestamp nanoseconds")
public boolean formatPayload = false;

@Parameter(names = {"-fc", "--format-class"}, description="Custom Formatter class name")
public String formatterClass = "org.apache.pulsar.testclient.DefaultMessageFormatter";

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
public long testTime = 0;
Expand Down Expand Up @@ -193,23 +212,49 @@ public Arguments loadArguments(String[] args) {

}

public void runPerformanceTest(long messages, long limit, int numOfTopic, int sizeOfMessage, String baseUrl,
String topicName, String authPluginClassName, String authParams) throws InterruptedException, FileNotFoundException {
public void runPerformanceTest(Arguments arguments) throws InterruptedException, IOException {
// Read payload data from file if needed
final byte[] payloadBytes = new byte[arguments.msgSize];
Random random = new Random(0);
List<byte[]> payloadByteList = Lists.newArrayList();
if (arguments.payloadFilename != null) {
Path payloadFilePath = Paths.get(arguments.payloadFilename);
if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0) {
throw new IllegalArgumentException("Payload file doesn't exist or it is empty.");
}
// here escaping the default payload delimiter to correct value
String delimiter = arguments.payloadDelimiter.equals("\\n") ? "\n" : arguments.payloadDelimiter;
String[] payloadList = new String(Files.readAllBytes(payloadFilePath), StandardCharsets.UTF_8).split(delimiter);
log.info("Reading payloads from {} and {} records read", payloadFilePath.toAbsolutePath(), payloadList.length);
for (String payload : payloadList) {
payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
}

if (arguments.formatPayload) {
messageFormatter = getMessageFormatter(arguments.formatterClass);
}
} else {
for (int i = 0; i < payloadBytes.length; ++i) {
payloadBytes[i] = (byte) (random.nextInt(26) + 65); // The value of the payloadBytes is from A to Z and the ASCII of A-Z is 65-90, so it needs to add 65
}
}

ExecutorService executor = Executors.newCachedThreadPool(new DefaultThreadFactory("pulsar-perf-producer-exec"));
HashMap<String, Tuple> producersMap = new HashMap<>();
String topicName = arguments.topics.get(0);
String restPath = TopicName.get(topicName).getRestPath();
String produceBaseEndPoint = TopicName.get(topicName).isV2() ?
baseUrl + "ws/v2/producer/" + restPath : baseUrl + "ws/producer/" + restPath;
for (int i = 0; i < numOfTopic; i++) {
String topic = numOfTopic > 1 ? produceBaseEndPoint + String.valueOf(i) : produceBaseEndPoint;
arguments.proxyURL + "ws/v2/producer/" + restPath : arguments.proxyURL + "ws/producer/" + restPath;
for (int i = 0; i < arguments.numTopics; i++) {
String topic = arguments.numTopics > 1 ? produceBaseEndPoint + i : produceBaseEndPoint;
URI produceUri = URI.create(topic);

WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();

if (StringUtils.isNotBlank(authPluginClassName) && StringUtils.isNotBlank(authParams)) {
if (StringUtils.isNotBlank(arguments.authPluginClassName) && StringUtils.isNotBlank(arguments.authParams)) {
try {
Authentication auth = AuthenticationFactory.create(authPluginClassName, authParams);
Authentication auth = AuthenticationFactory.create(arguments.authPluginClassName, arguments.authParams);
auth.start();
AuthenticationDataProvider authData = auth.getAuthData();
if (authData.hasDataForHttp()) {
Expand Down Expand Up @@ -243,14 +288,14 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si

executor.submit(() -> {
try {
RateLimiter rateLimiter = RateLimiter.create(limit);
RateLimiter rateLimiter = RateLimiter.create(arguments.msgRate);
// Send messages on all topics/producers
long totalSent = 0;
while (true) {
for (String topic : producersMap.keySet()) {
if (messages > 0) {
if (totalSent >= messages) {
log.trace("------------- DONE (reached the maximum number: [{}] of production) --------------", messages);
if (arguments.numMessages > 0) {
if (totalSent >= arguments.numMessages) {
log.trace("------------- DONE (reached the maximum number: [{}] of production) --------------", arguments.numMessages);
Thread.sleep(10000);
PerfClientUtils.exit(0);
}
Expand All @@ -262,11 +307,23 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si
Thread.sleep(10000);
PerfClientUtils.exit(0);
}
producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), sizeOfMessage);

byte[] payloadData;
if (arguments.payloadFilename != null) {
if (messageFormatter != null) {
payloadData = messageFormatter.formatMessage("", totalSent,
payloadByteList.get(random.nextInt(payloadByteList.size())));
} else {
payloadData = payloadByteList.get(random.nextInt(payloadByteList.size()));
}
} else {
payloadData = payloadBytes;
}
producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), payloadData);
messagesSent.increment();
bytesSent.add(sizeOfMessage);
bytesSent.add(payloadData.length);
totalMessagesSent.increment();
totalBytesSent.add(sizeOfMessage);
totalBytesSent.add(payloadData.length);
}
}

Expand Down Expand Up @@ -328,6 +385,16 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si

}

static IMessageFormatter getMessageFormatter(String formatterClass) {
try {
ClassLoader classLoader = PerformanceClient.class.getClassLoader();
Class clz = classLoader.loadClass(formatterClass);
return (IMessageFormatter) clz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
return null;
}
}

public static void main(String[] args) throws Exception {
PerformanceClient test = new PerformanceClient();
Arguments arguments = test.loadArguments(args);
Expand All @@ -337,8 +404,7 @@ public static void main(String[] args) throws Exception {
printAggregatedThroughput(start);
printAggregatedStats();
}));
test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize,
arguments.proxyURL, arguments.topics.get(0), arguments.authPluginClassName, arguments.authParams);
test.runPerformanceTest(arguments);
}

private class Tuple {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,9 @@ public Session getSession() {
return this.session;
}

public void sendMsg(String context, int sizeOfMessage)
public void sendMsg(String context, byte[] payloadData)
throws IOException, JsonParseException, InterruptedException, ExecutionException {
byte[] payload = new byte[sizeOfMessage];
String message = getEncoder().encodeToString(payload);
String message = getEncoder().encodeToString(payloadData);
String timeStamp = "{\"payload\": \"" + message + "\",\"context\": \"" + context + "\"}";
String sampleMsg = new Gson().fromJson(timeStamp, JsonObject.class).toString();
if (this.session != null && this.session.isOpen() && this.session.getRemote() != null) {
Expand Down
3 changes: 3 additions & 0 deletions site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ Options
|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-t`, `--num-topic`|The number of topics|1|
|`-f`, `--payload-file`|Use payload from a file instead of empty buffer||
|`-e`, `--payload-delimiter`|The delimiter used to split lines when using payload from a file|\n|
|`-fp`, `--format-payload`|Format %i as a message index in the stream from producer and/or %t as the timestamp nanoseconds|false|
|`-fc`, `--format-class`|Custom formatter class name|`org.apache.pulsar.testclient.DefaultMessageFormatter`|
|`-u`, `--proxy-url`|Pulsar Proxy URL, e.g., "ws://localhost:8080/"||
|`-r`, `--rate`|Publish rate msg/s across topics|100|
|`-s`, `--size`|Message size in byte|1024|
Expand Down