Skip to content

Commit

Permalink
[fix] [broker] remove bundle-data in local metadata store. (#21078)
Browse files Browse the repository at this point in the history
Motivation: When deleting a namespace, we will delete znode under the path `/loadbalance/bundle-data` in `local metadata store` instead of `global metadata store`.

Modifications: Delete bundle data znode in local metadata store.
  • Loading branch information
thetumbled authored and Technoboy- committed Sep 5, 2023
1 parent b2c7052 commit 59ca24a
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,18 @@ public class NamespaceResources extends BaseResources<Policies> {
private final IsolationPolicyResources isolationPolicies;
private final PartitionedTopicResources partitionedTopicResources;
private final MetadataStore configurationStore;
private final MetadataStore localStore;

public static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
private static final String NAMESPACE_BASE_PATH = "/namespace";
private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";

public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
this.localStore = localStore;
}

public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
Expand Down Expand Up @@ -381,13 +383,13 @@ public CompletableFuture<Void> runWithMarkDeleteAsync(TopicName topic,
// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
return getStore().deleteRecursive(namespaceBundlePath);
return this.localStore.deleteRecursive(namespaceBundlePath);
}

// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
return getStore().deleteRecursive(tenantBundlePath);
return this.localStore.deleteRecursive(tenantBundlePath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
namespaceResources = new NamespaceResources(localMetadataStore, configurationMetadataStore
, operationTimeoutSec);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,61 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class NamespaceResourcesTest {

private MetadataStore localStore;
private MetadataStore configurationStore;
private NamespaceResources namespaceResources;

private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";

@BeforeMethod
public void setup() {
localStore = mock(MetadataStore.class);
configurationStore = mock(MetadataStore.class);
namespaceResources = new NamespaceResources(localStore, configurationStore, 30);
}

@Test
public void test_pathIsFromNamespace() {
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies"));
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
}

/**
* Test that the bundle-data node is deleted from the local stores.
*/
@Test
public void testDeleteBundleDataAsync() {
NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
namespaceResources.deleteBundleDataAsync(ns);

String tenant="my-tenant";
String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
namespaceResources.deleteBundleDataTenantAsync(tenant);

verify(localStore).deleteRecursive(namespaceBundlePath);
verify(localStore).deleteRecursive(tenantBundlePath);

assertThrows(()-> verify(configurationStore).deleteRecursive(namespaceBundlePath));
assertThrows(()-> verify(configurationStore).deleteRecursive(tenantBundlePath));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -1720,6 +1721,8 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
// Set conf.
cleanup();
setNamespaceAttr(namespaceAttr);
this.conf.setMetadataStoreUrl("127.0.0.1:2181");
this.conf.setConfigurationMetadataStoreUrl("127.0.0.1:2182");
setup();

String tenant = newUniqueName("test-tenant");
Expand All @@ -1740,6 +1743,28 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());

final String managedLedgersPath = "/managed-ledgers/" + namespace;
final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
// Trigger bundle owned by brokers.
pulsarClient.newProducer().topic(topic).create().close();
// Trigger bundle data write to ZK.
Awaitility.await().untilAsserted(() -> {
boolean bundleDataWereWriten = false;
for (PulsarService ps : new PulsarService[]{pulsar, mockPulsarSetup.getPulsar()}) {
ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) ps.getLoadManager().get();
ModularLoadManagerImpl loadManagerImpl = (ModularLoadManagerImpl) loadManager.getLoadManager();
ps.getBrokerService().updateRates();
loadManagerImpl.updateLocalBrokerData();
loadManagerImpl.writeBundleDataOnZooKeeper();
bundleDataWereWriten = bundleDataWereWriten || ps.getLocalMetadataStore().exists(bundleDataPath).join();
}
assertTrue(bundleDataWereWriten);
});

// assert znode exists in metadata store
assertTrue(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
assertTrue(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());

try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
Expand All @@ -1756,12 +1781,8 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());


final String managedLedgersPath = "/managed-ledgers/" + namespace;
// assert znode deleted in metadata store
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());


final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
if (metadataStore == null) {
metadataStore = builder.configurationMetadataStore;
}
NamespaceResources nsr = spyConfigPulsarResources.spy(NamespaceResources.class, metadataStore, 30);
NamespaceResources nsr = spyConfigPulsarResources.spy(NamespaceResources.class,
builder.localMetadataStore, metadataStore, 30);
TopicResources tsr = spyConfigPulsarResources.spy(TopicResources.class, metadataStore);
pulsarResources(
spyConfigPulsarResources.spy(
Expand Down

0 comments on commit 59ca24a

Please sign in to comment.