From 355c07bd38671dab2526e2560058f3e74e4aa615 Mon Sep 17 00:00:00 2001 From: hleecs Date: Fri, 26 May 2023 10:33:59 +0800 Subject: [PATCH 1/6] [feat][broker]PIP-255 Part-1: Add listener interface for namespace service --- .../NamespaceBundleSplitListener.java | 30 +++++++++++++++++ .../broker/namespace/NamespaceService.java | 27 +++++++++++++++ .../namespace/NamespaceCreateBundlesTest.java | 33 +++++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java new file mode 100644 index 0000000000000..141a67c789723 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.namespace; + +import org.apache.pulsar.common.naming.NamespaceBundle; + +import java.util.function.Predicate; + +/** + * Listener for NamespaceBundle split. + */ +public interface NamespaceBundleSplitListener extends Predicate { + void onSplit(NamespaceBundle bundle); +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 9d8d9e3890a19..37ce575a5b67c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -139,6 +139,10 @@ public class NamespaceService implements AutoCloseable { private final ConcurrentOpenHashMap namespaceClients; private final List bundleOwnershipListeners; + + private final List bundleSplitListeners; + + private final RedirectManager redirectManager; @@ -167,6 +171,7 @@ public NamespaceService(PulsarService pulsar) { this.namespaceClients = ConcurrentOpenHashMap.newBuilder().build(); this.bundleOwnershipListeners = new CopyOnWriteArrayList<>(); + this.bundleSplitListeners = new CopyOnWriteArrayList<>(); this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); this.redirectManager = new RedirectManager(pulsar); } @@ -975,6 +980,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, // affect the split operation which is already safely completed r.forEach(this::unloadNamespaceBundle); } + onNamespaceBundleSplit(bundle); }) .exceptionally(e -> { String msg1 = format( @@ -1230,6 +1236,18 @@ protected void onNamespaceBundleUnload(NamespaceBundle bundle) { } } + protected void onNamespaceBundleSplit(NamespaceBundle bundle) { + for (NamespaceBundleSplitListener bundleSplitListener : bundleSplitListeners) { + try { + if (bundleSplitListener.test(bundle)) { + bundleSplitListener.unLoad(bundle); + } + } catch (Throwable t) { + LOG.error("Call bundle {} split lister error", bundle, t); + } + } + } + public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... listeners) { Objects.requireNonNull(listeners); for (NamespaceBundleOwnershipListener listener : listeners) { @@ -1240,6 +1258,15 @@ public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); } + public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) { + Objects.requireNonNull(listeners); + for (NamespaceBundleSplitListener listener : listeners) { + if (listener != null) { + bundleSplitListeners.add(listener); + } + } + } + private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle, NamespaceBundleOwnershipListener... listeners) { if (listeners != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java index 43d37466918ce..a96e135eff684 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java @@ -20,15 +20,21 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import static lombok.Cleanup; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.Policies; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -81,4 +87,31 @@ public void testSplitBundleUpdatesLocalPoliciesWithoutOverwriting() throws Excep assertNotNull(admin.namespaces().getBookieAffinityGroup(namespaceName)); producer.close(); } + + @Test + public void testBundleSplitListener() throws Exception { + String namespaceName = "prop/" + UUID.randomUUID().toString(); + String topicName = "persistent://" + namespaceName + "/my-topics"; + admin.namespaces().createNamespace(namespaceName); + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topicName).sendTimeout(1, + TimeUnit.SECONDS).create(); + producer.send(new byte[1]); + String bundleRange = admin.lookups().getBundleRange(topicName); + AtomicBoolean isTriggered = new AtomicBoolean(false); + pulsar.getNamespaceService().addNamespaceBundleSplitListener(new NamespaceBundleSplitListener() { + @Override + public void onSplit(NamespaceBundle bundle) { + assertEquals(bundleRange, bundle.getBundleRange()); + isTriggered.set(true); + } + + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return true; + } + }); + admin.namespaces().splitNamspaceBundle(namespaceName, bundleRange, false, null); + Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get())); + } } From 681bef8ae315f7abc2ea9872a672bea373858076 Mon Sep 17 00:00:00 2001 From: hleecs Date: Fri, 26 May 2023 10:45:42 +0800 Subject: [PATCH 2/6] [feat][broker]PIP-255 Part-1: Add listener interface for namespace service --- .../apache/pulsar/broker/namespace/NamespaceService.java | 2 +- .../pulsar/broker/namespace/NamespaceCreateBundlesTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 37ce575a5b67c..ca3f12e982654 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1228,7 +1228,7 @@ protected void onNamespaceBundleUnload(NamespaceBundle bundle) { for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) { try { if (bundleOwnedListener.test(bundle)) { - bundleOwnedListener.unLoad(bundle); + bundleOwnedListener.onSplit(bundle); } } catch (Throwable t) { LOG.error("Call bundle {} ownership lister error", bundle, t); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java index a96e135eff684..73cfaf1b0d96b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java @@ -22,7 +22,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; -import static lombok.Cleanup; +import lombok.Cleanup; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -91,7 +91,7 @@ public void testSplitBundleUpdatesLocalPoliciesWithoutOverwriting() throws Excep @Test public void testBundleSplitListener() throws Exception { String namespaceName = "prop/" + UUID.randomUUID().toString(); - String topicName = "persistent://" + namespaceName + "/my-topics"; + String topicName = "persistent://" + namespaceName + "/my-topic5"; admin.namespaces().createNamespace(namespaceName); @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).sendTimeout(1, @@ -111,7 +111,7 @@ public boolean test(NamespaceBundle namespaceBundle) { return true; } }); - admin.namespaces().splitNamspaceBundle(namespaceName, bundleRange, false, null); + admin.namespaces().splitNamespaceBundle(namespaceName, bundleRange, false, null); Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get())); } } From a25e47285f2238a7555bb5ed6c1d597d19441983 Mon Sep 17 00:00:00 2001 From: hleecs Date: Fri, 26 May 2023 10:50:24 +0800 Subject: [PATCH 3/6] [feat][broker]PIP-255 Part-1: Add listener interface for namespace service --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ca3f12e982654..ad497089e0e70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1228,7 +1228,7 @@ protected void onNamespaceBundleUnload(NamespaceBundle bundle) { for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) { try { if (bundleOwnedListener.test(bundle)) { - bundleOwnedListener.onSplit(bundle); + bundleOwnedListener.unLoad(bundle); } } catch (Throwable t) { LOG.error("Call bundle {} ownership lister error", bundle, t); @@ -1240,7 +1240,7 @@ protected void onNamespaceBundleSplit(NamespaceBundle bundle) { for (NamespaceBundleSplitListener bundleSplitListener : bundleSplitListeners) { try { if (bundleSplitListener.test(bundle)) { - bundleSplitListener.unLoad(bundle); + bundleSplitListener.onSplit(bundle); } } catch (Throwable t) { LOG.error("Call bundle {} split lister error", bundle, t); From 05d5f0812eae7f64bfeedcf0d1ef394ea3308c30 Mon Sep 17 00:00:00 2001 From: hleecs Date: Sat, 27 May 2023 12:30:17 +0800 Subject: [PATCH 4/6] fix checkstyle & compile error --- .../pulsar/broker/namespace/NamespaceBundleSplitListener.java | 4 ++-- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java index 141a67c789723..c162855d1bf3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java @@ -18,10 +18,10 @@ */ package org.apache.pulsar.broker.namespace; -import org.apache.pulsar.common.naming.NamespaceBundle; - import java.util.function.Predicate; +import org.apache.pulsar.common.naming.NamespaceBundle; + /** * Listener for NamespaceBundle split. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ad497089e0e70..616922a3c92a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -140,7 +140,7 @@ public class NamespaceService implements AutoCloseable { private final List bundleOwnershipListeners; - private final List bundleSplitListeners; + private final List bundleSplitListeners; private final RedirectManager redirectManager; From 4b0cce15d0d1b9d8750bd7a12f96d818a40720e4 Mon Sep 17 00:00:00 2001 From: hleecs Date: Sat, 27 May 2023 13:53:19 +0800 Subject: [PATCH 5/6] fix checkstyle --- .../pulsar/broker/namespace/NamespaceBundleSplitListener.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java index c162855d1bf3d..a3312f5689e38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.namespace; import java.util.function.Predicate; - import org.apache.pulsar.common.naming.NamespaceBundle; /** From 5d9fe2955f259bdff025766bcc6f411b8b04ca82 Mon Sep 17 00:00:00 2001 From: hleecs Date: Thu, 1 Jun 2023 08:41:07 +0800 Subject: [PATCH 6/6] log add split listener info --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 616922a3c92a3..69f0c2ec9c4d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1243,7 +1243,7 @@ protected void onNamespaceBundleSplit(NamespaceBundle bundle) { bundleSplitListener.onSplit(bundle); } } catch (Throwable t) { - LOG.error("Call bundle {} split lister error", bundle, t); + LOG.error("Call bundle {} split listener {} error", bundle, bundleSplitListener, t); } } }