diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index ce797a80c7c3a..c24df6c586fb4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -93,6 +93,10 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies); } + public CompletableFuture createPoliciesAsync(NamespaceName ns, Policies policies) { + return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies); + } + public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException { String path = joinPath(BASE_POLICIES_PATH, ns.toString()); return super.exists(path) && super.getChildren(path).isEmpty(); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java index 3313b61c8a1e1..87f48f7e682a4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.resources; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -78,7 +79,7 @@ public CompletableFuture updateTenantAsync(String tenantName, Function tenantExistsAsync(String tenantName) { - return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName)); + return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName)); } public List getListOfNamespaces(String tenant) throws MetadataStoreException { @@ -110,6 +111,41 @@ public List getListOfNamespaces(String tenant) throws MetadataStoreExcep return namespaces; } + public CompletableFuture> getListOfNamespacesAsync(String tenant) { + // this will return a cluster in v1 and a namespace in v2 + return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant)) + .thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key -> + getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key)) + .thenCompose(children -> { + if (children == null || children.isEmpty()) { + String namespace = NamespaceName.get(tenant, key).toString(); + // if the length is 0 then this is probably a leftover cluster from namespace + // created with the v1 admin format (prop/cluster/ns) and then deleted, so no + // need to add it to the list + return getAsync(joinPath(BASE_POLICIES_PATH, namespace)) + .thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace) + : new ArrayList()) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof MetadataStoreException + .ContentDeserializationException) { + return new ArrayList<>(); + } + throw FutureUtil.wrapToCompletionException(ex); + }); + } else { + CompletableFuture> ret = new CompletableFuture(); + ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns) + .toString()).collect(Collectors.toList())); + return ret; + } + })).reduce(CompletableFuture.completedFuture(new ArrayList<>()), + (accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> { + namespaces.addAll(m); + return namespaces; + })))); + } + public CompletableFuture> getActiveNamespaces(String tenant, String cluster) { return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index ec502e134f2a2..3a396411c63d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -323,6 +323,11 @@ protected CompletableFuture getNamespacePoliciesAsync(NamespaceName na return FutureUtil.failedFuture(new RestException(e)); } policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles; + if (policies.get().is_allow_auto_update_schema == null) { + // the type changed from boolean to Boolean. return broker value here for keeping compatibility. + policies.get().is_allow_auto_update_schema = pulsar().getConfig() + .isAllowAutoUpdateSchemaEnabled(); + } return CompletableFuture.completedFuture(policies.get()); }); } else { @@ -534,6 +539,11 @@ protected List getPartitionedTopicList(TopicDomain topicDomain) { } } + protected CompletableFuture> getPartitionedTopicListAsync(TopicDomain topicDomain) { + return namespaceResources().getPartitionedTopicResources() + .listPartitionedTopicsAsync(namespaceName, topicDomain); + } + protected List getTopicPartitionList(TopicDomain topicDomain) { try { return getPulsarResources().getTopicResources().getExistingPartitions(topicName) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 49a221946ee35..39a4eb1306083 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -103,7 +103,6 @@ import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; -import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.slf4j.Logger; @@ -111,52 +110,47 @@ public abstract class NamespacesBase extends AdminResource { - protected List internalGetTenantNamespaces(String tenant) { - checkNotNull(tenant, "Tenant should not be null"); + protected CompletableFuture> internalGetTenantNamespaces(String tenant) { + if (tenant == null) { + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null")); + } try { NamedEntity.checkName(tenant); } catch (IllegalArgumentException e) { log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e); - throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"); - } - validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES); - - try { - if (!tenantResources().tenantExists(tenant)) { - throw new RestException(Status.NOT_FOUND, "Tenant not found"); - } - - return tenantResources().getListOfNamespaces(tenant); - } catch (Exception e) { - log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e); - throw new RestException(e); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid")); } + return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES) + .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant)) + .thenCompose(existed -> { + if (!existed) { + throw new RestException(Status.NOT_FOUND, "Tenant not found"); + } + return tenantResources().getListOfNamespacesAsync(tenant); + }); } - protected void internalCreateNamespace(Policies policies) { - validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE); - validatePoliciesReadOnlyAccess(); - validatePolicies(namespaceName, policies); - - try { - int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant(); - // no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded. - if (maxNamespacesPerTenant > 0) { - List namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant()); - if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) { - throw new RestException(Status.PRECONDITION_FAILED, - "Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant()); - } - } - namespaceResources().createPolicies(namespaceName, policies); - log.info("[{}] Created namespace {}", clientAppId(), namespaceName); - } catch (AlreadyExistsException e) { - log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName); - throw new RestException(Status.CONFLICT, "Namespace already exists"); - } catch (Exception e) { - log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e); - throw new RestException(e); - } + protected CompletableFuture internalCreateNamespace(Policies policies) { + return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenAccept(__ -> validatePolicies(namespaceName, policies)) + .thenCompose(__ -> { + int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant(); + // no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded. + if (maxNamespacesPerTenant > 0) { + return tenantResources().getListOfNamespacesAsync(namespaceName.getTenant()) + .thenAccept(namespaces -> { + if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) { + throw new RestException(Status.PRECONDITION_FAILED, + "Exceed the maximum number of namespace in tenant :" + + namespaceName.getTenant()); + } + }); + } + return CompletableFuture.completedFuture(null); + }) + .thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies)) + .thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName)); } protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index c21c8b8910f5c..0ccc505d5e84b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -42,6 +43,7 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; @@ -67,6 +69,8 @@ import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,8 +88,15 @@ public class Namespaces extends NamespacesBase { response = String.class, responseContainer = "Set") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property doesn't exist")}) - public List getTenantNamespaces(@PathParam("property") String property) { - return internalGetTenantNamespaces(property); + public void getTenantNamespaces(@Suspended AsyncResponse response, + @PathParam("property") String property) { + internalGetTenantNamespaces(property) + .thenAccept(response::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex); + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET @@ -125,21 +136,20 @@ public List getNamespacesForCluster(@PathParam("property") String tenant @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")}) - public void getTopics(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, - @Suspended AsyncResponse asyncResponse) { + public void getTopics(@Suspended AsyncResponse response, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS); - - // Validate that namespace exists, throws 404 if it doesn't exist - getNamespacePolicies(namespaceName); - - pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) - .thenAccept(asyncResponse::resume) + validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS) + // Validate that namespace exists, throws 404 if it doesn't exist + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)) + .thenAccept(response::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex); - asyncResponse.resume(ex); + resumeAsyncResponseExceptionally(response, ex); return null; }); } @@ -150,11 +160,20 @@ public void getTopics(@PathParam("property") String property, response = Policies.class) @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")}) - public Policies getPolicies(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void getPolicies(@Suspended AsyncResponse response, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.ALL, PolicyOperation.READ); - return getNamespacePolicies(namespaceName); + validateNamespacePolicyOperationAsync(NamespaceName.get(property, namespace), PolicyName.ALL, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(response::resume) + .exceptionally(ex -> { + log.error("Failed to get policies for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @SuppressWarnings("deprecation") @@ -165,29 +184,46 @@ public Policies getPolicies(@PathParam("property") String property, @PathParam(" @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace already exists"), @ApiResponse(code = 412, message = "Namespace name is not valid") }) - public void createNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, BundlesData initialBundles) { + public void createNamespace(@Suspended AsyncResponse response, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + BundlesData initialBundles) { validateNamespaceName(property, cluster, namespace); + CompletableFuture ret; if (!namespaceName.isGlobal()) { // If the namespace is non global, make sure property has the access on the cluster. For global namespace, // same check is made at the time of setting replication. - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); + ret = validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster()); + } else { + ret = CompletableFuture.completedFuture(null); } - - Policies policies = new Policies(); - if (initialBundles != null && initialBundles.getNumBundles() > 0) { - if (initialBundles.getBoundaries() == null || initialBundles.getBoundaries().size() == 0) { - policies.bundles = getBundles(initialBundles.getNumBundles()); + ret.thenApply(__ -> { + Policies policies = new Policies(); + if (initialBundles != null && initialBundles.getNumBundles() > 0) { + if (initialBundles.getBoundaries() == null || initialBundles.getBoundaries().size() == 0) { + policies.bundles = getBundles(initialBundles.getNumBundles()); + } else { + policies.bundles = validateBundlesData(initialBundles); + } } else { - policies.bundles = validateBundlesData(initialBundles); + int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); + policies.bundles = getBundles(defaultNumberOfBundles); } - } else { - int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); - policies.bundles = getBundles(defaultNumberOfBundles); - } - - internalCreateNamespace(policies); + return policies; + }).thenCompose(this::internalCreateNamespace) + .thenAccept(__ -> response.resume(Response.noContent().build())) + .exceptionally(ex -> { + Throwable root = FutureUtil.unwrapCompletionException(ex); + if (root instanceof MetadataStoreException.AlreadyExistsException) { + response.resume(new RestException(Status.CONFLICT, "Namespace already exists")); + } else { + log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(response, ex); + } + return null; + }); } @DELETE @@ -200,9 +236,9 @@ public void createNamespace(@PathParam("property") String property, @PathParam(" @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"), @ApiResponse(code = 409, message = "Namespace is not empty") }) public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @QueryParam("force") @DefaultValue("false") boolean force, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateNamespaceName(property, cluster, namespace); internalDeleteNamespace(asyncResponse, authoritative, force); @@ -236,13 +272,19 @@ public void deleteNamespaceBundle(@PathParam("property") String property, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty") }) - public Map> getPermissions(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + public void getPermissions(@Suspended AsyncResponse response, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.auth_policies.getNamespaceAuthentication(); + validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> response.resume(policies.auth_policies.getNamespaceAuthentication())) + .exceptionally(ex -> { + log.error("Failed to get permissions for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index e3f82b72d2ad8..2769b5b3bc343 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -75,7 +75,9 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,8 +93,15 @@ public class Namespaces extends NamespacesBase { response = String.class, responseContainer = "Set") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant doesn't exist")}) - public List getTenantNamespaces(@PathParam("tenant") String tenant) { - return internalGetTenantNamespaces(tenant); + public void getTenantNamespaces(@Suspended final AsyncResponse response, + @PathParam("tenant") String tenant) { + internalGetTenantNamespaces(tenant) + .thenAccept(response::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex); + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET @@ -102,21 +111,19 @@ public List getTenantNamespaces(@PathParam("tenant") String tenant) { @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) - public void getTopics(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, - @Suspended AsyncResponse asyncResponse) { - validateNamespaceName(tenant, namespace); - validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS); - - // Validate that namespace exists, throws 404 if it doesn't exist - getNamespacePolicies(namespaceName); - - pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) - .thenAccept(asyncResponse::resume) + public void getTopics(@Suspended AsyncResponse response, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { + validateNamespaceName(tenant, namespace); + validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS) + // Validate that namespace exists, throws 404 if it doesn't exist + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)) + .thenAccept(response::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex); - asyncResponse.resume(ex); + resumeAsyncResponseExceptionally(response, ex); return null; }); } @@ -126,10 +133,19 @@ public void getTopics(@PathParam("tenant") String tenant, @ApiOperation(value = "Get the dump all the policies specified for a namespace.", response = Policies.class) @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { - validateNamespaceName(tenant, namespace); - validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.ALL, PolicyOperation.READ); - return getNamespacePolicies(namespaceName); + public void getPolicies(@Suspended AsyncResponse response, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, namespace), PolicyName.ALL, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(response::resume) + .exceptionally(ex -> { + log.error("Failed to get policies for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @PUT @@ -139,11 +155,24 @@ public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("name @ApiResponse(code = 404, message = "Tenant or cluster doesn't exist"), @ApiResponse(code = 409, message = "Namespace already exists"), @ApiResponse(code = 412, message = "Namespace name is not valid") }) - public void createNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @ApiParam(value = "Policies for the namespace") Policies policies) { + public void createNamespace(@Suspended AsyncResponse response, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = "Policies for the namespace") Policies policies) { validateNamespaceName(tenant, namespace); policies = getDefaultPolicesIfNull(policies); - internalCreateNamespace(policies); + internalCreateNamespace(policies) + .thenAccept(__ -> response.resume(Response.noContent().build())) + .exceptionally(ex -> { + Throwable root = FutureUtil.unwrapCompletionException(ex); + if (root instanceof MetadataStoreException.AlreadyExistsException) { + response.resume(new RestException(Response.Status.CONFLICT, "Namespace already exists")); + } else { + log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(response, ex); + } + return null; + }); } @DELETE @@ -156,9 +185,9 @@ public void createNamespace(@PathParam("tenant") String tenant, @PathParam("name @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"), @ApiResponse(code = 409, message = "Namespace is not empty") }) public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @PathParam("namespace") String namespace, + @QueryParam("force") @DefaultValue("false") boolean force, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateNamespaceName(tenant, namespace); internalDeleteNamespace(asyncResponse, authoritative, force); @@ -191,13 +220,18 @@ public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty") }) - public Map> getPermissions(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void getPermissions(@Suspended AsyncResponse response, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.auth_policies.getNamespaceAuthentication(); + validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> response.resume(policies.auth_policies.getNamespaceAuthentication())) + .exceptionally(ex -> { + log.error("Failed to get permissions for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(response, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 1690210a80a45..38c61d3688d77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -415,6 +415,21 @@ protected void validateClusterForTenant(String tenant, String cluster) { log.info("Successfully validated clusters on tenant [{}]", tenant); } + protected CompletableFuture validateClusterForTenantAsync(String tenant, String cluster) { + return pulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant) + .thenAccept(tenantInfo -> { + if (!tenantInfo.isPresent()) { + throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); + } + if (!tenantInfo.get().getAllowedClusters().contains(cluster)) { + String msg = String.format("Cluster [%s] is not in the list of allowed clusters list" + + " for tenant [%s]", cluster, tenant); + log.info(msg); + throw new RestException(Status.FORBIDDEN, msg); + } + }); + } + protected CompletableFuture validateClusterOwnershipAsync(String cluster) { return getClusterDataIfDifferentCluster(pulsar(), cluster, clientAppId()) .thenAccept(differentClusterData -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index bfc292500284a..583a87275bda4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -38,17 +38,12 @@ import java.net.URI; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.TimeoutHandler; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; @@ -196,7 +191,7 @@ public void internalConfiguration() throws Exception { new ClientConfiguration().getZkLedgersRootPath(), conf.isBookkeeperMetadataStoreSeparated() ? conf.getBookkeeperMetadataStoreUrl() : null, pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null)); - Object response = asynRequests(ctx -> brokers.getInternalConfigurationData(ctx)); + Object response = asyncRequests(ctx -> brokers.getInternalConfigurationData(ctx)); assertTrue(response instanceof InternalConfigurationData); assertEquals(response, expectedData); } @@ -204,18 +199,18 @@ public void internalConfiguration() throws Exception { @Test @SuppressWarnings("unchecked") public void clusters() throws Exception { - assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); + assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); verify(clusters, never()).validateSuperUserAccessAsync(); - asynRequests(ctx -> clusters.createCluster(ctx, + asyncRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build())); // ensure to read from ZooKeeper directly //clusters.clustersListCache().clear(); - assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet("use")); + assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet("use")); // Check creating existing cluster try { - asynRequests(ctx -> clusters.createCluster(ctx, "use", + asyncRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build())); fail("should have failed"); } catch (RestException e) { @@ -224,23 +219,23 @@ public void clusters() throws Exception { // Check deleting non-existing cluster try { - asynRequests(ctx -> clusters.deleteCluster(ctx, "usc")); + asyncRequests(ctx -> clusters.deleteCluster(ctx, "usc")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } - assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")), + assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")), ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()); - asynRequests(ctx -> clusters.updateCluster(ctx, "use", + asyncRequests(ctx -> clusters.updateCluster(ctx, "use", ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build())); - assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")), + assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")), ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build()); try { - asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); + asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); @@ -260,38 +255,38 @@ public void clusters() throws Exception { .build(); AsyncResponse response = mock(AsyncResponse.class); clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData); - asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); + asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); try { - asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); + asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 412); } clusters.deleteNamespaceIsolationPolicy("use", "policy1"); - assertTrue(((Map) asynRequests(ctx -> + assertTrue(((Map) asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty()); - asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); - assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); + asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); + assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); try { - asynRequests(ctx -> clusters.getCluster(ctx, "use")); + asyncRequests(ctx -> clusters.getCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); } try { - asynRequests(ctx -> clusters.updateCluster(ctx, "use", ClusterDataImpl.builder().build())); + asyncRequests(ctx -> clusters.updateCluster(ctx, "use", ClusterDataImpl.builder().build())); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); } try { - asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); + asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); @@ -311,7 +306,7 @@ public void clusters() throws Exception { clusterCache.invalidateAll(); store.invalidateAll(); try { - asynRequests(ctx -> clusters.getClusters(ctx)); + asyncRequests(ctx -> clusters.getClusters(ctx)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -322,7 +317,7 @@ public void clusters() throws Exception { && path.equals("/admin/clusters/test"); }); try { - asynRequests(ctx -> clusters.createCluster(ctx, "test", + asyncRequests(ctx -> clusters.createCluster(ctx, "test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build())); fail("should have failed"); } catch (RestException e) { @@ -336,7 +331,7 @@ public void clusters() throws Exception { clusterCache.invalidateAll(); store.invalidateAll(); try { - asynRequests(ctx -> clusters.updateCluster(ctx, "test", + asyncRequests(ctx -> clusters.updateCluster(ctx, "test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build())); fail("should have failed"); } catch (RestException e) { @@ -349,7 +344,7 @@ public void clusters() throws Exception { }); try { - asynRequests(ctx -> clusters.getCluster(ctx, "test")); + asyncRequests(ctx -> clusters.getCluster(ctx, "test")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -361,7 +356,7 @@ public void clusters() throws Exception { }); try { - asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); + asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -375,7 +370,7 @@ public void clusters() throws Exception { isolationPolicyCache.invalidateAll(); store.invalidateAll(); try { - asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); + asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -383,7 +378,7 @@ public void clusters() throws Exception { // Check name validations try { - asynRequests(ctx -> clusters.createCluster(ctx, "bf@", + asyncRequests(ctx -> clusters.createCluster(ctx, "bf@", ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build())); fail("should have filed"); } catch (RestException e) { @@ -392,7 +387,7 @@ public void clusters() throws Exception { // Check authentication and listener name try { - asynRequests(ctx -> clusters.createCluster(ctx, "auth", ClusterDataImpl.builder() + asyncRequests(ctx -> clusters.createCluster(ctx, "auth", ClusterDataImpl.builder() .serviceUrl("http://dummy.web.example.com") .serviceUrlTls("") .brokerServiceUrl("http://dummy.messaging.example.com") @@ -401,7 +396,7 @@ public void clusters() throws Exception { .authenticationParameters("authenticationParameters") .listenerName("listenerName") .build())); - ClusterData cluster = (ClusterData) asynRequests(ctx -> clusters.getCluster(ctx, "auth")); + ClusterData cluster = (ClusterData) asyncRequests(ctx -> clusters.getCluster(ctx, "auth")); assertEquals(cluster.getAuthenticationPlugin(), "authenticationPlugin"); assertEquals(cluster.getAuthenticationParameters(), "authenticationParameters"); assertEquals(cluster.getListenerName(), "listenerName"); @@ -412,23 +407,14 @@ public void clusters() throws Exception { verify(clusters, times(2)).validateSuperUserAccess(); } - Object asynRequests(Consumer function) throws Exception { - TestAsyncResponse ctx = new TestAsyncResponse(); - function.accept(ctx); - ctx.latch.await(); - if (ctx.e != null) { - throw (Exception) ctx.e; - } - return ctx.response; - } @Test public void properties() throws Throwable { - Object response = asynRequests(ctx -> properties.getTenants(ctx)); + Object response = asyncRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList()); verify(properties, times(1)).validateSuperUserAccess(); // create local cluster - asynRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build())); + asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build())); Set allowedClusters = Sets.newHashSet(); allowedClusters.add(configClusterName); @@ -436,14 +422,14 @@ public void properties() throws Throwable { .adminRoles(Sets.newHashSet("role1", "role2")) .allowedClusters(allowedClusters) .build(); - response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); verify(properties, times(2)).validateSuperUserAccess(); - response = asynRequests(ctx -> properties.getTenants(ctx)); + response = asyncRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList("test-property")); verify(properties, times(3)).validateSuperUserAccess(); - response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertEquals(response, tenantInfo); verify(properties, times(4)).validateSuperUserAccess(); @@ -451,21 +437,21 @@ public void properties() throws Throwable { .adminRoles(Sets.newHashSet("role1", "other-role")) .allowedClusters(allowedClusters) .build(); - response = asynRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin)); + response = asyncRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin)); verify(properties, times(5)).validateSuperUserAccess(); // Wait for updateTenant to take effect Thread.sleep(100); - response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertEquals(response, newPropertyAdmin); - response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertNotSame(response, tenantInfo); verify(properties, times(7)).validateSuperUserAccess(); // Check creating existing property try { - response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); @@ -473,14 +459,14 @@ public void properties() throws Throwable { // Check non-existing property try { - response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "non-existing")); + response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "non-existing")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } try { - response = asynRequests(ctx -> properties.updateTenant(ctx, "xxx-non-existing", newPropertyAdmin)); + response = asyncRequests(ctx -> properties.updateTenant(ctx, "xxx-non-existing", newPropertyAdmin)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -488,7 +474,7 @@ public void properties() throws Throwable { // Check deleting non-existing property try { - response = asynRequests(ctx -> properties.deleteTenant(ctx, "non-existing", false)); + response = asyncRequests(ctx -> properties.deleteTenant(ctx, "non-existing", false)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -505,7 +491,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies"); }); try { - response = asynRequests(ctx -> properties.getTenants(ctx)); + response = asyncRequests(ctx -> properties.getTenants(ctx)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -515,7 +501,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant"); }); try { - response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "my-tenant")); + response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "my-tenant")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -525,7 +511,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant"); }); try { - response = asynRequests(ctx -> properties.updateTenant(ctx, "my-tenant", newPropertyAdmin)); + response = asyncRequests(ctx -> properties.updateTenant(ctx, "my-tenant", newPropertyAdmin)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -535,7 +521,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.CREATE && path.equals("/admin/policies/test"); }); try { - response = asynRequests(ctx -> properties.createTenant(ctx, "test", tenantInfo)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "test", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -547,28 +533,28 @@ public void properties() throws Throwable { try { cache.invalidateAll(); store.invalidateAll(); - response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); + response = asyncRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - response = asynRequests(ctx -> properties.createTenant(ctx, "error-property", tenantInfo)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "error-property", tenantInfo)); mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { return op == MockZooKeeper.Op.DELETE && path.equals("/admin/policies/error-property"); }); try { - response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); + response = asyncRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); - response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); + response = asyncRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); + response = asyncRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); response = Lists.newArrayList(); - response = asynRequests(ctx -> properties.getTenants(ctx)); + response = asyncRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList()); // Create a namespace to test deleting a non-empty property @@ -576,12 +562,12 @@ public void properties() throws Throwable { .adminRoles(Sets.newHashSet("role1", "other-role")) .allowedClusters(Sets.newHashSet("use")) .build(); - response = asynRequests(ctx -> properties.createTenant(ctx, "my-tenant", newPropertyAdmin2)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "my-tenant", newPropertyAdmin2)); - namespaces.createNamespace("my-tenant", "use", "my-namespace", BundlesData.builder().build()); + response = asyncRequests(ctx -> namespaces.createNamespace(ctx,"my-tenant", "use", "my-namespace", BundlesData.builder().build())); try { - response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); + response = asyncRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); fail("should have failed"); } catch (RestException e) { // Ok @@ -589,7 +575,7 @@ public void properties() throws Throwable { // Check name validation try { - response = asynRequests(ctx -> properties.createTenant(ctx, "test&", tenantInfo)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "test&", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -597,7 +583,7 @@ public void properties() throws Throwable { // Check tenantInfo is null try { - response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-null", null)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-null", null)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -611,7 +597,7 @@ public void properties() throws Throwable { .allowedClusters(blankClusters) .build(); try { - response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-empty", tenantWithEmptyCluster)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-empty", tenantWithEmptyCluster)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -625,7 +611,7 @@ public void properties() throws Throwable { .allowedClusters(containBlankClusters) .build(); try { - response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-contain-empty", tenantContainEmptyCluster)); + response = asyncRequests(ctx -> properties.createTenant(ctx, "tenant-config-contain-empty", tenantContainEmptyCluster)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -636,13 +622,13 @@ public void properties() throws Throwable { ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(response2, timeout(5000).times(1)).resume(captor.capture()); assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode()); - response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); + response = asyncRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); } @Test @SuppressWarnings("unchecked") public void brokers() throws Exception { - asynRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder() + asyncRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder() .serviceUrl("http://broker.messaging.use.example.com") .serviceUrlTls("https://broker.messaging.use.example.com:4443") .build())); @@ -654,12 +640,12 @@ public void brokers() throws Exception { Field uriField = PulsarWebResource.class.getDeclaredField("uri"); uriField.setAccessible(true); uriField.set(brokers, mockUri); - Object res = asynRequests(ctx -> brokers.getActiveBrokers(ctx, "use")); + Object res = asyncRequests(ctx -> brokers.getActiveBrokers(ctx, "use")); assertTrue(res instanceof Set); Set activeBrokers = (Set) res; assertEquals(activeBrokers.size(), 1); assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get())); - Object leaderBrokerRes = asynRequests(ctx -> brokers.getLeaderBroker(ctx)); + Object leaderBrokerRes = asyncRequests(ctx -> brokers.getLeaderBroker(ctx)); assertTrue(leaderBrokerRes instanceof BrokerInfo); BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes; assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get()); @@ -707,8 +693,8 @@ public void resourceQuotas() throws Exception { .allowedClusters(Collections.singleton(cluster)) .build(); ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl(cluster).build(); - asynRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData )); - asynRequests(ctx -> properties.createTenant(ctx, property, admin)); + asyncRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData )); + asyncRequests(ctx -> properties.createTenant(ctx, property, admin)); // customized bandwidth for this namespace double customizeBandwidth = 3000; @@ -876,87 +862,4 @@ public void test500Error() throws Exception { Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); Assert.assertTrue(((ErrorData)responseCaptor.getValue().getResponse().getEntity()).reason.contains("500 error contains error message")); } - - - static class TestAsyncResponse implements AsyncResponse { - - Object response; - Throwable e; - CountDownLatch latch = new CountDownLatch(1); - - @Override - public boolean resume(Object response) { - this.response = response; - latch.countDown(); - return true; - } - - @Override - public boolean resume(Throwable response) { - this.e = response; - latch.countDown(); - return true; - } - - @Override - public boolean cancel() { - return false; - } - - @Override - public boolean cancel(int retryAfter) { - return false; - } - - @Override - public boolean cancel(Date retryAfter) { - return false; - } - - @Override - public boolean isSuspended() { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return false; - } - - @Override - public boolean setTimeout(long time, TimeUnit unit) { - return false; - } - - @Override - public void setTimeoutHandler(TimeoutHandler handler) { - - } - - @Override - public Collection> register(Class callback) { - return null; - } - - @Override - public Map, Collection>> register(Class callback, Class... callbacks) { - return null; - } - - @Override - public Collection> register(Object callback) { - return null; - } - - @Override - public Map, Collection>> register(Object callback, Object... callbacks) { - return null; - } - - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 0eb87491a524c..a02f7d6c93483 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -203,8 +203,8 @@ public void cleanup() throws Exception { @Test public void testCreateNamespaces() throws Exception { try { - namespaces.createNamespace(this.testTenant, "other-colo", "my-namespace", - BundlesData.builder().build()); + asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "other-colo", "my-namespace", + BundlesData.builder().build())); fail("should have failed"); } catch (RestException e) { // Ok, cluster doesn't exist @@ -217,24 +217,24 @@ public void testCreateNamespaces() throws Exception { createTestNamespaces(nsnames, BundlesData.builder().build()); try { - namespaces.createNamespace(this.testTenant, "use", "create-namespace-1", - BundlesData.builder().build()); + asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "create-namespace-1", + BundlesData.builder().build())); fail("should have failed"); } catch (RestException e) { // Ok, namespace already exists } try { - namespaces.createNamespace("non-existing-tenant", "use", "create-namespace-1", - BundlesData.builder().build()); + asyncRequests(response -> namespaces.createNamespace(response,"non-existing-tenant", "use", "create-namespace-1", + BundlesData.builder().build())); fail("should have failed"); } catch (RestException e) { // Ok, tenant doesn't exist } try { - namespaces.createNamespace(this.testTenant, "use", "create-namespace-#", - BundlesData.builder().build()); + asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "create-namespace-#", + BundlesData.builder().build())); fail("should have failed"); } catch (RestException e) { // Ok, invalid namespace name @@ -246,7 +246,7 @@ public void testCreateNamespaces() throws Exception { && path.equals("/admin/policies/my-tenant/use/my-namespace-3"); }); try { - namespaces.createNamespace(this.testTenant, "use", "my-namespace-3", BundlesData.builder().build()); + asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "my-namespace-3", BundlesData.builder().build())); fail("should have failed"); } catch (RestException e) { // Ok @@ -263,18 +263,24 @@ public void testGetNamespaces() throws Exception { this.testLocalNamespaces.get(1).toString(), this.testLocalNamespaces.get(2).toString(), this.testGlobalNamespaces.get(0).toString()); expectedList.sort(null); - assertEquals(namespaces.getTenantNamespaces(this.testTenant), expectedList); + AsyncResponse response = mock(AsyncResponse.class); + namespaces.getTenantNamespaces(response, this.testTenant); + ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + List namespacesList = (List) captor.getValue(); + namespacesList.sort(null); + assertEquals(namespacesList, expectedList); try { // check the tenant name is valid - namespaces.getTenantNamespaces(this.testTenant + "/default"); + asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, this.testTenant + "/default")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } try { - namespaces.getTenantNamespaces("non-existing-tenant"); + asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, "non-existing-tenant")); fail("should have failed"); } catch (RestException e) { // Ok, does not exist @@ -299,7 +305,7 @@ public void testGetNamespaces() throws Exception { tenantCache.invalidateAll(); store.invalidateAll(); try { - namespaces.getTenantNamespaces(this.testTenant); + asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, this.testTenant)); fail("should have failed"); } catch (RestException e) { // Ok @@ -321,46 +327,46 @@ public void testGetNamespaces() throws Exception { @Test(enabled = false) public void testGrantAndRevokePermissions() throws Exception { Policies expectedPolicies = new Policies(); - assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); - assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); + assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); namespaces.grantPermissionOnNamespace(this.testTenant, this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName(), "my-role", EnumSet.of(AuthAction.produce)); expectedPolicies.auth_policies.getNamespaceAuthentication().put("my-role", EnumSet.of(AuthAction.produce)); - assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); - assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); + assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); namespaces.grantPermissionOnNamespace(this.testTenant, this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName(), "other-role", EnumSet.of(AuthAction.consume)); expectedPolicies.auth_policies.getNamespaceAuthentication().put("other-role", EnumSet.of(AuthAction.consume)); - assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); - assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); + assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); namespaces.revokePermissionsOnNamespace(this.testTenant, this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName(), "my-role"); expectedPolicies.auth_policies.getNamespaceAuthentication().remove("my-role"); - assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); - assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); + assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); // Non-existing namespaces try { - namespaces.getPolicies(this.testTenant, this.testLocalCluster, "non-existing-namespace-1"); + asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, "non-existing-namespace-1")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } try { - namespaces.getPermissions(this.testTenant, this.testLocalCluster, "non-existing-namespace-1"); + asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, "non-existing-namespace-1")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -393,7 +399,7 @@ public void testGrantAndRevokePermissions() throws Exception { }); try { - namespaces.getPolicies(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()); + asyncRequests(ctx -> namespaces.getPolicies(ctx, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName())); fail("should have failed"); } catch (RestException e) { // Ok @@ -407,7 +413,7 @@ public void testGrantAndRevokePermissions() throws Exception { return true; }); try { - namespaces.getPermissions(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()); + asyncRequests(ctx -> namespaces.getPermissions(ctx, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName())); fail("should have failed"); } catch (RestException e) { // Ok @@ -758,7 +764,7 @@ public void testDeleteNamespaces() throws Exception { List nsList = Lists.newArrayList(this.testLocalNamespaces.get(1).toString(), this.testLocalNamespaces.get(2).toString()); nsList.sort(null); - assertEquals(namespaces.getTenantNamespaces(this.testTenant), nsList); + assertEquals(asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, this.testTenant)), nsList); testNs = this.testLocalNamespaces.get(1); // setup ownership to localhost @@ -978,16 +984,16 @@ public void testUnloadNamespaceWithBundles() throws Exception { private void createBundledTestNamespaces(String property, String cluster, String namespace, BundlesData bundle) throws Exception { - namespaces.createNamespace(property, cluster, namespace, bundle); + asyncRequests(ctx -> namespaces.createNamespace(ctx, property, cluster, namespace, bundle)); } private void createGlobalTestNamespaces(String property, String namespace, BundlesData bundle) throws Exception { - namespaces.createNamespace(property, "global", namespace, bundle); + asyncRequests(ctx -> namespaces.createNamespace(ctx, property, "global", namespace, bundle)); } private void createTestNamespaces(List nsnames, BundlesData bundle) throws Exception { for (NamespaceName nsName : nsnames) { - namespaces.createNamespace(nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), bundle); + asyncRequests(ctx -> namespaces.createNamespace(ctx, nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), bundle)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 55631aea46579..88fe6ece0bbc0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -33,11 +33,15 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.bookkeeper.client.BookKeeper; @@ -70,6 +74,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.TimeoutHandler; + /** * Base class for all tests that need a Pulsar instance without a ZK and BK cluster. */ @@ -499,5 +506,97 @@ protected void setupDefaultTenantAndNamespace() throws Exception { } } + protected Object asyncRequests(Consumer function) throws Exception { + TestAsyncResponse ctx = new TestAsyncResponse(); + function.accept(ctx); + ctx.latch.await(); + if (ctx.e != null) { + throw (Exception) ctx.e; + } + return ctx.response; + } + + public static class TestAsyncResponse implements AsyncResponse { + + Object response; + Throwable e; + CountDownLatch latch = new CountDownLatch(1); + + @Override + public boolean resume(Object response) { + this.response = response; + latch.countDown(); + return true; + } + + @Override + public boolean resume(Throwable response) { + this.e = response; + latch.countDown(); + return true; + } + + @Override + public boolean cancel() { + return false; + } + + @Override + public boolean cancel(int retryAfter) { + return false; + } + + @Override + public boolean cancel(Date retryAfter) { + return false; + } + + @Override + public boolean isSuspended() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public boolean setTimeout(long time, TimeUnit unit) { + return false; + } + + @Override + public void setTimeoutHandler(TimeoutHandler handler) { + + } + + @Override + public Collection> register(Class callback) { + return null; + } + + @Override + public Map, Collection>> register(Class callback, Class... callbacks) { + return null; + } + + @Override + public Collection> register(Object callback) { + return null; + } + + @Override + public Map, Collection>> register(Object callback, Object... callbacks) { + return null; + } + + } + private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); }