From 1a0348d9271d684b20d302e98af92d0e55abd116 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 13 Jul 2022 19:44:19 +0800 Subject: [PATCH] [improve][broker] Make Namespaces.deleteNamespaceBundle async (#16287) Master Issue: #14365 ### Motivation Please see #14365 ### Modifications * Make Namespaces.deleteNamespaceBundle async * Combine internalDeleteNamespaceBundle * Make removeOwnedServiceUnit async --- .../broker/admin/impl/NamespacesBase.java | 226 +++++++----------- .../pulsar/broker/admin/v1/Namespaces.java | 24 +- .../pulsar/broker/admin/v2/Namespaces.java | 21 +- .../broker/namespace/NamespaceService.java | 7 +- .../pulsar/broker/admin/NamespacesTest.java | 28 +-- 5 files changed, 131 insertions(+), 175 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index a76072891a38c8..0ab82513b2b3c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -25,8 +25,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.lang.reflect.Field; +import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -551,153 +553,95 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo }); } - protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) { - if (force) { - internalDeleteNamespaceBundleForcefully(bundleRange, authoritative); - } else { - internalDeleteNamespaceBundle(bundleRange, authoritative); - } - } - @SuppressWarnings("deprecation") - protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE); - validatePoliciesReadOnlyAccess(); - - // ensure that non-global namespace is directed to the correct cluster - if (!namespaceName.isGlobal()) { - validateClusterOwnership(namespaceName.getCluster()); - } - - Policies policies = getNamespacePolicies(namespaceName); - // ensure the local cluster is the only cluster for the global namespace configuration - try { - if (namespaceName.isGlobal()) { - if (policies.replication_clusters.size() > 1) { - // There are still more than one clusters configured for the global namespace - throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " - + namespaceName + ". There are still more than one replication clusters configured."); - } - if (policies.replication_clusters.size() == 1 - && !policies.replication_clusters.contains(config().getClusterName())) { - // the only replication cluster is other cluster, redirect - String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); - ClusterData replClusterData = - clusterResources().getCluster(replCluster) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Cluster " + replCluster + " does not exist")); - URL replClusterUrl; - if (!config().isTlsEnabled() || !isRequestHttps()) { - replClusterUrl = new URL(replClusterData.getServiceUrl()); - } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { - replClusterUrl = new URL(replClusterData.getServiceUrlTls()); - } else { - throw new RestException(Status.PRECONDITION_FAILED, - "The replication cluster does not provide TLS encrypted service"); - } - URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) - .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build(); - if (log.isDebugEnabled()) { - log.debug("[{}] Redirecting the rest call to {}: cluster={}", - clientAppId(), redirect, replCluster); + protected CompletableFuture internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative, + boolean force) { + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> { + if (!namespaceName.isGlobal()) { + return validateClusterOwnershipAsync(namespaceName.getCluster()); } - throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); - } - } - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - throw new RestException(e); - } - - try { - NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, - authoritative, true); - List topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName) - .get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); - for (String topic : topics) { - NamespaceBundle topicBundle = pulsar().getNamespaceService() - .getBundle(TopicName.get(topic)); - if (bundle.equals(topicBundle)) { - throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle"); - } - } - - // remove from owned namespace map and ephemeral node from ZK - pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(), - bundleRange, e); - throw new RestException(e); - } - } - - @SuppressWarnings("deprecation") - protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE); - validatePoliciesReadOnlyAccess(); + return CompletableFuture.completedFuture(null); + }) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> { + CompletableFuture future = CompletableFuture.completedFuture(null); + if (namespaceName.isGlobal()) { - // ensure that non-global namespace is directed to the correct cluster - if (!namespaceName.isGlobal()) { - validateClusterOwnership(namespaceName.getCluster()); - } + if (policies.replication_clusters.size() > 1) { + // There are still more than one clusters configured for the global namespace + throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " + + namespaceName + + ". There are still more than one replication clusters configured."); + } + if (policies.replication_clusters.size() == 1 + && !policies.replication_clusters.contains(config().getClusterName())) { + // the only replication cluster is other cluster, redirect + String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); + future = clusterResources().getClusterAsync(replCluster) + .thenCompose(clusterData -> { + if (clusterData.isEmpty()) { + throw new RestException(Status.NOT_FOUND, + "Cluster " + replCluster + " does not exist"); + } + ClusterData replClusterData = clusterData.get(); + URL replClusterUrl; + try { + if (!config().isTlsEnabled() || !isRequestHttps()) { + replClusterUrl = new URL(replClusterData.getServiceUrl()); + } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { + replClusterUrl = new URL(replClusterData.getServiceUrlTls()); + } else { + throw new RestException(Status.PRECONDITION_FAILED, + "The replication cluster does not provide TLS encrypted " + + "service"); + } + } catch (MalformedURLException malformedURLException) { + throw new RestException(malformedURLException); + } - Policies policies = getNamespacePolicies(namespaceName); - // ensure the local cluster is the only cluster for the global namespace configuration - try { - if (namespaceName.isGlobal()) { - if (policies.replication_clusters.size() > 1) { - // There are still more than one clusters configured for the global namespace - throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " - + namespaceName + ". There are still more than one replication clusters configured."); - } - if (policies.replication_clusters.size() == 1 - && !policies.replication_clusters.contains(config().getClusterName())) { - // the only replication cluster is other cluster, redirect - String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); - ClusterData replClusterData = - clusterResources().getCluster(replCluster) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Cluster " + replCluster + " does not exist")); - URL replClusterUrl; - if (!config().isTlsEnabled() || !isRequestHttps()) { - replClusterUrl = new URL(replClusterData.getServiceUrl()); - } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { - replClusterUrl = new URL(replClusterData.getServiceUrlTls()); - } else { - throw new RestException(Status.PRECONDITION_FAILED, - "The replication cluster does not provide TLS encrypted service"); - } - URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) - .port(replClusterUrl.getPort()) - .replaceQueryParam("authoritative", false).build(); - if (log.isDebugEnabled()) { - log.debug("[{}] Redirecting the rest call to {}: cluster={}", - clientAppId(), redirect, replCluster); + URI redirect = + UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) + .port(replClusterUrl.getPort()) + .replaceQueryParam("authoritative", false).build(); + if (log.isDebugEnabled()) { + log.debug("[{}] Redirecting the rest call to {}: cluster={}", + clientAppId(), redirect, replCluster); + } + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + }); + } } - throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); - } - } - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - throw new RestException(e); - } + return future.thenCompose(__ -> { + NamespaceBundle bundle = + validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, + authoritative, true); + return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName) + .thenCompose(topics -> { + CompletableFuture deleteTopicsFuture = + CompletableFuture.completedFuture(null); + if (!force) { + List> futures = new ArrayList<>(); + for (String topic : topics) { + futures.add(pulsar().getNamespaceService() + .getBundleAsync(TopicName.get(topic)) + .thenCompose(topicBundle -> { + if (bundle.equals(topicBundle)) { + throw new RestException(Status.CONFLICT, + "Cannot delete non empty bundle"); + } + return CompletableFuture.completedFuture(null); + })); - try { - NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, - authoritative, true); - // directly remove from owned namespace map and ephemeral node from ZK - pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(), - bundleRange, e); - throw new RestException(e); - } + } + deleteTopicsFuture = FutureUtil.waitForAll(futures); + } + return deleteTopicsFuture.thenCompose( + ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle)); + }); + }); + }); } protected CompletableFuture internalGrantPermissionOnNamespaceAsync(String role, Set actions) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index f958dbad37e743..b0b16dcaf98458 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -259,14 +259,22 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), - @ApiResponse(code = 409, message = "Namespace bundle is not empty") }) - public void deleteNamespaceBundle(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("bundle") String bundleRange, - @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(property, cluster, namespace); - internalDeleteNamespaceBundle(bundleRange, authoritative, force); + @ApiResponse(code = 409, message = "Namespace bundle is not empty")}) + public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("bundle") String bundleRange, + @QueryParam("force") @DefaultValue("false") boolean force, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateNamespaceName(property, cluster, namespace); + internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force) + .thenRun(() -> response.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 13fe1c5b0f7c36..a1e8fcbc1e6516 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -206,13 +206,22 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), - @ApiResponse(code = 409, message = "Namespace bundle is not empty") }) - public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @PathParam("bundle") String bundleRange, - @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @ApiResponse(code = 409, message = "Namespace bundle is not empty")}) + public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("bundle") String bundleRange, + @QueryParam("force") @DefaultValue("false") boolean force, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalDeleteNamespaceBundle(bundleRange, authoritative, force); + internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force) + .thenRun(() -> response.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex); + } + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 86fd5b586629c8..eecabf3fb369c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1047,10 +1047,9 @@ public CompletableFuture checkTopicOwnership(TopicName topicName) { .thenCompose(ownershipCache::checkOwnershipAsync); } - public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception { - ownershipCache.removeOwnership(nsBundle).get( - pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); - bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()); + public CompletableFuture removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) { + return ownershipCache.removeOwnership(nsBundle) + .thenRun(() -> bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject())); } protected void onNamespaceBundleOwned(NamespaceBundle bundle) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 4b7d0d5f5ca64f..2369a0af4bb960 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -835,18 +835,16 @@ public boolean matches(NamespaceBundle bundle) { doReturn(preconditionFailed).when(namespacesAdmin) .deleteNamespaceBundleAsync(Mockito.anyString(), Mockito.anyString()); - try { - namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000", - false, false); - fail("Should have failed"); - } catch (RestException re) { - assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); - } + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor captor = ArgumentCaptor.forClass(RestException.class); + namespaces.deleteNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, + "0x00000000_0x80000000", false, false); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData); doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class)); - AsyncResponse response = mock(AsyncResponse.class); + response = mock(AsyncResponse.class); namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false); - ArgumentCaptor captor = ArgumentCaptor.forClass(RestException.class); verify(response, timeout(5000).times(1)).resume(captor.capture()); assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); // make one bundle owned @@ -855,13 +853,11 @@ public boolean matches(NamespaceBundle bundle) { doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0)); doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync( testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000"); - try { - namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x80000000_0xffffffff", - false, false); - fail("Should have failed"); - } catch (RestException re) { - assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); - } + response = mock(AsyncResponse.class); + namespaces.deleteNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, + "0x80000000_0xffffffff", false, false); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); response = mock(AsyncResponse.class); doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class)); for (NamespaceBundle bundle : nsBundles.getBundles()) {