diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 3c35f32f36c1d..54122bbcf4500 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -183,6 +183,12 @@ public Optional getIsolationDataPolicies(String clus return data.isPresent() ? Optional.of(new NamespaceIsolationPolicies(data.get())) : Optional.empty(); } + public CompletableFuture getIsolationDataPoliciesAsync(String cluster) { + return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES)) + .thenApply(data -> data.map(NamespaceIsolationPolicies::new) + .orElseGet(NamespaceIsolationPolicies::new)); + } + public void deleteIsolationData(String cluster) throws MetadataStoreException { delete(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 5a3db2302eebd..669f85e11f1b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -148,20 +148,23 @@ public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) { @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist") }) - public Map getOwnedNamespaces(@PathParam("clusterName") String cluster, - @PathParam("broker-webserviceurl") String broker) throws Exception { - validateSuperUserAccess(); - validateClusterOwnership(cluster); - validateBrokerName(broker); - - try { - // now we validated that this is the broker specified in the request - return pulsar().getNamespaceService().getOwnedNameSpacesStatus(); - } catch (Exception e) { - LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", clientAppId(), - cluster, broker); - throw new RestException(e); - } + public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse, + @PathParam("clusterName") String cluster, + @PathParam("broker-webserviceurl") String broker) { + validateSuperUserAccessAsync() + .thenAccept(__ -> validateBrokerName(broker)) + .thenCompose(__ -> validateClusterOwnershipAsync(cluster)) + .thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync()) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", + clientAppId(), cluster, broker); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -220,17 +223,15 @@ public void deleteDynamicConfiguration( @ApiResponse(code = 403, message = "You don't have admin permission to view configuration"), @ApiResponse(code = 404, message = "Configuration not found"), @ApiResponse(code = 500, message = "Internal server error")}) - public Map getAllDynamicConfigurations() throws Exception { - validateSuperUserAccess(); - try { - return dynamicConfigurationResources().getDynamicConfiguration().orElseGet(Collections::emptyMap); - } catch (RestException e) { - LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e); - throw e; - } catch (Exception e) { - LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e); - throw new RestException(e); - } + public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) { + validateSuperUserAccessAsync() + .thenCompose(__ -> dynamicConfigurationResources().getDynamicConfigurationAsync()) + .thenAccept(configOpt -> asyncResponse.resume(configOpt.orElseGet(Collections::emptyMap))) + .exceptionally(ex -> { + LOG.error("[{}] Failed to get all dynamic configuration.", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -238,9 +239,14 @@ public Map getAllDynamicConfigurations() throws Exception { @ApiOperation(value = "Get all updatable dynamic configurations's name") @ApiResponses(value = { @ApiResponse(code = 403, message = "You don't have admin permission to get configuration")}) - public List getDynamicConfigurationName() { - validateSuperUserAccess(); - return BrokerService.getDynamicConfiguration(); + public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) { + validateSuperUserAccessAsync() + .thenAccept(__ -> asyncResponse.resume(BrokerService.getDynamicConfiguration())) + .exceptionally(ex -> { + LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index e1e1dc106762e..fdfd9bf1880c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -29,8 +29,8 @@ import io.prometheus.client.Counter; import java.net.URI; import java.net.URL; +import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -732,16 +732,21 @@ public CompletableFuture isNamespaceBundleOwned(NamespaceBundle bundle) return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle)); } - public Map getOwnedNameSpacesStatus() throws Exception { - NamespaceIsolationPolicies nsIsolationPolicies = this.getLocalNamespaceIsolationPolicies(); - Map ownedNsStatus = new HashMap(); - for (OwnedBundle nsObj : this.ownershipCache.getOwnedBundles().values()) { - NamespaceOwnershipStatus nsStatus = this.getNamespaceOwnershipStatus(nsObj, - nsIsolationPolicies.getPolicyByNamespace(nsObj.getNamespaceBundle().getNamespaceObject())); - ownedNsStatus.put(nsObj.getNamespaceBundle().toString(), nsStatus); - } - - return ownedNsStatus; + public CompletableFuture> getOwnedNameSpacesStatusAsync() { + return getLocalNamespaceIsolationPoliciesAsync() + .thenCompose(namespaceIsolationPolicies -> { + Collection> futures = + ownershipCache.getOwnedBundlesAsync().values(); + return FutureUtil.waitForAll(futures) + .thenApply(__ -> futures.stream() + .map(CompletableFuture::join) + .collect(Collectors.toMap(bundle -> bundle.getNamespaceBundle().toString(), + bundle -> getNamespaceOwnershipStatus(bundle, + namespaceIsolationPolicies.getPolicyByNamespace( + bundle.getNamespaceBundle().getNamespaceObject())) + )) + ); + }); } private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj, @@ -763,14 +768,10 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj, return nsOwnedStatus; } - private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception { + private CompletableFuture getLocalNamespaceIsolationPoliciesAsync() { String localCluster = pulsar.getConfiguration().getClusterName(); return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies() - .getIsolationDataPolicies(localCluster) - .orElseGet(() -> { - // the namespace isolation policies are empty/undefined = an empty object - return new NamespaceIsolationPolicies(); - }); + .getIsolationDataPoliciesAsync(localCluster); } public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 3d8bf2654c6a2..67e986b804cba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -250,6 +250,10 @@ public Map getOwnedBundles() { return this.ownedBundlesCache.synchronous().asMap(); } + public Map> getOwnedBundlesAsync() { + return ownedBundlesCache.asMap(); + } + /** * Checked whether a particular bundle is currently owned by this broker. *