diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 426a00db25ec85..e761b04b79b3ef 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -66,7 +66,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { @Test public void testOffloadRead() throws Exception { - MockLedgerOffloader offloader = spy(new MockLedgerOffloader()); + MockLedgerOffloader offloader = spy(MockLedgerOffloader.class); ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); config.setMinimumRolloverTime(0, TimeUnit.SECONDS); @@ -122,7 +122,7 @@ public void testOffloadRead() throws Exception { @Test public void testBookkeeperFirstOffloadRead() throws Exception { - MockLedgerOffloader offloader = spy(new MockLedgerOffloader()); + MockLedgerOffloader offloader = spy(MockLedgerOffloader.class); MockClock clock = new MockClock(); offloader.getOffloadPolicies() .setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 34b5fc0c2f465e..d3a0375f8b3e44 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -22,7 +22,6 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index ff03e425ccbcd0..224060c9d912e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; import java.util.UUID; +import org.mockito.Mockito; /** * Holds util methods used in test. @@ -29,4 +30,18 @@ public static String newUniqueName(String prefix) { return prefix + "-" + UUID.randomUUID(); } + /** + * Creates a Mockito spy directly without an intermediate instance to spy. + * This is to address flaky test issue where a spy created with a given instance fails with + * {@link org.mockito.exceptions.misusing.WrongTypeOfReturnValue} exception. + * + * @param classToSpy the class to spy + * @param args the constructor arguments to use when creating the spy instance + * @return a spy of the provided class created with given constructor arguments + */ + public static <T> T spyWithClassAndConstructorArgs(Class<T> classToSpy, Object... args) { + return Mockito.mock(classToSpy, Mockito.withSettings() + .useConstructor(args) + .defaultAnswer(Mockito.CALLS_REAL_METHODS)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java index cba06863831576..ea0b5c2a155c47 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java @@ -81,7 +81,7 @@ protected void setup() throws Exception { new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); admin.namespaces().createNamespace("prop/ns-abc"); admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test")); - persistentTopics = spy(new PersistentTopics()); + persistentTopics = spy(PersistentTopics.class); persistentTopics.setServletContext(new MockServletContext()); persistentTopics.setPulsar(pulsar); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 97ab3fc0f21a16..011f4855a30a3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -119,17 +119,17 @@ public void setup() throws Exception { conf.setClusterName(configClusterName); super.internalSetup(); - clusters = spy(new Clusters()); + clusters = spy(Clusters.class); clusters.setPulsar(pulsar); doReturn("test").when(clusters).clientAppId(); doNothing().when(clusters).validateSuperUserAccess(); - properties = spy(new Properties()); + properties = spy(Properties.class); properties.setPulsar(pulsar); doReturn("test").when(properties).clientAppId(); doNothing().when(properties).validateSuperUserAccess(); - namespaces = spy(new Namespaces()); + namespaces = spy(Namespaces.class); namespaces.setServletContext(new MockServletContext()); namespaces.setPulsar(pulsar); doReturn("test").when(namespaces).clientAppId(); @@ -138,7 +138,7 @@ public void setup() throws Exception { doNothing().when(namespaces).validateAdminAccessForTenant("other-tenant"); doNothing().when(namespaces).validateAdminAccessForTenant("new-property"); - brokers = spy(new Brokers()); + brokers = spy(Brokers.class); brokers.setPulsar(pulsar); doReturn("test").when(brokers).clientAppId(); doNothing().when(brokers).validateSuperUserAccess(); @@ -146,7 +146,7 @@ public void setup() throws Exception { uriField = PulsarWebResource.class.getDeclaredField("uri"); uriField.setAccessible(true); - persistentTopics = spy(new PersistentTopics()); + persistentTopics = spy(PersistentTopics.class); persistentTopics.setServletContext(new MockServletContext()); persistentTopics.setPulsar(pulsar); doReturn("test").when(persistentTopics).clientAppId(); @@ -156,11 +156,11 @@ public void setup() throws Exception { doNothing().when(persistentTopics).validateAdminAccessForTenant("other-tenant"); doNothing().when(persistentTopics).validateAdminAccessForTenant("prop-xyz"); - resourceQuotas = spy(new ResourceQuotas()); + resourceQuotas = spy(ResourceQuotas.class); resourceQuotas.setServletContext(new MockServletContext()); resourceQuotas.setPulsar(pulsar); - brokerStats = spy(new BrokerStats()); + brokerStats = spy(BrokerStats.class); brokerStats.setServletContext(new MockServletContext()); brokerStats.setPulsar(pulsar); @@ -169,7 +169,7 @@ public void setup() throws Exception { doReturn("test").when(persistentTopics).clientAppId(); doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData(); - schemasResource = spy(new SchemasResource()); + schemasResource = spy(SchemasResource.class); schemasResource.setServletContext(new MockServletContext()); schemasResource.setPulsar(pulsar); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 613f3278d5b427..33942a36f4a8c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -158,7 +158,7 @@ public void setup() throws Exception { conf.setClusterName(testLocalCluster); super.internalSetup(); - namespaces = spy(new Namespaces()); + namespaces = spy(Namespaces.class); namespaces.setServletContext(new MockServletContext()); namespaces.setPulsar(pulsar); doReturn(false).when(namespaces).isRequestHttps(); @@ -1093,7 +1093,7 @@ public void testValidateTopicOwnership() throws Exception { ownership.setAccessible(true); ownership.set(pulsar.getNamespaceService(), MockOwnershipCache); TopicName topicName = TopicName.get(testNs.getPersistentTopicName("my-topic")); - PersistentTopics topics = spy(new PersistentTopics()); + PersistentTopics topics = spy(PersistentTopics.class); topics.setServletContext(new MockServletContext()); topics.setPulsar(pulsar); doReturn(false).when(topics).isRequestHttps(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 4e02338191c488..96076b50aa132a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; @@ -115,7 +116,7 @@ public void initPersistentTopics() throws Exception { @BeforeMethod protected void setup() throws Exception { super.internalSetup(); - persistentTopics = spy(new PersistentTopics()); + persistentTopics = spy(PersistentTopics.class); persistentTopics.setServletContext(new MockServletContext()); persistentTopics.setPulsar(pulsar); doReturn(false).when(persistentTopics).isRequestHttps(); @@ -125,7 +126,7 @@ protected void setup() throws Exception { doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant); doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData(); - nonPersistentTopic = spy(new NonPersistentTopics()); + nonPersistentTopic = spy(NonPersistentTopics.class); nonPersistentTopic.setServletContext(new MockServletContext()); nonPersistentTopic.setPulsar(pulsar); doReturn(false).when(nonPersistentTopic).isRequestHttps(); @@ -137,7 +138,7 @@ protected void setup() throws Exception { PulsarResources resources = spy(new PulsarResources(pulsar.getLocalMetadataStore(), pulsar.getConfigurationMetadataStore())); - doReturn(spy(new TopicResources(pulsar.getLocalMetadataStore()))).when(resources).getTopicResources(); + doReturn(spyWithClassAndConstructorArgs(TopicResources.class, pulsar.getLocalMetadataStore())).when(resources).getTopicResources(); Whitebox.setInternalState(pulsar, "pulsarResources", resources); admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java index 766ba9839e55d5..347b271a5b94b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java @@ -51,7 +51,7 @@ public class ResourceGroupsTest extends MockedPulsarServiceBaseTest { @Override protected void setup() throws Exception { super.internalSetup(); - resourcegroups = spy(new ResourceGroups()); + resourcegroups = spy(ResourceGroups.class); resourcegroups.setServletContext(new MockServletContext()); resourcegroups.setPulsar(pulsar); doReturn(false).when(resourcegroups).isRequestHttps(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 8e334ace6b26aa..0b3794e15742fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.auth; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -321,7 +322,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore(); doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore(); - Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); + Supplier<NamespaceService> namespaceServiceSupplier = () -> spyWithClassAndConstructorArgs(NamespaceService.class, pulsar); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor(); @@ -385,7 +386,7 @@ public static MockZooKeeper createMockZooKeeperGlobal() { } public static NonClosableMockBookKeeper createMockBookKeeper(OrderedExecutor executor) throws Exception { - return spy(new NonClosableMockBookKeeper(executor)); + return spyWithClassAndConstructorArgs(NonClosableMockBookKeeper.class, executor); } // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index 41e8ed9cfcaecb..359aa34218b96b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import com.google.common.collect.Sets; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -74,7 +75,7 @@ public void anErrorShouldBeThrowBeforeLeaderElected() throws PulsarServerExcepti config.setAdvertisedAddress("localhost"); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); @Cleanup - PulsarService pulsar = Mockito.spy(new MockPulsarService(config)); + PulsarService pulsar = spyWithClassAndConstructorArgs(MockPulsarService.class, config); pulsar.start(); // mock pulsar.getLeaderElectionService() in a thread safe way diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 1daf2d73af1f5e..021326ebca7b19 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -105,7 +106,7 @@ void setup() throws Exception { bkEnsemble.start(); // Start broker 1 - ServiceConfiguration config1 = spy(new ServiceConfiguration()); + ServiceConfiguration config1 = spy(ServiceConfiguration.class); config1.setClusterName("use"); config1.setWebServicePort(Optional.of(0)); config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); @@ -333,7 +334,7 @@ public void testLoadReportParsing() throws Exception { @Test(enabled = true) public void testDoLoadShedding() throws Exception { - SimpleLoadManagerImpl loadManager = spy(new SimpleLoadManagerImpl(pulsar1)); + SimpleLoadManagerImpl loadManager = spyWithClassAndConstructorArgs(SimpleLoadManagerImpl.class, pulsar1); PulsarResourceDescription rd = new PulsarResourceDescription(); rd.put("memory", new ResourceUsage(1024, 4096)); rd.put("cpu", new ResourceUsage(10, 100)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index 17c52a3e6492a9..9aced7211b0b01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -75,7 +75,7 @@ public void setUp() throws Exception { pulsar = mock(PulsarService.class); ns = mock(NamespaceService.class); auth = mock(AuthorizationService.class); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName("use"); clusters = new TreeSet<>(); clusters.add("use"); @@ -104,7 +104,7 @@ public void setUp() throws Exception { @Test public void crossColoLookup() throws Exception { - TopicLookup destLookup = spy(new TopicLookup()); + TopicLookup destLookup = spy(TopicLookup.class); doReturn(false).when(destLookup).isRequestHttps(); destLookup.setPulsar(pulsar); doReturn("null").when(destLookup).clientAppId(); @@ -130,7 +130,7 @@ public void crossColoLookup() throws Exception { @Test public void testLookupTopicNotExist() throws Exception { - MockTopicLookup destLookup = spy(new MockTopicLookup()); + MockTopicLookup destLookup = spy(MockTopicLookup.class); doReturn(false).when(destLookup).isRequestHttps(); destLookup.setPulsar(pulsar); doReturn("null").when(destLookup).clientAppId(); @@ -171,7 +171,7 @@ public void testNotEnoughLookupPermits() throws Exception { BrokerService brokerService = pulsar.getBrokerService(); doReturn(new Semaphore(0)).when(brokerService).getLookupRequestSemaphore(); - TopicLookup destLookup = spy(new TopicLookup()); + TopicLookup destLookup = spy(TopicLookup.class); doReturn(false).when(destLookup).isRequestHttps(); destLookup.setPulsar(pulsar); doReturn("null").when(destLookup).clientAppId(); @@ -209,7 +209,7 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception { // doReturn(Optional.of(policies2)).when(policiesCache) // .get(AdminResource.path(POLICIES, property, cluster, ns2)); - TopicLookup destLookup = spy(new TopicLookup()); + TopicLookup destLookup = spy(TopicLookup.class); doReturn(false).when(destLookup).isRequestHttps(); destLookup.setPulsar(pulsar); doReturn("null").when(destLookup).clientAppId(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java index 7ea6c92971c61c..b9d40d0609d2f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java @@ -42,7 +42,7 @@ public class TopicLookupTest extends PulsarWebResourceTest { @Override protected ResourceConfig configure() { - resource = spy(new TestableTopicLookup()); + resource = spy(TestableTopicLookup.class); return new ResourceConfig().register(resource); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java index 9c9f7fd92275b9..c25c6c23fb0112 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.namespace; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; @@ -115,7 +116,7 @@ protected void startBroker() throws Exception { conf.setWebServicePortTls(Optional.of(0)); serviceConfigurationList.add(conf); - PulsarService pulsar = spy(new PulsarService(conf)); + PulsarService pulsar = spyWithClassAndConstructorArgs(PulsarService.class, conf); setupBrokerMocks(pulsar); pulsar.start(); @@ -129,7 +130,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { MockZooKeeperSession mockZooKeeperSession = MockZooKeeperSession.newInstance(mockZooKeeper); doReturn(new ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(); - Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); + Supplier<NamespaceService> namespaceServiceSupplier = () -> spyWithClassAndConstructorArgs(NamespaceService.class, pulsar); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); SameThreadOrderedSafeExecutor executor = new SameThreadOrderedSafeExecutor(); @@ -153,7 +154,7 @@ public static MockZooKeeper createMockZooKeeper() throws Exception { } public static NonClosableMockBookKeeper createMockBookKeeper(OrderedExecutor executor) throws Exception { - return spy(new NonClosableMockBookKeeper(executor)); + return spyWithClassAndConstructorArgs(NonClosableMockBookKeeper.class, executor); } // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 2ced6d4fec61a0..a9b154d24fc8ad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -18,10 +18,10 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.matches; import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -29,7 +29,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.assertEquals; @@ -114,9 +113,9 @@ public class PersistentDispatcherFailoverConsumerTest { @BeforeMethod public void setup() throws Exception { executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-failover-test").build(); - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); - pulsar = spy(new PulsarService(svcConfig)); + pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(svcConfig).when(pulsar).getConfiguration(); mlFactoryMock = mock(ManagedLedgerFactory.class); @@ -133,7 +132,7 @@ public void setup() throws Exception { PulsarResources pulsarResources = new PulsarResources(store, store); doReturn(pulsarResources).when(pulsar).getPulsarResources(); - brokerService = spy(new BrokerService(pulsar, eventLoopGroup)); + brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); doReturn(brokerService).when(pulsar).getBrokerService(); consumerChanges = new LinkedBlockingQueue<>(); @@ -159,9 +158,7 @@ public void setup() throws Exception { return null; }).when(channelCtx).writeAndFlush(any(), any()); - serverCnx = mock(ServerCnx.class, withSettings() - .useConstructor(pulsar) - .defaultAnswer(CALLS_REAL_METHODS)); + serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); @@ -170,9 +167,7 @@ public void setup() throws Exception { doReturn(new PulsarCommandSenderImpl(null, serverCnx)) .when(serverCnx).getCommandSender(); - serverCnxWithOldVersion = mock(ServerCnx.class, withSettings() - .useConstructor(pulsar) - .defaultAnswer(CALLS_REAL_METHODS)); + serverCnxWithOldVersion = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(serverCnxWithOldVersion).isActive(); doReturn(true).when(serverCnxWithOldVersion).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 02acdfa2b246a2..1ced87a5a2a6fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -18,14 +18,15 @@ */ package org.apache.pulsar.broker.service; -import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.withSettings; import static org.testng.Assert.assertFalse; - +import com.google.common.collect.Lists; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; import java.lang.reflect.Method; import java.util.Collections; import java.util.List; @@ -34,9 +35,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -52,15 +50,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeMethod; - -import com.google.common.collect.Lists; import org.testng.annotations.Test; @Test(groups = "broker") @@ -81,10 +77,10 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase { @BeforeMethod public void setup(Method m) throws Exception { super.setUp(m); - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); @Cleanup - PulsarService pulsar = spy(new PulsarService(svcConfig)); + PulsarService pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(svcConfig).when(pulsar).getConfiguration(); @Cleanup(value = "shutdownGracefully") @@ -98,12 +94,10 @@ public void setup(Method m) throws Exception { mlFactoryMock = factory; doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); - brokerService = spy(new BrokerService(pulsar, eventLoopGroup)); + brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); doReturn(brokerService).when(pulsar).getBrokerService(); - serverCnx = mock(ServerCnx.class, withSettings() - .useConstructor(pulsar) - .defaultAnswer(CALLS_REAL_METHODS)); + serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(serverCnx).isActive(); NamespaceService nsSvc = mock(NamespaceService.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 7c9400d2c69340..cd3f3ccad4788e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -18,11 +18,11 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -35,7 +35,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; @@ -173,10 +172,10 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { public void setup() throws Exception { eventLoopGroup = new NioEventLoopGroup(); executor = OrderedExecutor.newBuilder().numThreads(1).build(); - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setAdvertisedAddress("localhost"); svcConfig.setBrokerShutdownTimeoutMs(0L); - pulsar = spy(new PulsarService(svcConfig)); + pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(svcConfig).when(pulsar).getConfiguration(); doReturn(mock(Compactor.class)).when(pulsar).getCompactor(); @@ -197,20 +196,18 @@ public void setup() throws Exception { doReturn(executor).when(pulsar).getOrderedExecutor(); store = new ZKMetadataStore(mockZk); - PulsarResources pulsarResources = spy(new PulsarResources(store, store)); - NamespaceResources nsr = spy(new NamespaceResources(store, store, 30)); + PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); + NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); doReturn(nsr).when(pulsarResources).getNamespaceResources(); doReturn(pulsarResources).when(pulsar).getPulsarResources(); doReturn(store).when(pulsar).getLocalMetadataStore(); doReturn(store).when(pulsar).getConfigurationMetadataStore(); - brokerService = spy(new BrokerService(pulsar, eventLoopGroup)); + brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); doReturn(brokerService).when(pulsar).getBrokerService(); - serverCnx = mock(ServerCnx.class, withSettings() - .useConstructor(pulsar) - .defaultAnswer(CALLS_REAL_METHODS)); + serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); @@ -352,7 +349,7 @@ public void setMetadataFromEntryData(ByteBuf entryData) { @Test public void testDispatcherMultiConsumerReadFailed() throws Exception { - PersistentTopic topic = spy(new PersistentTopic(successTopicName, ledgerMock, brokerService)); + PersistentTopic topic = spyWithClassAndConstructorArgs(PersistentTopic.class, successTopicName, ledgerMock, brokerService); ManagedCursor cursor = mock(ManagedCursor.class); when(cursor.getName()).thenReturn("cursor"); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, null); @@ -362,7 +359,7 @@ public void testDispatcherMultiConsumerReadFailed() throws Exception { @Test public void testDispatcherSingleConsumerReadFailed() throws Exception { - PersistentTopic topic = spy(new PersistentTopic(successTopicName, ledgerMock, brokerService)); + PersistentTopic topic = spyWithClassAndConstructorArgs(PersistentTopic.class, successTopicName, ledgerMock, brokerService); ManagedCursor cursor = mock(ManagedCursor.class); when(cursor.getName()).thenReturn("cursor"); PersistentDispatcherSingleActiveConsumer dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, @@ -560,7 +557,7 @@ private void testMaxProducers() throws Exception { @Test public void testMaxProducersForBroker() throws Exception { // set max clients - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(2).when(svcConfig).getMaxProducersPerTopic(); doReturn(svcConfig).when(pulsar).getConfiguration(); testMaxProducers(); @@ -568,7 +565,7 @@ public void testMaxProducersForBroker() throws Exception { @Test public void testMaxProducersForNamespace() throws Exception { - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(svcConfig).when(pulsar).getConfiguration(); // set max clients Policies policies = new Policies(); @@ -591,9 +588,7 @@ private Producer getMockedProducerWithSpecificAddress(Topic topic, long producer final String producerNameBase = "producer"; final String role = "appid1"; - ServerCnx cnx = mock(ServerCnx.class, withSettings() - .useConstructor(pulsar) - .defaultAnswer(CALLS_REAL_METHODS)); + ServerCnx cnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(cnx).isActive(); doReturn(true).when(cnx).isWritable(); doReturn(new InetSocketAddress(address, 1234)).when(cnx).clientAddress(); @@ -607,7 +602,7 @@ private Producer getMockedProducerWithSpecificAddress(Topic topic, long producer @Test public void testMaxSameAddressProducers() throws Exception { // set max clients - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(2).when(svcConfig).getMaxSameAddressProducersPerTopic(); doReturn(svcConfig).when(pulsar).getConfiguration(); @@ -909,7 +904,7 @@ private void testMaxConsumersShared() throws Exception { @Test public void testMaxConsumersSharedForBroker() throws Exception { // set max clients - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(2).when(svcConfig).getMaxConsumersPerSubscription(); doReturn(3).when(svcConfig).getMaxConsumersPerTopic(); doReturn(svcConfig).when(pulsar).getConfiguration(); @@ -919,7 +914,7 @@ public void testMaxConsumersSharedForBroker() throws Exception { @Test public void testMaxConsumersSharedForNamespace() throws Exception { - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(svcConfig).when(pulsar).getConfiguration(); // set max clients @@ -1013,7 +1008,7 @@ private void testMaxConsumersFailover() throws Exception { @Test public void testMaxConsumersFailoverForBroker() throws Exception { // set max clients - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(2).when(svcConfig).getMaxConsumersPerSubscription(); doReturn(3).when(svcConfig).getMaxConsumersPerTopic(); doReturn(svcConfig).when(pulsar).getConfiguration(); @@ -1023,7 +1018,7 @@ public void testMaxConsumersFailoverForBroker() throws Exception { @Test public void testMaxConsumersFailoverForNamespace() throws Exception { - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(svcConfig).when(pulsar).getConfiguration(); // set max clients @@ -1048,9 +1043,7 @@ private Consumer getMockedConsumerWithSpecificAddress(Topic topic, Subscription final String consumerNameBase = "consumer"; final String role = "appid1"; - ServerCnx cnx = mock(ServerCnx.class, withSettings() - .useConstructor(pulsar) - .defaultAnswer(CALLS_REAL_METHODS)); + ServerCnx cnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(cnx).isActive(); doReturn(true).when(cnx).isWritable(); doReturn(new InetSocketAddress(address, 1234)).when(cnx).clientAddress(); @@ -1064,7 +1057,7 @@ private Consumer getMockedConsumerWithSpecificAddress(Topic topic, Subscription @Test public void testMaxSameAddressConsumers() throws Exception { // set max clients - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(2).when(svcConfig).getMaxSameAddressConsumersPerTopic(); doReturn(svcConfig).when(pulsar).getConfiguration(); @@ -2071,13 +2064,13 @@ public void testCheckInactiveSubscriptions() throws Exception { ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1); // This subscription is connected by consumer. - PersistentSubscription nonDeletableSubscription1 = spy(new PersistentSubscription(topic, "nonDeletableSubscription1", cursorMock, false)); + PersistentSubscription nonDeletableSubscription1 = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "nonDeletableSubscription1", cursorMock, false); subscriptions.put(nonDeletableSubscription1.getName(), nonDeletableSubscription1); // This subscription is not connected by consumer. - PersistentSubscription deletableSubscription1 = spy(new PersistentSubscription(topic, "deletableSubscription1", cursorMock, false)); + PersistentSubscription deletableSubscription1 = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "deletableSubscription1", cursorMock, false); subscriptions.put(deletableSubscription1.getName(), deletableSubscription1); // This subscription is replicated. - PersistentSubscription nonDeletableSubscription2 = spy(new PersistentSubscription(topic, "nonDeletableSubscription2", cursorMock, true)); + PersistentSubscription nonDeletableSubscription2 = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "nonDeletableSubscription2", cursorMock, true); subscriptions.put(nonDeletableSubscription2.getName(), nonDeletableSubscription2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -2096,7 +2089,7 @@ public void testCheckInactiveSubscriptions() throws Exception { NamespaceName ns = TopicName.get(successTopicName).getNamespaceObject(); doReturn(Optional.of(new Policies())).when(nsr).getPolicies(ns); - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes(); doReturn(svcConfig).when(pulsar).getConfiguration(); @@ -2111,7 +2104,7 @@ public void testCheckInactiveSubscriptions() throws Exception { @Test public void testTopicFencingTimeout() throws Exception { - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); doReturn(svcConfig).when(pulsar).getConfiguration(); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); @@ -2265,8 +2258,8 @@ public void testGetReplicationClusters() throws Exception { topic.initialize(); assertNull(topic.getHierarchyTopicPolicies().getReplicationClusters().get()); - PulsarResources pulsarResources = spy(new PulsarResources(store, store)); - NamespaceResources nsr = spy(new NamespaceResources(store, store, 30)); + PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); + NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); doReturn(nsr).when(pulsarResources).getNamespaceResources(); doReturn(pulsarResources).when(pulsar).getPulsarResources(); CompletableFuture<Optional<Policies>> policiesFuture = new CompletableFuture<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4fe14d89f7c671..a27bc8bf2bad9c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.ArgumentMatchers.any; @@ -151,9 +152,9 @@ public class ServerCnxTest { public void setup() throws Exception { eventLoopGroup = new NioEventLoopGroup(); executor = OrderedExecutor.newBuilder().numThreads(1).build(); - svcConfig = spy(new ServiceConfiguration()); + svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); - pulsar = spy(new PulsarService(svcConfig)); + pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService(); svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS)); @@ -175,14 +176,14 @@ public void setup() throws Exception { doReturn(store).when(pulsar).getLocalMetadataStore(); doReturn(store).when(pulsar).getConfigurationMetadataStore(); - brokerService = spy(new BrokerService(pulsar, eventLoopGroup)); + brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); BrokerInterceptor interceptor = mock(BrokerInterceptor.class); doReturn(interceptor).when(brokerService).getInterceptor(); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(executor).when(pulsar).getOrderedExecutor(); - PulsarResources pulsarResources = spy(new PulsarResources(store, store)); - namespaceResources = spy(new NamespaceResources(store, store, 30)); + PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); + namespaceResources = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); doReturn(namespaceResources).when(pulsarResources).getNamespaceResources(); doReturn(pulsarResources).when(pulsar).getPulsarResources(); @@ -501,14 +502,14 @@ public void testProducerCommandWithAuthorizationPositive() throws Exception { @Test(timeOut = 30000) public void testNonExistentTopic() throws Exception { - AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources())); + AuthorizationService authorizationService = spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthorizationEnabled(); svcConfig.setAuthorizationEnabled(true); Field providerField = AuthorizationService.class.getDeclaredField("provider"); providerField.setAccessible(true); - PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, - pulsar.getPulsarResources())); + PulsarAuthorizationProvider authorizationProvider = spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, + pulsar.getPulsarResources()); providerField.set(authorizationService, authorizationProvider); doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); @@ -534,11 +535,11 @@ public void testNonExistentTopic() throws Exception { @Test(timeOut = 30000) public void testClusterAccess() throws Exception { svcConfig.setAuthorizationEnabled(true); - AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources())); + AuthorizationService authorizationService = spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); Field providerField = AuthorizationService.class.getDeclaredField("provider"); providerField.setAccessible(true); - PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, - pulsar.getPulsarResources())); + PulsarAuthorizationProvider authorizationProvider = spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, + pulsar.getPulsarResources()); providerField.set(authorizationService, authorizationProvider); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthorizationEnabled(); @@ -565,12 +566,12 @@ public void testClusterAccess() throws Exception { @Test(timeOut = 30000) public void testNonExistentTopicSuperUserAccess() throws Exception { - AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources())); + AuthorizationService authorizationService = spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthorizationEnabled(); Field providerField = AuthorizationService.class.getDeclaredField("provider"); providerField.setAccessible(true); - PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources())); + PulsarAuthorizationProvider authorizationProvider = spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, pulsar.getPulsarResources()); providerField.set(authorizationService, authorizationProvider); doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index f25b346a3360ae..9c81a754c265a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -85,8 +86,8 @@ public void testMarkerDeleteTimes() throws Exception { doReturn(false).when(configuration).isTransactionCoordinatorEnabled(); doReturn(managedLedger).when(topic).getManagedLedger(); ManagedCursor cursor = managedLedger.openCursor("test"); - PersistentSubscription persistentSubscription = spy(new PersistentSubscription(topic, "test", - cursor, false)); + PersistentSubscription persistentSubscription = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", + cursor, false); Position position = managedLedger.addEntry("test".getBytes()); persistentSubscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, Collections.emptyMap()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index c23c20ff0033a6..f62a65ad36a9ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -70,7 +71,7 @@ public void testIsDuplicate() { doReturn(serviceConfiguration).when(pulsarService).getConfiguration(); PersistentTopic persistentTopic = mock(PersistentTopic.class); ManagedLedger managedLedger = mock(ManagedLedger.class); - MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, persistentTopic, managedLedger)); + MessageDeduplication messageDeduplication = spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, persistentTopic, managedLedger); doReturn(true).when(messageDeduplication).isEnabled(); String producerName1 = "producer1"; @@ -163,7 +164,7 @@ public void testInactiveProducerRemove() throws Exception { serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1); doReturn(serviceConfiguration).when(pulsarService).getConfiguration(); - MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, topic, managedLedger)); + MessageDeduplication messageDeduplication = spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, topic, managedLedger); doReturn(true).when(messageDeduplication).isEnabled(); Topic.PublishContext publishContext = mock(Topic.PublishContext.class); @@ -239,7 +240,7 @@ public void testIsDuplicateWithFailure() { doReturn(pulsarService).when(brokerService).pulsar(); doReturn(new BacklogQuotaManager(pulsarService)).when(brokerService).getBacklogQuotaManager(); - PersistentTopic persistentTopic = spy(new PersistentTopic("topic-1", brokerService, managedLedger, messageDeduplication)); + PersistentTopic persistentTopic = spyWithClassAndConstructorArgs(PersistentTopic.class, "topic-1", brokerService, managedLedger, messageDeduplication); String producerName1 = "producer1"; ByteBuf byteBuf1 = getMessage(producerName1, 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index a76c6374e5c680..b1cb0ea173094a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.ArgumentMatchers.any; @@ -110,10 +111,10 @@ public void setup() throws Exception { executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-subscription-test").build(); eventLoopGroup = new NioEventLoopGroup(); - ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); svcConfig.setTransactionCoordinatorEnabled(true); - pulsarMock = spy(new PulsarService(svcConfig)); + pulsarMock = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); PulsarResources pulsarResources = mock(PulsarResources.class); doReturn(pulsarResources).when(pulsarMock).getPulsarResources(); NamespaceResources namespaceResources = mock(NamespaceResources.class); @@ -183,7 +184,7 @@ public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription doReturn(store).when(pulsarMock).getLocalMetadataStore(); doReturn(store).when(pulsarMock).getConfigurationMetadataStore(); - brokerMock = spy(new BrokerService(pulsarMock, eventLoopGroup)); + brokerMock = spyWithClassAndConstructorArgs(BrokerService.class, pulsarMock, eventLoopGroup); doNothing().when(brokerMock).unloadNamespaceBundlesGracefully(); doReturn(brokerMock).when(pulsarMock).getBrokerService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 03389352629e65..daf773c801984f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.broker.service.plugin; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.AssertJUnit.assertEquals; @@ -76,9 +76,9 @@ public void testFilter() throws Exception { field.setAccessible(true); NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); - EntryFilterWithClassLoader loader1 = spy(new EntryFilterWithClassLoader(filter1, narClassLoader)); + EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); EntryFilter filter2 = new EntryFilter2Test(); - EntryFilterWithClassLoader loader2 = spy(new EntryFilterWithClassLoader(filter2, narClassLoader)); + EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader); field.set(dispatcher, ImmutableList.of(loader1, loader2)); Producer<String> producer = pulsarClient.newProducer(Schema.STRING) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 04ea1dda4c10b8..ab248a2ba017e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.transaction; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; @@ -170,7 +171,7 @@ protected void startBroker() throws Exception { conf.setTopicLevelPoliciesEnabled(true); serviceConfigurationList.add(conf); - PulsarService pulsar = spy(new PulsarService(conf)); + PulsarService pulsar = spyWithClassAndConstructorArgs(PulsarService.class, conf); setupBrokerMocks(pulsar); pulsar.start(); @@ -185,7 +186,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { MockZooKeeperSession mockZooKeeperSession = MockZooKeeperSession.newInstance(mockZooKeeper); doReturn(new ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(); - Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); + Supplier<NamespaceService> namespaceServiceSupplier = () -> spyWithClassAndConstructorArgs(NamespaceService.class, pulsar); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); SameThreadOrderedSafeExecutor executor = new SameThreadOrderedSafeExecutor(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index 04579078ad4af4..aba5b044583dfb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.transaction.coordinator; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import java.util.Optional; import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup; import org.apache.pulsar.broker.PulsarService; @@ -76,7 +77,7 @@ protected final void setup() throws Exception { config.setTransactionCoordinatorEnabled(true); configurations[i] = config; - pulsarServices[i] = Mockito.spy(new PulsarService(config)); + pulsarServices[i] = spyWithClassAndConstructorArgs(PulsarService.class, config); pulsarServices[i].start(); pulsarAdmins[i] = PulsarAdmin.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 13823472a843db..ae611d7e4eb98b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.broker.web; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -378,7 +378,7 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU config.setHttpRequestsLimitEnabled(true); config.setHttpRequestsMaxPerSecond(rateLimit); } - pulsar = spy(new PulsarService(config)); + pulsar = spyWithClassAndConstructorArgs(PulsarService.class, config); // mock zk MockZooKeeper mockZooKeeper = MockedPulsarServiceBaseTest.createMockZooKeeper(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index a2b4b5c3dbf530..2ddb9e8c8a35a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -81,7 +81,7 @@ void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName("use"); config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index 1984e1a8c737da..1a56d689799bed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; @@ -631,7 +631,7 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw // if broker unload bundle gracefully then cursor metadata recovered from zk else from ledger if (unloadBundleGracefully) { // set clean namespace which will not let broker unload bundle gracefully: stop broker - Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); + Supplier<NamespaceService> namespaceServiceSupplier = () -> spyWithClassAndConstructorArgs(NamespaceService.class, pulsar); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); } stopBroker(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java index 1e97550322b67d..0193f592c756f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import com.google.common.collect.Lists; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; @@ -58,7 +59,7 @@ protected void cleanup() throws Exception { public void testSingleIpAddress() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test")); - ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); + ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); conf.setServiceUrl(serviceUrl); PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool); @@ -78,7 +79,7 @@ public void testDoubleIpAddress() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test")); - ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); + ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); conf.setServiceUrl(serviceUrl); PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool); @@ -101,7 +102,7 @@ public void testNoConnectionPool() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setConnectionsPerBroker(0); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test")); - ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); + ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get()); @@ -123,7 +124,7 @@ public void testEnableConnectionPool() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setConnectionsPerBroker(5); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test")); - ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); + ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index b99276bca56ea2..173794a2cfb58c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -123,7 +123,7 @@ void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName("use"); Set<String> superUsers = Sets.newHashSet(ADMIN_SUBJECT); config.setSuperUserRoles(superUsers); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index a4d6cb85c000bc..69886a696c7ac5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -190,7 +190,7 @@ void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName(CLUSTER); Set<String> superUsers = Sets.newHashSet("superUser", "admin"); config.setSuperUserRoles(superUsers); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 9f5e525d6b7150..d985241e2903d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -119,7 +119,7 @@ void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName("use"); Set<String> superUsers = Sets.newHashSet("superUser", "admin"); config.setSuperUserRoles(superUsers); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 00f4f3ee021d86..a9fd06dc05809e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -87,7 +87,7 @@ void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName("use"); final Set<String> superUsers = Sets.newHashSet("superUser", "admin"); config.setSuperUserRoles(superUsers); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index 52108a477ceb89..9e1edea2f80292 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -110,7 +110,7 @@ public void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName("use"); Set<String> superUsers = Sets.newHashSet("superUser", "admin"); config.setSuperUserRoles(superUsers); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index 9d1495d8b7d4f2..b3126defa11d28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -93,7 +93,7 @@ void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setClusterName("use"); Set<String> superUsers = Sets.newHashSet("superUser", "admin"); config.setSuperUserRoles(superUsers); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 5867bb78404d6d..15ee27dc3a56c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -105,7 +105,7 @@ void setup(Method method) throws Exception { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble.start(); - config = spy(new ServiceConfiguration()); + config = spy(ServiceConfiguration.class); config.setBrokerShutdownTimeoutMs(0L); config.setClusterName("use"); Set<String> superUsers = Sets.newHashSet("superUser", "admin"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index 5741a5eb0e6480..764d4a5b7915a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -19,26 +19,22 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.ArgumentMatchers.anyString; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; - +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; import com.google.common.collect.Sets; - import java.net.URI; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - import lombok.Cleanup; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -86,7 +82,7 @@ public void setup() throws Exception { config.setAnonymousUserRole("anonymousUser"); } - service = spy(new WebSocketService(config)); + service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 69de71e1a8c102..15c3e50e185b85 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -18,19 +18,16 @@ */ package org.apache.pulsar.websocket.proxy; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; - import com.google.common.collect.Sets; - import java.util.EnumSet; import java.util.Optional; import java.util.Set; - import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.common.naming.TopicName; @@ -67,7 +64,7 @@ protected void setup() throws Exception { config.setClusterName("c1"); config.setWebServicePort(Optional.of(0)); config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); - service = spy(new WebSocketService(config)); + service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); service.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java index 3848d6ecc1dc6c..aeeab2afddcce9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java @@ -18,14 +18,12 @@ */ package org.apache.pulsar.websocket.proxy; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; - import java.util.Optional; - import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -65,7 +63,7 @@ public Object[][] setProxyConfig() { public void configTest(int numIoThreads, int connectionsPerBroker) throws Exception { config.setWebSocketNumIoThreads(numIoThreads); config.setWebSocketConnectionsPerBroker(connectionsPerBroker); - WebSocketService service = spy(new WebSocketService(config)); + WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); service.start(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index b12f670b8b22e4..177f6f8795a0fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -19,20 +19,20 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Sets; import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.google.gson.reflect.TypeToken; - import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.List; @@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import javax.servlet.http.HttpServletResponse; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -51,7 +50,6 @@ import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Producer; @@ -81,8 +79,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; @Test(groups = "websocket") public class ProxyPublishConsumeTest extends ProducerConsumerBase { @@ -104,7 +100,7 @@ public void setup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setClusterName("test"); config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); - service = spy(new WebSocketService(config)); + service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index 4c82615aa7fc96..b8702522fce131 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -19,18 +19,16 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - import java.net.URI; import java.security.GeneralSecurityException; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import org.apache.pulsar.client.api.TlsProducerConsumerBase; import org.apache.pulsar.client.impl.auth.AuthenticationTls; @@ -76,7 +74,7 @@ public void setup() throws Exception { config.setBrokerClientAuthenticationParameters("tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); - service = spy(new WebSocketService(config)); + service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 485f23bdeb165d..1fb12645e5e346 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -19,17 +19,15 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - import java.net.URI; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -63,7 +61,7 @@ public void setup() throws Exception { config.setClusterName("test"); config.setServiceUrl(pulsar.getSafeWebServiceAddress()); config.setServiceUrlTls(pulsar.getWebServiceAddressTls()); - service = spy(new WebSocketService(config)); + service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index d315a10c46a280..842acede2492ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -19,26 +19,22 @@ package org.apache.pulsar.websocket.proxy.v1; import static java.util.concurrent.Executors.newFixedThreadPool; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - import com.google.common.collect.Sets; - import java.net.URI; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - import lombok.Cleanup; import org.apache.pulsar.client.api.v1.V1_ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -88,7 +84,7 @@ public void setup() throws Exception { config.setAnonymousUserRole("anonymousUser"); } - service = spy(new WebSocketService(config)); + service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java index 48f5816e2d2d5d..723080c134fcb1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java @@ -43,7 +43,7 @@ public int choosePartition(Message<?> msg) { @SuppressWarnings("deprecation") @Test public void testChoosePartition() { - MessageRouter router = spy(new TestMessageRouter()); + MessageRouter router = spy(TestMessageRouter.class); Message<?> mockedMsg = mock(Message.class); TopicMetadata mockedMetadata = mock(TopicMetadata.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 79871db87840d6..be832236aacc29 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -946,7 +946,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc WorkerConfig workerConfig = new WorkerConfig(); workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory); - KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(new KubernetesRuntimeFactory()); + KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(KubernetesRuntimeFactory.class); doNothing().when(mockedKubernetesRuntimeFactory).initialize( any(WorkerConfig.class), any(AuthenticationConfig.class), @@ -1112,7 +1112,7 @@ public void testKubernetesFunctionInstancesRestart() throws Exception { WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory = new WorkerConfig.KubernetesContainerFactory(); workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory); - KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(new KubernetesRuntimeFactory()); + KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(KubernetesRuntimeFactory.class); doNothing().when(mockedKubernetesRuntimeFactory).initialize( any(WorkerConfig.class), any(AuthenticationConfig.class), diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index 39d8ba8213cc89..b673fc368e6ed5 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -357,7 +357,7 @@ protected static List<PulsarColumnHandle> getColumnColumnHandles(TopicName topic public static PulsarMetadata mockColumnMetadata() { ConnectorContext prestoConnectorContext = new TestingConnectorContext(); - PulsarConnectorConfig pulsarConnectorConfig = spy(new PulsarConnectorConfig()); + PulsarConnectorConfig pulsarConnectorConfig = spy(PulsarConnectorConfig.class); pulsarConnectorConfig.setMaxEntryReadBatchSize(1); pulsarConnectorConfig.setMaxSplitEntryQueueSize(10); pulsarConnectorConfig.setMaxSplitMessageQueueSize(100); @@ -446,7 +446,7 @@ protected static List<String> getPartitionedTopics(String ns) { @BeforeMethod public void setup() throws Exception { - this.pulsarConnectorConfig = spy(new PulsarConnectorConfig()); + this.pulsarConnectorConfig = spy(PulsarConnectorConfig.class); this.pulsarConnectorConfig.setMaxEntryReadBatchSize(1); this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10); this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java index 218dc7c449e0ea..dbde648ee95e84 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java @@ -445,7 +445,7 @@ public void testGetSchemaInfo() throws Exception { PulsarAdmin pulsarAdmin = Mockito.mock(PulsarAdmin.class); Schemas schemas = Mockito.mock(Schemas.class); Mockito.when(pulsarAdmin.schemas()).thenReturn(schemas); - PulsarConnectorConfig connectorConfig = spy(new PulsarConnectorConfig()); + PulsarConnectorConfig connectorConfig = spy(PulsarConnectorConfig.class); Mockito.when(connectorConfig.getPulsarAdmin()).thenReturn(pulsarAdmin); PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor( new ArrayList<>(), pulsarSplit, connectorConfig, Mockito.mock(ManagedLedgerFactory.class), diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java index cd4fcaf0d1e330..5cd46832516088 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java @@ -62,7 +62,7 @@ public abstract class AbstractDecoderTester { protected void init() { ConnectorContext prestoConnectorContext = new TestingConnectorContext(); this.decoderFactory = new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager()); - this.pulsarConnectorConfig = spy(new PulsarConnectorConfig()); + this.pulsarConnectorConfig = spy(PulsarConnectorConfig.class); this.pulsarConnectorConfig.setMaxEntryReadBatchSize(1); this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10); this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);