Skip to content

Commit

Permalink
Revert Add listener interface for namespace service apache#20406
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Jul 10, 2023
1 parent 6e2f668 commit f3bb89d
Show file tree
Hide file tree
Showing 5 changed files with 0 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
log.info("Successfully split {} parent namespace-bundle to {} in {} ms",
parentBundle, childBundles, splitBundleTime);
namespaceService.onNamespaceBundleSplit(parentBundle);
completionFuture.complete(null);
})
.exceptionally(ex -> {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ public class NamespaceService implements AutoCloseable {
private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;

private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners;

private final List<NamespaceBundleSplitListener> bundleSplitListeners;


private final RedirectManager redirectManager;


Expand Down Expand Up @@ -171,7 +167,6 @@ public NamespaceService(PulsarService pulsar) {
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);
}
Expand Down Expand Up @@ -1018,7 +1013,6 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
// affect the split operation which is already safely completed
r.forEach(this::unloadNamespaceBundle);
}
onNamespaceBundleSplit(bundle);
})
.exceptionally(e -> {
String msg1 = format(
Expand Down Expand Up @@ -1261,19 +1255,6 @@ public void onNamespaceBundleUnload(NamespaceBundle bundle) {
}
}
}

public void onNamespaceBundleSplit(NamespaceBundle bundle) {
for (NamespaceBundleSplitListener bundleSplitListener : bundleSplitListeners) {
try {
if (bundleSplitListener.test(bundle)) {
bundleSplitListener.onSplit(bundle);
}
} catch (Throwable t) {
LOG.error("Call bundle {} split listener {} error", bundle, bundleSplitListener, t);
}
}
}

public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... listeners) {
Objects.requireNonNull(listeners);
for (NamespaceBundleOwnershipListener listener : listeners) {
Expand All @@ -1284,15 +1265,6 @@ public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener
getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners));
}

public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
Objects.requireNonNull(listeners);
for (NamespaceBundleSplitListener listener : listeners) {
if (listener != null) {
bundleSplitListeners.add(listener);
}
}
}

private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle,
NamespaceBundleOwnershipListener... listeners) {
if (listeners != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
Expand Down Expand Up @@ -406,23 +405,6 @@ public void testSplitBundleAdminAPI() throws Exception {

String firstBundle = bundleRanges.get(0) + "_" + bundleRanges.get(1);

AtomicInteger splitCount = new AtomicInteger(0);
NamespaceBundleSplitListener namespaceBundleSplitListener = new NamespaceBundleSplitListener() {
@Override
public void onSplit(NamespaceBundle bundle) {
splitCount.incrementAndGet();
}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return namespaceBundle
.toString()
.equals(String.format(namespace + "/0x%08x_0x%08x", bundleRanges.get(0), bundleRanges.get(1)));
}
};
pulsar1.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);
pulsar2.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);

long mid = bundleRanges.get(0) + (bundleRanges.get(1) - bundleRanges.get(0)) / 2;

admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null);
Expand All @@ -435,7 +417,6 @@ public boolean test(NamespaceBundle namespaceBundle) {
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
assertEquals(splitCount.get(), 1);

// Test split bundle with invalid bundle range.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,15 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import lombok.Cleanup;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.Policies;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -87,31 +81,4 @@ public void testSplitBundleUpdatesLocalPoliciesWithoutOverwriting() throws Excep
assertNotNull(admin.namespaces().getBookieAffinityGroup(namespaceName));
producer.close();
}

@Test
public void testBundleSplitListener() throws Exception {
String namespaceName = "prop/" + UUID.randomUUID().toString();
String topicName = "persistent://" + namespaceName + "/my-topic5";
admin.namespaces().createNamespace(namespaceName);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).sendTimeout(1,
TimeUnit.SECONDS).create();
producer.send(new byte[1]);
String bundleRange = admin.lookups().getBundleRange(topicName);
AtomicBoolean isTriggered = new AtomicBoolean(false);
pulsar.getNamespaceService().addNamespaceBundleSplitListener(new NamespaceBundleSplitListener() {
@Override
public void onSplit(NamespaceBundle bundle) {
assertEquals(bundleRange, bundle.getBundleRange());
isTriggered.set(true);
}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return true;
}
});
admin.namespaces().splitNamespaceBundle(namespaceName, bundleRange, false, null);
Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get()));
}
}

0 comments on commit f3bb89d

Please sign in to comment.