From 003a19fd5dc42c22921cde366799510c2a2b4a72 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Mon, 18 Apr 2022 08:52:19 +0800 Subject: [PATCH] [PIP-74] Support auto scaled consumer receiver queue (#14494) * Add autoScaledReceiverQueueSize * Add autoScaledReceiverQueueSize for PerformanceConsumer * remove memory limit code * fix typo --- .../impl/AutoScaledReceiverQueueSizeTest.java | 258 ++++++++++++++++++ .../pulsar/client/api/ConsumerBuilder.java | 14 + .../pulsar/client/impl/ConsumerBase.java | 26 +- .../client/impl/ConsumerBuilderImpl.java | 6 + .../pulsar/client/impl/ConsumerImpl.java | 30 +- .../client/impl/MultiTopicsConsumerImpl.java | 28 +- .../client/impl/ZeroQueueConsumerImpl.java | 10 + .../impl/conf/ConsumerConfigurationData.java | 2 + .../testclient/PerformanceConsumer.java | 38 ++- 9 files changed, 407 insertions(+), 5 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java new file mode 100644 index 00000000000000..2b9e2dc3adf572 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl; + + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.BatchReceivePolicy; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest { + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + setupDefaultTenantAndNamespace(); + } + + @BeforeClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testConsumerImpl() throws PulsarClientException { + String topic = "persistent://public/default/testConsumerImpl" + System.currentTimeMillis(); + @Cleanup + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("my-sub") + .receiverQueueSize(3) + .autoScaledReceiverQueueSizeEnabled(true) + .subscribe(); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1); + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + byte[] data = "data".getBytes(StandardCharsets.UTF_8); + + producer.send(data); + Assert.assertNotNull(consumer.receive()); + Awaitility.await().until(consumer.scaleReceiverQueueHint::get); + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + + //this will trigger receiver queue size expanding. + Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS)); + + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2); + Assert.assertFalse(consumer.scaleReceiverQueueHint.get()); + + for (int i = 0; i < 5; i++) { + producer.send(data); + producer.send(data); + Assert.assertNotNull(consumer.receive()); + Assert.assertNotNull(consumer.receive()); + // queue maybe full, but no empty receive, so no expanding + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2); + } + + producer.send(data); + producer.send(data); + Awaitility.await().until(consumer.scaleReceiverQueueHint::get); + Assert.assertNotNull(consumer.receive()); + Assert.assertNotNull(consumer.receive()); + Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS)); + // queue is full, with empty receive, expanding to max size + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3); + } + + @Test + public void testConsumerImplBatchReceive() throws PulsarClientException { + String topic = "persistent://public/default/testConsumerImplBatchReceive" + System.currentTimeMillis(); + @Cleanup + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("my-sub") + .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build()) + .receiverQueueSize(20) + .autoScaledReceiverQueueSizeEnabled(true) + .subscribe(); + + int currentSize = 8; + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize); + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + byte[] data = "data".getBytes(StandardCharsets.UTF_8); + + for (int i = 0; i < 10; i++) { // just run a few times. + for (int j = 0; j < 5; j++) { + producer.send(data); + } + Awaitility.await().until(() -> consumer.incomingMessages.size() == 5); + log.info("i={},expandReceiverQueueHint:{},local permits:{}", + i, consumer.scaleReceiverQueueHint.get(), consumer.getAvailablePermits()); + Assert.assertEquals(consumer.batchReceive().size(), 5); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize); + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + } + + //clear local available permits. + int n = currentSize / 2 - consumer.getAvailablePermits(); + for (int i = 0; i < n; i++) { + producer.send(data); + consumer.receive(); + } + Assert.assertEquals(consumer.getAvailablePermits(), 0); + + for (int i = 0; i < currentSize; i++) { + producer.send(data); + } + + Awaitility.await().until(consumer.scaleReceiverQueueHint::get); + Assert.assertEquals(consumer.batchReceive().size(), 5); + + //trigger expanding + consumer.batchReceiveAsync(); + Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2); + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + } + + @Test + public void testMultiConsumerImpl() throws Exception { + String topic = "persistent://public/default/testMultiConsumerImpl" + System.currentTimeMillis(); + admin.topics().createPartitionedTopic(topic, 3); + @Cleanup + MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("my-sub") + .receiverQueueSize(10) + .autoScaledReceiverQueueSizeEnabled(true) + .subscribe(); + + //queue size will be adjusted to partition number. + Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3)); + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + byte[] data = "data".getBytes(StandardCharsets.UTF_8); + + for (int i = 0; i < 3; i++) { + producer.send(data); + } + Awaitility.await().until(consumer.scaleReceiverQueueHint::get); + for (int i = 0; i < 3; i++) { + Assert.assertNotNull(consumer.receive()); + } + Assert.assertTrue(consumer.scaleReceiverQueueHint.get()); + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3); // queue size no change + + //this will trigger receiver queue size expanding. + Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS)); + + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 6); + Assert.assertFalse(consumer.scaleReceiverQueueHint.get()); //expandReceiverQueueHint is reset. + + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 6; j++) { + producer.send(data); + } + for (int j = 0; j < 6; j++) { + Assert.assertNotNull(consumer.receive()); + } + log.info("i={},currentReceiverQueueSize={},expandReceiverQueueHint={}", i, + consumer.getCurrentReceiverQueueSize(), consumer.scaleReceiverQueueHint); + // queue maybe full, but no empty receive, so no expanding + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 6); + } + + for (int j = 0; j < 6; j++) { + producer.send(data); + } + Awaitility.await().until(() -> consumer.scaleReceiverQueueHint.get()); + for (int j = 0; j < 6; j++) { + Assert.assertNotNull(consumer.receive()); + } + Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS)); + // queue is full, with empty receive, expanding to max size + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 10); + } + + @Test + public void testMultiConsumerImplBatchReceive() throws PulsarClientException, PulsarAdminException { + String topic = "persistent://public/default/testMultiConsumerImplBatchReceive" + System.currentTimeMillis(); + admin.topics().createPartitionedTopic(topic, 3); + @Cleanup + MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("my-sub") + .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build()) + .receiverQueueSize(20) + .autoScaledReceiverQueueSizeEnabled(true) + .subscribe(); + + //receiver queue size init as 5. + int currentSize = 5; + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize); + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + byte[] data = "data".getBytes(StandardCharsets.UTF_8); + + for (int i = 0; i < 10; i++) { // just run a few times. + for (int j = 0; j < 5; j++) { + producer.send(data); + } + log.info("i={},expandReceiverQueueHint:{},local permits:{}", + i, consumer.scaleReceiverQueueHint.get(), consumer.getAvailablePermits()); + Assert.assertEquals(consumer.batchReceive().size(), 5); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize); + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + } + + for (int i = 0; i < currentSize; i++) { + producer.send(data); + } + + Awaitility.await().until(consumer.scaleReceiverQueueHint::get); + Assert.assertEquals(consumer.batchReceive().size(), 5); + + //trigger expanding + consumer.batchReceiveAsync(); + Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2); + log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index caeb3d58b824af..029d4de7d9c0c2 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -798,4 +798,18 @@ public interface ConsumerBuilder extends Cloneable { * @default false */ ConsumerBuilder startPaused(boolean paused); + + /** + * If this is enabled, consumer receiver queue size is init as a very small value, 1 by default, + * and it will double itself until it reaches the value set by {@link #receiverQueueSize(int)}, if and only if + * 1) User calls receive() and there is no messages in receiver queue. + * 2) The last message we put in the receiver queue took the last space available in receiver queue. + * + * This is disabled by default and currentReceiverQueueSize is init as maxReceiverQueueSize. + * + * The feature should be able to reduce client memory usage. + * + * @param enabled whether to enable AutoScaledReceiverQueueSize. + */ + ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 6215562160ae49..f9a7911935ca98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.Lock; @@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory; public abstract class ConsumerBase extends HandlerState implements Consumer { + protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1; protected final String subscription; protected final ConsumerConfigurationData conf; @@ -102,6 +104,8 @@ public abstract class ConsumerBase extends HandlerState implements Consumer conf, int receiverQueueSize, ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema, @@ -125,7 +129,6 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat this.pendingBatchReceives = Queues.newConcurrentLinkedQueue(); this.schema = schema; this.interceptors = interceptors; - CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, receiverQueueSize); if (conf.getBatchReceivePolicy() != null) { BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy(); if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) { @@ -160,6 +163,22 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS); } + + initReceiverQueueSize(); + } + + + public abstract void initReceiverQueueSize(); + + protected void expectMoreIncomingMessages() { + if (!conf.isAutoScaledReceiverQueueSizeEnabled()) { + return; + } + if (scaleReceiverQueueHint.compareAndSet(true, false)) { + int oldSize = getCurrentReceiverQueueSize(); + int newSize = Math.min(maxReceiverQueueSize, oldSize * 2); + setCurrentReceiverQueueSize(newSize); + } } @Override @@ -777,10 +796,13 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message message) { // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance // anymore, since for pooled messages, this instance was possibly already been released and recycled. INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize); + updateAutoScaleReceiverQueueHint(); } return hasEnoughMessagesForBatchReceive(); } + protected abstract void updateAutoScaleReceiverQueueHint(); + protected boolean hasEnoughMessagesForBatchReceive() { if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; @@ -1031,7 +1053,7 @@ protected boolean hasPendingBatchReceive() { } protected void resetIncomingMessageSize() { - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0); } protected void decreaseIncomingMessageSize(final Message message) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index a486104bdaa0df..644c0025d16bf3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -530,4 +530,10 @@ public ConsumerBuilder startPaused(boolean paused) { conf.setStartPaused(paused); return this; } + + @Override + public ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled) { + conf.setAutoScaledReceiverQueueSizeEnabled(enabled); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e666dae63b7382..3d8f7e56330cd4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -412,10 +412,28 @@ public CompletableFuture unsubscribeAsync() { return unsubscribeFuture; } + @Override + public void initReceiverQueueSize() { + if (conf.isAutoScaledReceiverQueueSizeEnabled()) { + // turn on autoScaledReceiverQueueSize + int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize); + if (batchReceivePolicy.getMaxNumMessages() > 0) { + // consumerImpl may store (half-1) permits locally. + size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2); + } + CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size); + } else { + CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize); + } + } + @Override protected Message internalReceive() throws PulsarClientException { Message message; try { + if (incomingMessages.isEmpty()) { + expectMoreIncomingMessages(); + } message = incomingMessages.take(); messageProcessed(message); if (!isValidConsumerEpoch(message)) { @@ -439,6 +457,7 @@ protected CompletableFuture> internalReceiveAsync() { internalPinnedExecutor.execute(() -> { Message message = incomingMessages.poll(); if (message == null) { + expectMoreIncomingMessages(); pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { @@ -460,6 +479,9 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC Message message; long callTime = System.nanoTime(); try { + if (incomingMessages.isEmpty()) { + expectMoreIncomingMessages(); + } message = incomingMessages.poll(timeout, unit); if (message == null) { return null; @@ -524,6 +546,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { } result.complete(messages); } else { + expectMoreIncomingMessages(); OpBatchReceive opBatchReceive = OpBatchReceive.of(result); pendingBatchReceives.add(opBatchReceive); cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive)); @@ -1609,7 +1632,7 @@ protected void setCurrentReceiverQueueSize(int newSize) { int oldSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, newSize); int delta = newSize - oldSize; if (log.isDebugEnabled()) { - log.debug("[{}][{}] update maxReceiverQueueSize from {} to {}, increaseAvailablePermits by {}", + log.debug("[{}][{}] update currentReceiverQueueSize from {} to {}, increaseAvailablePermits by {}", topic, subscription, oldSize, newSize, delta); } increaseAvailablePermits(delta); @@ -1902,6 +1925,11 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { } } + @Override + protected void updateAutoScaleReceiverQueueHint() { + scaleReceiverQueueHint.set(getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize()); + } + @Override protected void completeOpBatchReceive(OpBatchReceive op) { notifyPendingBatchReceivedCallBack(op); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 21e0b9941cef79..c0c626a600a62d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -337,10 +337,26 @@ private boolean isValidConsumerEpoch(Message message) { .getMessage())); } + @Override + public void initReceiverQueueSize() { + if (conf.isAutoScaledReceiverQueueSizeEnabled()) { + int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize); + if (batchReceivePolicy.getMaxNumMessages() > 0) { + size = Math.max(size, batchReceivePolicy.getMaxNumMessages()); + } + CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size); + } else { + CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize); + } + } + @Override protected Message internalReceive() throws PulsarClientException { Message message; try { + if (incomingMessages.isEmpty()) { + expectMoreIncomingMessages(); + } message = incomingMessages.take(); decreaseIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); @@ -363,6 +379,9 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC long callTime = System.nanoTime(); try { + if (incomingMessages.isEmpty()) { + expectMoreIncomingMessages(); + } message = incomingMessages.poll(timeout, unit); if (message != null) { decreaseIncomingMessageSize(message); @@ -425,6 +444,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { } result.complete(messages); } else { + expectMoreIncomingMessages(); OpBatchReceive opBatchReceive = OpBatchReceive.of(result); pendingBatchReceives.add(opBatchReceive); cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive)); @@ -444,6 +464,7 @@ protected CompletableFuture> internalReceiveAsync() { internalPinnedExecutor.execute(() -> { Message message = incomingMessages.poll(); if (message == null) { + expectMoreIncomingMessages(); pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { @@ -714,6 +735,11 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { resumeReceivingFromPausedConsumersIfNeeded(); } + @Override + protected void updateAutoScaleReceiverQueueHint() { + scaleReceiverQueueHint.set(incomingMessages.size() >= getCurrentReceiverQueueSize()); + } + @Override protected void completeOpBatchReceive(OpBatchReceive op) { notifyPendingBatchReceivedCallBack(op); @@ -1487,7 +1513,7 @@ protected void setCurrentReceiverQueueSize(int newSize) { checkArgument(newSize > 0, "receiver queue size should larger than 0"); if (log.isDebugEnabled()) { log.debug("[{}][{}] setMaxReceiverQueueSize={}, previous={}", topic, subscription, - getCurrentReceiverQueueSize(), newSize); + newSize, getCurrentReceiverQueueSize()); } CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize); resumeReceivingFromPausedConsumersIfNeeded(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index 9f0f2e215a50b9..e63d803237e396 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -56,6 +56,16 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf createTopicIfDoesNotExist); } + @Override + public void initReceiverQueueSize() { + if (conf.isAutoScaledReceiverQueueSizeEnabled()) { + throw new NotImplementedException("AutoScaledReceiverQueueSize is not supported in ZeroQueueConsumerImpl"); + } else { + CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, 0); + } + + } + @Override protected Message internalReceive() throws PulsarClientException { zeroQueueLock.lock(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index cf1f60d8e9358f..6c22d143a6f06d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -162,6 +162,8 @@ public int getMaxPendingChuckedMessage() { private boolean startPaused = false; + private boolean autoScaledReceiverQueueSizeEnabled = false; + public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) { checkArgument(interval > 0, "interval needs to be > 0"); this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 9d1920ac4f651a..bdee5e2df08f4c 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -54,6 +54,9 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.ConsumerBase; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; import org.slf4j.Logger; @@ -130,6 +133,10 @@ static class Arguments { description = "Max total size of the receiver queue across partitions") public int maxTotalReceiverQueueSizeAcrossPartitions = 50000; + @Parameter(names = {"-aq", "--auto-scaled-receiver-queue-size"}, + description = "Enable autoScaledReceiverQueueSize") + public boolean autoScaledReceiverQueueSize = false; + @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated") public boolean replicatedSubscription = false; @@ -342,6 +349,8 @@ public static void main(String[] args) throws Exception { ObjectWriter w = m.writerWithDefaultPrettyPrinter(); log.info("Starting Pulsar performance consumer with config: {}", w.writeValueAsString(arguments)); + final Recorder qRecorder = arguments.autoScaledReceiverQueueSize + ? new Recorder(arguments.receiverQueueSize, 5) : null; final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null; long startTime = System.nanoTime(); long testEndTime = startTime + (long) (arguments.testTime * 1e9); @@ -394,6 +403,9 @@ public static void main(String[] args) throws Exception { thread.interrupt(); } } + if (qRecorder != null) { + qRecorder.recordValue(((ConsumerBase) consumer).getTotalIncomingMessages()); + } messagesReceived.increment(); bytesReceived.add(msg.size()); @@ -500,7 +512,8 @@ public static void main(String[] args) throws Exception { .autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull) .enableBatchIndexAcknowledgment(arguments.batchIndexAck) .poolMessages(arguments.poolMessages) - .replicateSubscriptionState(arguments.replicatedSubscription); + .replicateSubscriptionState(arguments.replicatedSubscription) + .autoScaledReceiverQueueSizeEnabled(arguments.autoScaledReceiverQueueSize); if (arguments.maxPendingChunkedMessage > 0) { consumerBuilder.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage); } @@ -543,6 +556,7 @@ public static void main(String[] args) throws Exception { long oldTime = System.nanoTime(); Histogram reportHistogram = null; + Histogram qHistogram = null; HistogramLogWriter histogramLogWriter = null; if (arguments.histogramFile != null) { @@ -596,6 +610,28 @@ public static void main(String[] args) throws Exception { reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9), reportHistogram.getValueAtPercentile(99.99), reportHistogram.getMaxValue()); + if (arguments.autoScaledReceiverQueueSize && log.isDebugEnabled() && qRecorder != null) { + qHistogram = qRecorder.getIntervalHistogram(qHistogram); + log.debug("ReceiverQueueUsage: cnt={},mean={}, min={},max={},25pct={},50pct={},75pct={}", + qHistogram.getTotalCount(), dec.format(qHistogram.getMean()), + qHistogram.getMinValue(), qHistogram.getMaxValue(), + qHistogram.getValueAtPercentile(25), + qHistogram.getValueAtPercentile(50), + qHistogram.getValueAtPercentile(75) + ); + qHistogram.reset(); + for (Future> future : futures) { + ConsumerBase consumerBase = (ConsumerBase) future.get(); + log.debug("[{}] CurrentReceiverQueueSize={}", consumerBase.getConsumerName(), + consumerBase.getCurrentReceiverQueueSize()); + if (consumerBase instanceof MultiTopicsConsumerImpl) { + for (ConsumerImpl consumer : ((MultiTopicsConsumerImpl) consumerBase).getConsumers()) { + log.debug("[{}] SubConsumer.CurrentReceiverQueueSize={}", consumer.getConsumerName(), + consumer.getCurrentReceiverQueueSize()); + } + } + } + } if (histogramLogWriter != null) { histogramLogWriter.outputIntervalHistogram(reportHistogram); }