diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 1471d4a75c175..f66ed2a5c9062 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1024,7 +1024,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); log.info("Successfully split {} parent namespace-bundle to {} in {} ms", parentBundle, childBundles, splitBundleTime); - namespaceService.onNamespaceBundleSplit(parentBundle); completionFuture.complete(null); }) .exceptionally(ex -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java deleted file mode 100644 index a3312f5689e38..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.namespace; - -import java.util.function.Predicate; -import org.apache.pulsar.common.naming.NamespaceBundle; - -/** - * Listener for NamespaceBundle split. - */ -public interface NamespaceBundleSplitListener extends Predicate { - void onSplit(NamespaceBundle bundle); -} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index e04be25fe499c..5236202df0acf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -140,10 +140,6 @@ public class NamespaceService implements AutoCloseable { private final ConcurrentOpenHashMap namespaceClients; private final List bundleOwnershipListeners; - - private final List bundleSplitListeners; - - private final RedirectManager redirectManager; @@ -172,7 +168,6 @@ public NamespaceService(PulsarService pulsar) { this.namespaceClients = ConcurrentOpenHashMap.newBuilder().build(); this.bundleOwnershipListeners = new CopyOnWriteArrayList<>(); - this.bundleSplitListeners = new CopyOnWriteArrayList<>(); this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); this.redirectManager = new RedirectManager(pulsar); } @@ -1005,7 +1000,6 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, // affect the split operation which is already safely completed r.forEach(this::unloadNamespaceBundle); } - onNamespaceBundleSplit(bundle); }) .exceptionally(e -> { String msg1 = format( @@ -1246,19 +1240,6 @@ public void onNamespaceBundleUnload(NamespaceBundle bundle) { } } } - - public void onNamespaceBundleSplit(NamespaceBundle bundle) { - for (NamespaceBundleSplitListener bundleSplitListener : bundleSplitListeners) { - try { - if (bundleSplitListener.test(bundle)) { - bundleSplitListener.onSplit(bundle); - } - } catch (Throwable t) { - LOG.error("Call bundle {} split listener {} error", bundle, bundleSplitListener, t); - } - } - } - public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... listeners) { Objects.requireNonNull(listeners); for (NamespaceBundleOwnershipListener listener : listeners) { @@ -1269,15 +1250,6 @@ public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); } - public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) { - Objects.requireNonNull(listeners); - for (NamespaceBundleSplitListener listener : listeners) { - if (listener != null) { - bundleSplitListeners.add(listener); - } - } - } - private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle, NamespaceBundleOwnershipListener... listeners) { if (listeners != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 03ea937c44edc..169ff89fe3c0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -97,7 +97,6 @@ import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; -import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -368,23 +367,6 @@ public void testSplitBundleAdminAPI() throws Exception { String firstBundle = bundleRanges.get(0) + "_" + bundleRanges.get(1); - AtomicInteger splitCount = new AtomicInteger(0); - NamespaceBundleSplitListener namespaceBundleSplitListener = new NamespaceBundleSplitListener() { - @Override - public void onSplit(NamespaceBundle bundle) { - splitCount.incrementAndGet(); - } - - @Override - public boolean test(NamespaceBundle namespaceBundle) { - return namespaceBundle - .toString() - .equals(String.format(namespace + "/0x%08x_0x%08x", bundleRanges.get(0), bundleRanges.get(1))); - } - }; - pulsar1.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener); - pulsar2.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener); - long mid = bundleRanges.get(0) + (bundleRanges.get(1) - bundleRanges.get(0)) / 2; admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null); @@ -397,7 +379,6 @@ public boolean test(NamespaceBundle namespaceBundle) { assertTrue(bundlesData.getBoundaries().contains(lowBundle)); assertTrue(bundlesData.getBoundaries().contains(midBundle)); assertTrue(bundlesData.getBoundaries().contains(highBundle)); - assertEquals(splitCount.get(), 1); // Test split bundle with invalid bundle range. try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java index 73cfaf1b0d96b..43d37466918ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java @@ -20,21 +20,15 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -import lombok.Cleanup; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.Policies; -import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -87,31 +81,4 @@ public void testSplitBundleUpdatesLocalPoliciesWithoutOverwriting() throws Excep assertNotNull(admin.namespaces().getBookieAffinityGroup(namespaceName)); producer.close(); } - - @Test - public void testBundleSplitListener() throws Exception { - String namespaceName = "prop/" + UUID.randomUUID().toString(); - String topicName = "persistent://" + namespaceName + "/my-topic5"; - admin.namespaces().createNamespace(namespaceName); - @Cleanup - Producer producer = pulsarClient.newProducer().topic(topicName).sendTimeout(1, - TimeUnit.SECONDS).create(); - producer.send(new byte[1]); - String bundleRange = admin.lookups().getBundleRange(topicName); - AtomicBoolean isTriggered = new AtomicBoolean(false); - pulsar.getNamespaceService().addNamespaceBundleSplitListener(new NamespaceBundleSplitListener() { - @Override - public void onSplit(NamespaceBundle bundle) { - assertEquals(bundleRange, bundle.getBundleRange()); - isTriggered.set(true); - } - - @Override - public boolean test(NamespaceBundle namespaceBundle) { - return true; - } - }); - admin.namespaces().splitNamespaceBundle(namespaceName, bundleRange, false, null); - Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get())); - } }