Skip to content

Commit

Permalink
[fix][test] Detects whether an empty object is returned, prevent NPE …
Browse files Browse the repository at this point in the history
…exception (apache#21585)
  • Loading branch information
pandalee99 authored Nov 20, 2023
1 parent 9fbb92a commit 2cb54f3
Showing 1 changed file with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
Expand Down Expand Up @@ -273,4 +270,36 @@ public void testGetNumOfPartitions() throws Exception {
assertEquals(producerImpl.getNumOfPartitions(), 0);
}


@Test
public void testOnTopicsExtended() throws Exception {
String topicName = "test-on-topics-extended";
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
conf.setStatsIntervalSeconds(100);
ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
@Cleanup("shutdownGracefully")
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);

@Cleanup
PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);

ProducerConfigurationData producerConfData = new ProducerConfigurationData();
producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
producerConfData.setCustomMessageRouter(new CustomMessageRouter());
producerConfData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MILLISECONDS);

PartitionedProducerImpl impl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData, 1, null, null, null);

impl.setState(HandlerState.State.Ready);
Thread.sleep(1000);
CompletableFuture future = impl.getPartitionsAutoUpdateFuture();

// When null is returned in method thenCompose we will encounter an NPE exception.
// Because the returned value will be applied to the next stage.
// We use future instead of null as the return value.
assertNotNull(future);
}

}

0 comments on commit 2cb54f3

Please sign in to comment.