From 99e6f2d88545a21b6d1830c127894098c37c14ca Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 18 May 2022 12:03:49 +0300 Subject: [PATCH 1/4] Revert "[improve][broker] Make some methods in TenantsBase async. (#15603)" This reverts commit 5fedf63a436792636e4f58b444b2e0dc3dab8746. --- .../pulsar/broker/admin/AdminResource.java | 15 + .../pulsar/broker/admin/impl/TenantsBase.java | 345 +++++++++++------- .../pulsar/broker/web/PulsarWebResource.java | 27 -- .../apache/pulsar/broker/admin/AdminTest.java | 12 +- 4 files changed, 230 insertions(+), 169 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 43addc30af79a..211f128cc0a4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -40,8 +40,10 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; @@ -739,6 +741,19 @@ private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyn return future; } + protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) { + Throwable realCause = FutureUtil.unwrapCompletionException(exception); + if (realCause instanceof WebApplicationException) { + asyncResponse.resume(realCause); + } else if (realCause instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(Status.CONFLICT, realCause)); + } else if (realCause instanceof PulsarAdminException) { + asyncResponse.resume(new RestException(((PulsarAdminException) realCause))); + } else { + asyncResponse.resume(new RestException(realCause)); + } + } + protected CompletableFuture getSchemaCompatibilityStrategyAsync() { return validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index 045cafbfd5389..4f1bad2973618 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.admin.impl; -import static org.apache.pulsar.common.naming.Constants.GLOBAL_CLUSTER; import com.google.common.collect.Lists; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -46,7 +45,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -64,18 +64,23 @@ public class TenantsBase extends PulsarWebResource { @ApiResponse(code = 404, message = "Tenant doesn't exist")}) public void getTenants(@Suspended final AsyncResponse asyncResponse) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() - .thenCompose(__ -> tenantResources().listTenantsAsync()) - .thenAccept(tenants -> { - // deep copy the tenants to avoid concurrent sort exception - List deepCopy = new ArrayList<>(tenants); - deepCopy.sort(null); - asyncResponse.resume(deepCopy); - }).exceptionally(ex -> { - log.error("[{}] Failed to get tenants list", clientAppId, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + try { + validateSuperUserAccess(); + } catch (Exception e) { + asyncResponse.resume(e); + return; + } + tenantResources().listTenantsAsync().whenComplete((tenants, e) -> { + if (e != null) { + log.error("[{}] Failed to get tenants list", clientAppId, e); + asyncResponse.resume(new RestException(e)); + return; + } + // deep copy the tenants to avoid concurrent sort exception + List deepCopy = new ArrayList<>(tenants); + deepCopy.sort(null); + asyncResponse.resume(deepCopy); + }); } @GET @@ -86,20 +91,22 @@ public void getTenants(@Suspended final AsyncResponse asyncResponse) { public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() - .thenCompose(__ -> tenantResources().getTenantAsync(tenant)) - .thenApply(tenantInfo -> { - if (!tenantInfo.isPresent()) { - throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); - } - return tenantInfo.get(); - }) - .thenAccept(asyncResponse::resume) - .exceptionally(ex -> { - log.error("[{}] Failed to get tenant admin {}", clientAppId, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + try { + validateSuperUserAccess(); + } catch (Exception e) { + asyncResponse.resume(e); + } + + tenantResources().getTenantAsync(tenant).whenComplete((tenantInfo, e) -> { + if (e != null) { + log.error("[{}] Failed to get Tenant {}", clientAppId, e.getMessage()); + asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get Tenant")); + return; + } + boolean response = tenantInfo.isPresent() ? asyncResponse.resume(tenantInfo.get()) + : asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant does not exist")); + return; + }); } @PUT @@ -113,44 +120,58 @@ public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse, public void createTenant(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, @ApiParam(value = "TenantInfo") TenantInfoImpl tenantInfo) { + final String clientAppId = clientAppId(); try { + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); + validateClusters(tenantInfo); NamedEntity.checkName(tenant); } catch (IllegalArgumentException e) { - log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId, tenant, e); + log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId(), tenant, e); asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid")); return; + } catch (Exception e) { + asyncResponse.resume(e); + return; } - validateSuperUserAccessAsync() - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> validateClustersAsync(tenantInfo)) - .thenCompose(__ -> tenantResources().listTenantsAsync()) - .thenAccept(tenants -> { - int maxTenants = pulsar().getConfiguration().getMaxTenants(); - // Due to the cost of distributed locks, no locks are added here. - // In a concurrent scenario, the threshold will be exceeded. - if (maxTenants > 0) { - if (tenants != null && tenants.size() >= maxTenants) { - throw new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants"); - } - } - }) - .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant)) - .thenAccept(exist -> { - if (exist) { - throw new RestException(Status.CONFLICT, "Tenant already exist"); - } - }) - .thenCompose(__ -> tenantResources().createTenantAsync(tenant, tenantInfo)) - .thenAccept(__ -> { - log.info("[{}] Created tenant {}", clientAppId, tenant); + + tenantResources().listTenantsAsync().whenComplete((tenants, e) -> { + if (e != null) { + log.error("[{}] Failed to create tenant ", clientAppId, e.getCause()); + asyncResponse.resume(new RestException(e)); + return; + } + + int maxTenants = pulsar().getConfiguration().getMaxTenants(); + // Due to the cost of distributed locks, no locks are added here. + // In a concurrent scenario, the threshold will be exceeded. + if (maxTenants > 0) { + if (tenants != null && tenants.size() >= maxTenants) { + asyncResponse.resume( + new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants")); + return; + } + } + tenantResources().tenantExistsAsync(tenant).thenAccept(exist -> { + if (exist) { + asyncResponse.resume(new RestException(Status.CONFLICT, "Tenant already exist")); + return; + } + tenantResources().createTenantAsync(tenant, tenantInfo).thenAccept((r) -> { + log.info("[{}] Created tenant {}", clientAppId(), tenant); asyncResponse.resume(Response.noContent().build()); - }) - .exceptionally(ex -> { + }).exceptionally(ex -> { log.error("[{}] Failed to create tenant {}", clientAppId, tenant, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); + asyncResponse.resume(new RestException(ex)); return null; }); + }).exceptionally(ex -> { + log.error("[{}] Failed to create tenant {}", clientAppId(), tenant, ex); + asyncResponse.resume(new RestException(ex)); + return null; + }); + }); } @POST @@ -165,28 +186,42 @@ public void createTenant(@Suspended final AsyncResponse asyncResponse, public void updateTenant(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, @ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) { - final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> validateClustersAsync(newTenantAdmin)) - .thenCompose(__ -> tenantResources().getTenantAsync(tenant)) - .thenCompose(tenantAdmin -> { - if (!tenantAdmin.isPresent()) { - throw new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found"); - } - TenantInfo oldTenantAdmin = tenantAdmin.get(); - Set newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters()); - return canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters); - }) - .thenCompose(__ -> tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin)) - .thenAccept(__ -> { - log.info("[{}] Successfully updated tenant info {}", clientAppId, tenant); + try { + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); + validateClusters(newTenantAdmin); + } catch (Exception e) { + asyncResponse.resume(e); + return; + } + + final String clientAddId = clientAppId(); + tenantResources().getTenantAsync(tenant).thenAccept(tenantAdmin -> { + if (!tenantAdmin.isPresent()) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found")); + return; + } + TenantInfo oldTenantAdmin = tenantAdmin.get(); + Set newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters()); + canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters).thenApply(r -> { + tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin).thenAccept(done -> { + log.info("Successfully updated tenant info {}", tenant); asyncResponse.resume(Response.noContent().build()); }).exceptionally(ex -> { - log.warn("[{}] Failed to update tenant {}", clientAppId, tenant, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); + log.warn("Failed to update tenant {}", tenant, ex.getCause()); + asyncResponse.resume(new RestException(ex)); return null; }); + return null; + }).exceptionally(nsEx -> { + asyncResponse.resume(nsEx.getCause()); + return null; + }); + }).exceptionally(ex -> { + log.error("[{}] Failed to get tenant {}", clientAddId, tenant, ex.getCause()); + asyncResponse.resume(new RestException(ex)); + return null; + }); } @DELETE @@ -199,89 +234,127 @@ public void updateTenant(@Suspended final AsyncResponse asyncResponse, public void deleteTenant(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant, @QueryParam("force") @DefaultValue("false") boolean force) { - final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> internalDeleteTenant(tenant, force)) - .thenAccept(__ -> { - log.info("[{}] Deleted tenant {}", clientAppId, tenant); - asyncResponse.resume(Response.noContent().build()); - }) - .exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.error("[{}] Failed to delete tenant {}", clientAppId, tenant, cause); - if (cause instanceof IllegalStateException) { - asyncResponse.resume(new RestException(Status.CONFLICT, cause)); - } else { - resumeAsyncResponseExceptionally(asyncResponse, cause); - } - return null; - }); + try { + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); + } catch (Exception e) { + asyncResponse.resume(e); + return; + } + internalDeleteTenant(asyncResponse, tenant, force); } - protected CompletableFuture internalDeleteTenant(String tenant, boolean force) { - return force ? internalDeleteTenantAsyncForcefully(tenant) : internalDeleteTenantAsync(tenant); + protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant, boolean force) { + if (force) { + internalDeleteTenantForcefully(asyncResponse, tenant); + } else { + internalDeleteTenant(asyncResponse, tenant); + } } - protected CompletableFuture internalDeleteTenantAsync(String tenant) { - return tenantResources().tenantExistsAsync(tenant) - .thenAccept(exists -> { - if (!exists) { - throw new RestException(Status.NOT_FOUND, "Tenant doesn't exist"); - } - }) - .thenCompose(__ -> hasActiveNamespace(tenant)) - .thenCompose(__ -> tenantResources().deleteTenantAsync(tenant)) - .thenCompose(__ -> pulsar().getPulsarResources().getTopicResources().clearTenantPersistence(tenant)) - .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources().deleteTenantAsync(tenant)) - .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources() + protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant) { + tenantResources().tenantExistsAsync(tenant).thenApply(exists -> { + if (!exists) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant doesn't exist")); + return null; + } + + return hasActiveNamespace(tenant) + .thenCompose(ignore -> tenantResources().deleteTenantAsync(tenant)) + .thenCompose(ignore -> pulsar().getPulsarResources().getTopicResources() + .clearTenantPersistence(tenant)) + .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources() + .deleteTenantAsync(tenant)) + .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources() .getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant)) - .thenCompose(__ -> pulsar().getPulsarResources().getLocalPolicies() + .thenCompose(ignore -> pulsar().getPulsarResources().getLocalPolicies() .deleteLocalPoliciesTenantAsync(tenant)) - .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources() - .deleteBundleDataTenantAsync(tenant)); + .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources() + .deleteBundleDataTenantAsync(tenant)) + .whenComplete((ignore, ex) -> { + if (ex != null) { + log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, ex); + if (ex.getCause() instanceof IllegalStateException) { + asyncResponse.resume(new RestException(Status.CONFLICT, ex.getCause())); + } else { + asyncResponse.resume(new RestException(ex)); + } + } else { + log.info("[{}] Deleted tenant {}", clientAppId(), tenant); + asyncResponse.resume(Response.noContent().build()); + } + }); + }); } - protected CompletableFuture internalDeleteTenantAsyncForcefully(String tenant) { + protected void internalDeleteTenantForcefully(AsyncResponse asyncResponse, String tenant) { if (!pulsar().getConfiguration().isForceDeleteTenantAllowed()) { - return FutureUtil.failedFuture( + asyncResponse.resume( new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of tenants")); + return; } - return tenantResources().getListOfNamespacesAsync(tenant) - .thenApply(namespaces -> { - final List> futures = Lists.newArrayList(); - try { - PulsarAdmin adminClient = pulsar().getAdminClient(); - for (String namespace : namespaces) { - futures.add(adminClient.namespaces().deleteNamespaceAsync(namespace, true)); - } - } catch (Exception e) { - log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e); - throw new RestException(e); - } - return futures; - }) - .thenCompose(futures -> FutureUtil.waitForAll(futures)) - .thenCompose(__ -> internalDeleteTenantAsync(tenant)); + + List namespaces; + try { + namespaces = tenantResources().getListOfNamespaces(tenant); + } catch (Exception e) { + log.error("[{}] Failed to get namespaces list of {}", clientAppId(), tenant, e); + asyncResponse.resume(new RestException(e)); + return; + } + + final List> futures = Lists.newArrayList(); + try { + for (String namespace : namespaces) { + futures.add(pulsar().getAdminClient().namespaces().deleteNamespaceAsync(namespace, true)); + } + } catch (Exception e) { + log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e); + asyncResponse.resume(new RestException(e)); + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + } else { + log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, exception); + asyncResponse.resume(new RestException(exception.getCause())); + } + return null; + } + + // delete tenant normally + internalDeleteTenant(asyncResponse, tenant); + + asyncResponse.resume(Response.noContent().build()); + return null; + }); } - private CompletableFuture validateClustersAsync(TenantInfo info) { + private void validateClusters(TenantInfo info) { // empty cluster shouldn't be allowed if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c)) .collect(Collectors.toSet()).isEmpty() || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) { log.warn("[{}] Failed to validate due to clusters are empty", clientAppId()); - return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty")); + throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty"); } - return clusterResources().listAsync().thenAccept(availableClusters -> { + + List nonexistentClusters; + try { + Set availableClusters = clusterResources().list(); Set allowedClusters = info.getAllowedClusters(); - List nonexistentClusters = allowedClusters.stream() - .filter(cluster -> !(availableClusters.contains(cluster) || GLOBAL_CLUSTER.equals(cluster))) + nonexistentClusters = allowedClusters.stream().filter( + cluster -> !(availableClusters.contains(cluster) || Constants.GLOBAL_CLUSTER.equals(cluster))) .collect(Collectors.toList()); - if (nonexistentClusters.size() > 0) { - log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters); - throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist"); - } - }); + } catch (Exception e) { + log.error("[{}] Failed to get available clusters", clientAppId(), e); + throw new RestException(e); + } + if (nonexistentClusters.size() > 0) { + log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters); + throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist"); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index e491f78c9e23e..38c61d3688d77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -40,7 +40,6 @@ import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.WebApplicationException; -import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -65,8 +64,6 @@ import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.common.naming.Constants; @@ -1045,17 +1042,6 @@ public void validatePoliciesReadOnlyAccess() { } } - public CompletableFuture validatePoliciesReadOnlyAccessAsync() { - return namespaceResources().getPoliciesReadOnlyAsync().thenAccept(readOnly -> { - if (readOnly) { - if (log.isDebugEnabled()) { - log.debug("Policies are read-only. Broker cannot do read-write operations"); - } - throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations"); - } - }); - } - protected CompletableFuture hasActiveNamespace(String tenant) { return tenantResources().hasActiveNamespace(tenant); } @@ -1197,17 +1183,4 @@ public T sync(Supplier> supplier) { throw new RestException(ex); } } - - protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) { - Throwable realCause = FutureUtil.unwrapCompletionException(exception); - if (realCause instanceof WebApplicationException) { - asyncResponse.resume(realCause); - } else if (realCause instanceof BrokerServiceException.NotAllowedException) { - asyncResponse.resume(new RestException(Status.CONFLICT, realCause)); - } else if (realCause instanceof PulsarAdminException) { - asyncResponse.resume(new RestException(((PulsarAdminException) realCause))); - } else { - asyncResponse.resume(new RestException(realCause)); - } - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 23c50f3879e6a..583a87275bda4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -411,7 +411,7 @@ public void clusters() throws Exception { public void properties() throws Throwable { Object response = asyncRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList()); - verify(properties, times(1)).validateSuperUserAccessAsync(); + verify(properties, times(1)).validateSuperUserAccess(); // create local cluster asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build())); @@ -423,22 +423,22 @@ public void properties() throws Throwable { .allowedClusters(allowedClusters) .build(); response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); - verify(properties, times(2)).validateSuperUserAccessAsync(); + verify(properties, times(2)).validateSuperUserAccess(); response = asyncRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList("test-property")); - verify(properties, times(3)).validateSuperUserAccessAsync(); + verify(properties, times(3)).validateSuperUserAccess(); response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertEquals(response, tenantInfo); - verify(properties, times(4)).validateSuperUserAccessAsync(); + verify(properties, times(4)).validateSuperUserAccess(); final TenantInfoImpl newPropertyAdmin = TenantInfoImpl.builder() .adminRoles(Sets.newHashSet("role1", "other-role")) .allowedClusters(allowedClusters) .build(); response = asyncRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin)); - verify(properties, times(5)).validateSuperUserAccessAsync(); + verify(properties, times(5)).validateSuperUserAccess(); // Wait for updateTenant to take effect Thread.sleep(100); @@ -447,7 +447,7 @@ public void properties() throws Throwable { assertEquals(response, newPropertyAdmin); response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertNotSame(response, tenantInfo); - verify(properties, times(7)).validateSuperUserAccessAsync(); + verify(properties, times(7)).validateSuperUserAccess(); // Check creating existing property try { From d23a44209874983cd3fec24b9f9d334f8c5d60cf Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 18 May 2022 12:03:59 +0300 Subject: [PATCH 2/4] Revert "[improve][broker] Make some methods in NamespacesBase async. (#15518)" This reverts commit 138ea354f9de30e4b7bc9d4fe6ee16b84cd4a896. --- .../broker/resources/NamespaceResources.java | 4 - .../broker/resources/TenantResources.java | 38 +-- .../pulsar/broker/admin/AdminResource.java | 10 - .../broker/admin/impl/NamespacesBase.java | 74 +++--- .../pulsar/broker/admin/v1/Namespaces.java | 126 ++++------ .../pulsar/broker/admin/v2/Namespaces.java | 92 +++----- .../pulsar/broker/web/PulsarWebResource.java | 15 -- .../apache/pulsar/broker/admin/AdminTest.java | 221 +++++++++++++----- .../pulsar/broker/admin/NamespacesTest.java | 80 +++---- .../auth/MockedPulsarServiceBaseTest.java | 99 -------- 10 files changed, 308 insertions(+), 451 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index c24df6c586fb4..ce797a80c7c3a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -93,10 +93,6 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies); } - public CompletableFuture createPoliciesAsync(NamespaceName ns, Policies policies) { - return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies); - } - public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException { String path = joinPath(BASE_POLICIES_PATH, ns.toString()); return super.exists(path) && super.getChildren(path).isEmpty(); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java index 87f48f7e682a4..3313b61c8a1e1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.resources; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -79,7 +78,7 @@ public CompletableFuture updateTenantAsync(String tenantName, Function tenantExistsAsync(String tenantName) { - return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName)); + return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName)); } public List getListOfNamespaces(String tenant) throws MetadataStoreException { @@ -111,41 +110,6 @@ public List getListOfNamespaces(String tenant) throws MetadataStoreExcep return namespaces; } - public CompletableFuture> getListOfNamespacesAsync(String tenant) { - // this will return a cluster in v1 and a namespace in v2 - return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant)) - .thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key -> - getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key)) - .thenCompose(children -> { - if (children == null || children.isEmpty()) { - String namespace = NamespaceName.get(tenant, key).toString(); - // if the length is 0 then this is probably a leftover cluster from namespace - // created with the v1 admin format (prop/cluster/ns) and then deleted, so no - // need to add it to the list - return getAsync(joinPath(BASE_POLICIES_PATH, namespace)) - .thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace) - : new ArrayList()) - .exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof MetadataStoreException - .ContentDeserializationException) { - return new ArrayList<>(); - } - throw FutureUtil.wrapToCompletionException(ex); - }); - } else { - CompletableFuture> ret = new CompletableFuture(); - ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns) - .toString()).collect(Collectors.toList())); - return ret; - } - })).reduce(CompletableFuture.completedFuture(new ArrayList<>()), - (accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> { - namespaces.addAll(m); - return namespaces; - })))); - } - public CompletableFuture> getActiveNamespaces(String tenant, String cluster) { return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 211f128cc0a4c..7699ec91a0053 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -323,11 +323,6 @@ protected CompletableFuture getNamespacePoliciesAsync(NamespaceName na return FutureUtil.failedFuture(new RestException(e)); } policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles; - if (policies.get().is_allow_auto_update_schema == null) { - // the type changed from boolean to Boolean. return broker value here for keeping compatibility. - policies.get().is_allow_auto_update_schema = pulsar().getConfig() - .isAllowAutoUpdateSchemaEnabled(); - } return CompletableFuture.completedFuture(policies.get()); }); } else { @@ -539,11 +534,6 @@ protected List getPartitionedTopicList(TopicDomain topicDomain) { } } - protected CompletableFuture> getPartitionedTopicListAsync(TopicDomain topicDomain) { - return namespaceResources().getPartitionedTopicResources() - .listPartitionedTopicsAsync(namespaceName, topicDomain); - } - protected List getTopicPartitionList(TopicDomain topicDomain) { try { return getPulsarResources().getTopicResources().getExistingPartitions(topicName) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 6a3dbefb75110..208dc223f316f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -103,6 +103,7 @@ import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.slf4j.Logger; @@ -110,47 +111,52 @@ public abstract class NamespacesBase extends AdminResource { - protected CompletableFuture> internalGetTenantNamespaces(String tenant) { - if (tenant == null) { - return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null")); - } + protected List internalGetTenantNamespaces(String tenant) { + checkNotNull(tenant, "Tenant should not be null"); try { NamedEntity.checkName(tenant); } catch (IllegalArgumentException e) { log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e); - return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid")); + throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"); + } + validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES); + + try { + if (!tenantResources().tenantExists(tenant)) { + throw new RestException(Status.NOT_FOUND, "Tenant not found"); + } + + return tenantResources().getListOfNamespaces(tenant); + } catch (Exception e) { + log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e); + throw new RestException(e); } - return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES) - .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant)) - .thenCompose(existed -> { - if (!existed) { - throw new RestException(Status.NOT_FOUND, "Tenant not found"); - } - return tenantResources().getListOfNamespacesAsync(tenant); - }); } - protected CompletableFuture internalCreateNamespace(Policies policies) { - return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenAccept(__ -> validatePolicies(namespaceName, policies)) - .thenCompose(__ -> { - int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant(); - // no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded. - if (maxNamespacesPerTenant > 0) { - return tenantResources().getListOfNamespacesAsync(namespaceName.getTenant()) - .thenAccept(namespaces -> { - if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) { - throw new RestException(Status.PRECONDITION_FAILED, - "Exceed the maximum number of namespace in tenant :" - + namespaceName.getTenant()); - } - }); - } - return CompletableFuture.completedFuture(null); - }) - .thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies)) - .thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName)); + protected void internalCreateNamespace(Policies policies) { + validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE); + validatePoliciesReadOnlyAccess(); + validatePolicies(namespaceName, policies); + + try { + int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant(); + // no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded. + if (maxNamespacesPerTenant > 0) { + List namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant()); + if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) { + throw new RestException(Status.PRECONDITION_FAILED, + "Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant()); + } + } + namespaceResources().createPolicies(namespaceName, policies); + log.info("[{}] Created namespace {}", clientAppId(), namespaceName); + } catch (AlreadyExistsException e) { + log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName); + throw new RestException(Status.CONFLICT, "Namespace already exists"); + } catch (Exception e) { + log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e); + throw new RestException(e); + } } protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 6076ba10c5236..99a9d1db53a38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -43,7 +42,6 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; @@ -69,8 +67,6 @@ import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; -import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,15 +84,8 @@ public class Namespaces extends NamespacesBase { response = String.class, responseContainer = "Set") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property doesn't exist")}) - public void getTenantNamespaces(@Suspended AsyncResponse response, - @PathParam("property") String property) { - internalGetTenantNamespaces(property) - .thenAccept(response::resume) - .exceptionally(ex -> { - log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex); - resumeAsyncResponseExceptionally(response, ex); - return null; - }); + public List getTenantNamespaces(@PathParam("property") String property) { + return internalGetTenantNamespaces(property); } @GET @@ -136,20 +125,21 @@ public List getNamespacesForCluster(@PathParam("property") String tenant @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")}) - public void getTopics(@Suspended AsyncResponse response, - @PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, - @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { + public void getTopics(@PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, + @Suspended AsyncResponse asyncResponse) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS) - // Validate that namespace exists, throws 404 if it doesn't exist - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)) - .thenAccept(response::resume) + validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS); + + // Validate that namespace exists, throws 404 if it doesn't exist + getNamespacePolicies(namespaceName); + + pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); + asyncResponse.resume(ex); return null; }); } @@ -160,20 +150,11 @@ public void getTopics(@Suspended AsyncResponse response, response = Policies.class) @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")}) - public void getPolicies(@Suspended AsyncResponse response, - @PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public Policies getPolicies(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - validateNamespacePolicyOperationAsync(NamespaceName.get(property, namespace), PolicyName.ALL, - PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenAccept(response::resume) - .exceptionally(ex -> { - log.error("Failed to get policies for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); - return null; - }); + validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.ALL, PolicyOperation.READ); + return getNamespacePolicies(namespaceName); } @SuppressWarnings("deprecation") @@ -184,46 +165,29 @@ public void getPolicies(@Suspended AsyncResponse response, @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace already exists"), @ApiResponse(code = 412, message = "Namespace name is not valid") }) - public void createNamespace(@Suspended AsyncResponse response, - @PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, - BundlesData initialBundles) { + public void createNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, BundlesData initialBundles) { validateNamespaceName(property, cluster, namespace); - CompletableFuture ret; if (!namespaceName.isGlobal()) { // If the namespace is non global, make sure property has the access on the cluster. For global namespace, // same check is made at the time of setting replication. - ret = validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster()); - } else { - ret = CompletableFuture.completedFuture(null); + validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); } - ret.thenApply(__ -> { - Policies policies = new Policies(); - if (initialBundles != null && initialBundles.getNumBundles() > 0) { - if (initialBundles.getBoundaries() == null || initialBundles.getBoundaries().size() == 0) { - policies.bundles = getBundles(initialBundles.getNumBundles()); - } else { - policies.bundles = validateBundlesData(initialBundles); - } + + Policies policies = new Policies(); + if (initialBundles != null && initialBundles.getNumBundles() > 0) { + if (initialBundles.getBoundaries() == null || initialBundles.getBoundaries().size() == 0) { + policies.bundles = getBundles(initialBundles.getNumBundles()); } else { - int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); - policies.bundles = getBundles(defaultNumberOfBundles); + policies.bundles = validateBundlesData(initialBundles); } - return policies; - }).thenCompose(this::internalCreateNamespace) - .thenAccept(__ -> response.resume(Response.noContent().build())) - .exceptionally(ex -> { - Throwable root = FutureUtil.unwrapCompletionException(ex); - if (root instanceof MetadataStoreException.AlreadyExistsException) { - response.resume(new RestException(Status.CONFLICT, "Namespace already exists")); - } else { - log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); - } - return null; - }); + } else { + int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); + policies.bundles = getBundles(defaultNumberOfBundles); + } + + internalCreateNamespace(policies); } @DELETE @@ -236,9 +200,9 @@ public void createNamespace(@Suspended AsyncResponse response, @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"), @ApiResponse(code = 409, message = "Namespace is not empty") }) public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @QueryParam("force") @DefaultValue("false") boolean force, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateNamespaceName(property, cluster, namespace); internalDeleteNamespace(asyncResponse, authoritative, force); @@ -272,19 +236,13 @@ public void deleteNamespaceBundle(@PathParam("property") String property, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty") }) - public void getPermissions(@Suspended AsyncResponse response, - @PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public Map> getPermissions(@PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenAccept(policies -> response.resume(policies.auth_policies.getNamespaceAuthentication())) - .exceptionally(ex -> { - log.error("Failed to get permissions for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); - return null; - }); + validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION); + + Policies policies = getNamespacePolicies(namespaceName); + return policies.auth_policies.getNamespaceAuthentication(); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 7dcf97020fb09..3259023658647 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -75,9 +75,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.pulsar.metadata.api.MetadataStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,15 +91,8 @@ public class Namespaces extends NamespacesBase { response = String.class, responseContainer = "Set") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant doesn't exist")}) - public void getTenantNamespaces(@Suspended final AsyncResponse response, - @PathParam("tenant") String tenant) { - internalGetTenantNamespaces(tenant) - .thenAccept(response::resume) - .exceptionally(ex -> { - log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex); - resumeAsyncResponseExceptionally(response, ex); - return null; - }); + public List getTenantNamespaces(@PathParam("tenant") String tenant) { + return internalGetTenantNamespaces(tenant); } @GET @@ -111,19 +102,21 @@ public void getTenantNamespaces(@Suspended final AsyncResponse response, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) - public void getTopics(@Suspended AsyncResponse response, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { + public void getTopics(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, + @Suspended AsyncResponse asyncResponse) { validateNamespaceName(tenant, namespace); - validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS) - // Validate that namespace exists, throws 404 if it doesn't exist - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)) - .thenAccept(response::resume) + validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS); + + // Validate that namespace exists, throws 404 if it doesn't exist + getNamespacePolicies(namespaceName); + + pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); + asyncResponse.resume(ex); return null; }); } @@ -133,19 +126,10 @@ public void getTopics(@Suspended AsyncResponse response, @ApiOperation(value = "Get the dump all the policies specified for a namespace.", response = Policies.class) @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public void getPolicies(@Suspended AsyncResponse response, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, namespace), PolicyName.ALL, - PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenAccept(response::resume) - .exceptionally(ex -> { - log.error("Failed to get policies for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); - return null; - }); + validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.ALL, PolicyOperation.READ); + return getNamespacePolicies(namespaceName); } @PUT @@ -155,24 +139,11 @@ public void getPolicies(@Suspended AsyncResponse response, @ApiResponse(code = 404, message = "Tenant or cluster doesn't exist"), @ApiResponse(code = 409, message = "Namespace already exists"), @ApiResponse(code = 412, message = "Namespace name is not valid") }) - public void createNamespace(@Suspended AsyncResponse response, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @ApiParam(value = "Policies for the namespace") Policies policies) { + public void createNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @ApiParam(value = "Policies for the namespace") Policies policies) { validateNamespaceName(tenant, namespace); policies = getDefaultPolicesIfNull(policies); - internalCreateNamespace(policies) - .thenAccept(__ -> response.resume(Response.noContent().build())) - .exceptionally(ex -> { - Throwable root = FutureUtil.unwrapCompletionException(ex); - if (root instanceof MetadataStoreException.AlreadyExistsException) { - response.resume(new RestException(Response.Status.CONFLICT, "Namespace already exists")); - } else { - log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); - } - return null; - }); + internalCreateNamespace(policies); } @DELETE @@ -185,9 +156,9 @@ public void createNamespace(@Suspended AsyncResponse response, @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"), @ApiResponse(code = 409, message = "Namespace is not empty") }) public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @PathParam("namespace") String namespace, + @QueryParam("force") @DefaultValue("false") boolean force, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateNamespaceName(tenant, namespace); internalDeleteNamespace(asyncResponse, authoritative, force); @@ -220,18 +191,13 @@ public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty") }) - public void getPermissions(@Suspended AsyncResponse response, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public Map> getPermissions(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenAccept(policies -> response.resume(policies.auth_policies.getNamespaceAuthentication())) - .exceptionally(ex -> { - log.error("Failed to get permissions for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(response, ex); - return null; - }); + validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION); + + Policies policies = getNamespacePolicies(namespaceName); + return policies.auth_policies.getNamespaceAuthentication(); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 38c61d3688d77..1690210a80a45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -415,21 +415,6 @@ protected void validateClusterForTenant(String tenant, String cluster) { log.info("Successfully validated clusters on tenant [{}]", tenant); } - protected CompletableFuture validateClusterForTenantAsync(String tenant, String cluster) { - return pulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant) - .thenAccept(tenantInfo -> { - if (!tenantInfo.isPresent()) { - throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); - } - if (!tenantInfo.get().getAllowedClusters().contains(cluster)) { - String msg = String.format("Cluster [%s] is not in the list of allowed clusters list" - + " for tenant [%s]", cluster, tenant); - log.info(msg); - throw new RestException(Status.FORBIDDEN, msg); - } - }); - } - protected CompletableFuture validateClusterOwnershipAsync(String cluster) { return getClusterDataIfDifferentCluster(pulsar(), cluster, clientAppId()) .thenAccept(differentClusterData -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 583a87275bda4..bfc292500284a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -38,12 +38,17 @@ import java.net.URI; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.TimeoutHandler; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; @@ -191,7 +196,7 @@ public void internalConfiguration() throws Exception { new ClientConfiguration().getZkLedgersRootPath(), conf.isBookkeeperMetadataStoreSeparated() ? conf.getBookkeeperMetadataStoreUrl() : null, pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null)); - Object response = asyncRequests(ctx -> brokers.getInternalConfigurationData(ctx)); + Object response = asynRequests(ctx -> brokers.getInternalConfigurationData(ctx)); assertTrue(response instanceof InternalConfigurationData); assertEquals(response, expectedData); } @@ -199,18 +204,18 @@ public void internalConfiguration() throws Exception { @Test @SuppressWarnings("unchecked") public void clusters() throws Exception { - assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); + assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); verify(clusters, never()).validateSuperUserAccessAsync(); - asyncRequests(ctx -> clusters.createCluster(ctx, + asynRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build())); // ensure to read from ZooKeeper directly //clusters.clustersListCache().clear(); - assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet("use")); + assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet("use")); // Check creating existing cluster try { - asyncRequests(ctx -> clusters.createCluster(ctx, "use", + asynRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build())); fail("should have failed"); } catch (RestException e) { @@ -219,23 +224,23 @@ public void clusters() throws Exception { // Check deleting non-existing cluster try { - asyncRequests(ctx -> clusters.deleteCluster(ctx, "usc")); + asynRequests(ctx -> clusters.deleteCluster(ctx, "usc")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } - assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")), + assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")), ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()); - asyncRequests(ctx -> clusters.updateCluster(ctx, "use", + asynRequests(ctx -> clusters.updateCluster(ctx, "use", ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build())); - assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")), + assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")), ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build()); try { - asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); + asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); @@ -255,38 +260,38 @@ public void clusters() throws Exception { .build(); AsyncResponse response = mock(AsyncResponse.class); clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData); - asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); + asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); try { - asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 412); } clusters.deleteNamespaceIsolationPolicy("use", "policy1"); - assertTrue(((Map) asyncRequests(ctx -> + assertTrue(((Map) asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty()); - asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); - assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); + assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); try { - asyncRequests(ctx -> clusters.getCluster(ctx, "use")); + asynRequests(ctx -> clusters.getCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); } try { - asyncRequests(ctx -> clusters.updateCluster(ctx, "use", ClusterDataImpl.builder().build())); + asynRequests(ctx -> clusters.updateCluster(ctx, "use", ClusterDataImpl.builder().build())); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); } try { - asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); + asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); @@ -306,7 +311,7 @@ public void clusters() throws Exception { clusterCache.invalidateAll(); store.invalidateAll(); try { - asyncRequests(ctx -> clusters.getClusters(ctx)); + asynRequests(ctx -> clusters.getClusters(ctx)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -317,7 +322,7 @@ public void clusters() throws Exception { && path.equals("/admin/clusters/test"); }); try { - asyncRequests(ctx -> clusters.createCluster(ctx, "test", + asynRequests(ctx -> clusters.createCluster(ctx, "test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build())); fail("should have failed"); } catch (RestException e) { @@ -331,7 +336,7 @@ public void clusters() throws Exception { clusterCache.invalidateAll(); store.invalidateAll(); try { - asyncRequests(ctx -> clusters.updateCluster(ctx, "test", + asynRequests(ctx -> clusters.updateCluster(ctx, "test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build())); fail("should have failed"); } catch (RestException e) { @@ -344,7 +349,7 @@ public void clusters() throws Exception { }); try { - asyncRequests(ctx -> clusters.getCluster(ctx, "test")); + asynRequests(ctx -> clusters.getCluster(ctx, "test")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -356,7 +361,7 @@ public void clusters() throws Exception { }); try { - asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -370,7 +375,7 @@ public void clusters() throws Exception { isolationPolicyCache.invalidateAll(); store.invalidateAll(); try { - asyncRequests(ctx -> clusters.deleteCluster(ctx, "use")); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -378,7 +383,7 @@ public void clusters() throws Exception { // Check name validations try { - asyncRequests(ctx -> clusters.createCluster(ctx, "bf@", + asynRequests(ctx -> clusters.createCluster(ctx, "bf@", ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build())); fail("should have filed"); } catch (RestException e) { @@ -387,7 +392,7 @@ public void clusters() throws Exception { // Check authentication and listener name try { - asyncRequests(ctx -> clusters.createCluster(ctx, "auth", ClusterDataImpl.builder() + asynRequests(ctx -> clusters.createCluster(ctx, "auth", ClusterDataImpl.builder() .serviceUrl("http://dummy.web.example.com") .serviceUrlTls("") .brokerServiceUrl("http://dummy.messaging.example.com") @@ -396,7 +401,7 @@ public void clusters() throws Exception { .authenticationParameters("authenticationParameters") .listenerName("listenerName") .build())); - ClusterData cluster = (ClusterData) asyncRequests(ctx -> clusters.getCluster(ctx, "auth")); + ClusterData cluster = (ClusterData) asynRequests(ctx -> clusters.getCluster(ctx, "auth")); assertEquals(cluster.getAuthenticationPlugin(), "authenticationPlugin"); assertEquals(cluster.getAuthenticationParameters(), "authenticationParameters"); assertEquals(cluster.getListenerName(), "listenerName"); @@ -407,14 +412,23 @@ public void clusters() throws Exception { verify(clusters, times(2)).validateSuperUserAccess(); } + Object asynRequests(Consumer function) throws Exception { + TestAsyncResponse ctx = new TestAsyncResponse(); + function.accept(ctx); + ctx.latch.await(); + if (ctx.e != null) { + throw (Exception) ctx.e; + } + return ctx.response; + } @Test public void properties() throws Throwable { - Object response = asyncRequests(ctx -> properties.getTenants(ctx)); + Object response = asynRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList()); verify(properties, times(1)).validateSuperUserAccess(); // create local cluster - asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build())); + asynRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build())); Set allowedClusters = Sets.newHashSet(); allowedClusters.add(configClusterName); @@ -422,14 +436,14 @@ public void properties() throws Throwable { .adminRoles(Sets.newHashSet("role1", "role2")) .allowedClusters(allowedClusters) .build(); - response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); + response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); verify(properties, times(2)).validateSuperUserAccess(); - response = asyncRequests(ctx -> properties.getTenants(ctx)); + response = asynRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList("test-property")); verify(properties, times(3)).validateSuperUserAccess(); - response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertEquals(response, tenantInfo); verify(properties, times(4)).validateSuperUserAccess(); @@ -437,21 +451,21 @@ public void properties() throws Throwable { .adminRoles(Sets.newHashSet("role1", "other-role")) .allowedClusters(allowedClusters) .build(); - response = asyncRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin)); + response = asynRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin)); verify(properties, times(5)).validateSuperUserAccess(); // Wait for updateTenant to take effect Thread.sleep(100); - response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertEquals(response, newPropertyAdmin); - response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertNotSame(response, tenantInfo); verify(properties, times(7)).validateSuperUserAccess(); // Check creating existing property try { - response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); + response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); @@ -459,14 +473,14 @@ public void properties() throws Throwable { // Check non-existing property try { - response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "non-existing")); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "non-existing")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } try { - response = asyncRequests(ctx -> properties.updateTenant(ctx, "xxx-non-existing", newPropertyAdmin)); + response = asynRequests(ctx -> properties.updateTenant(ctx, "xxx-non-existing", newPropertyAdmin)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -474,7 +488,7 @@ public void properties() throws Throwable { // Check deleting non-existing property try { - response = asyncRequests(ctx -> properties.deleteTenant(ctx, "non-existing", false)); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "non-existing", false)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -491,7 +505,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies"); }); try { - response = asyncRequests(ctx -> properties.getTenants(ctx)); + response = asynRequests(ctx -> properties.getTenants(ctx)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -501,7 +515,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant"); }); try { - response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "my-tenant")); + response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "my-tenant")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -511,7 +525,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant"); }); try { - response = asyncRequests(ctx -> properties.updateTenant(ctx, "my-tenant", newPropertyAdmin)); + response = asynRequests(ctx -> properties.updateTenant(ctx, "my-tenant", newPropertyAdmin)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -521,7 +535,7 @@ public void properties() throws Throwable { return op == MockZooKeeper.Op.CREATE && path.equals("/admin/policies/test"); }); try { - response = asyncRequests(ctx -> properties.createTenant(ctx, "test", tenantInfo)); + response = asynRequests(ctx -> properties.createTenant(ctx, "test", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -533,28 +547,28 @@ public void properties() throws Throwable { try { cache.invalidateAll(); store.invalidateAll(); - response = asyncRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - response = asyncRequests(ctx -> properties.createTenant(ctx, "error-property", tenantInfo)); + response = asynRequests(ctx -> properties.createTenant(ctx, "error-property", tenantInfo)); mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> { return op == MockZooKeeper.Op.DELETE && path.equals("/admin/policies/error-property"); }); try { - response = asyncRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - response = asyncRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); - response = asyncRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false)); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false)); response = Lists.newArrayList(); - response = asyncRequests(ctx -> properties.getTenants(ctx)); + response = asynRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList()); // Create a namespace to test deleting a non-empty property @@ -562,12 +576,12 @@ public void properties() throws Throwable { .adminRoles(Sets.newHashSet("role1", "other-role")) .allowedClusters(Sets.newHashSet("use")) .build(); - response = asyncRequests(ctx -> properties.createTenant(ctx, "my-tenant", newPropertyAdmin2)); + response = asynRequests(ctx -> properties.createTenant(ctx, "my-tenant", newPropertyAdmin2)); - response = asyncRequests(ctx -> namespaces.createNamespace(ctx,"my-tenant", "use", "my-namespace", BundlesData.builder().build())); + namespaces.createNamespace("my-tenant", "use", "my-namespace", BundlesData.builder().build()); try { - response = asyncRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); fail("should have failed"); } catch (RestException e) { // Ok @@ -575,7 +589,7 @@ public void properties() throws Throwable { // Check name validation try { - response = asyncRequests(ctx -> properties.createTenant(ctx, "test&", tenantInfo)); + response = asynRequests(ctx -> properties.createTenant(ctx, "test&", tenantInfo)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -583,7 +597,7 @@ public void properties() throws Throwable { // Check tenantInfo is null try { - response = asyncRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-null", null)); + response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-null", null)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -597,7 +611,7 @@ public void properties() throws Throwable { .allowedClusters(blankClusters) .build(); try { - response = asyncRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-empty", tenantWithEmptyCluster)); + response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-empty", tenantWithEmptyCluster)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -611,7 +625,7 @@ public void properties() throws Throwable { .allowedClusters(containBlankClusters) .build(); try { - response = asyncRequests(ctx -> properties.createTenant(ctx, "tenant-config-contain-empty", tenantContainEmptyCluster)); + response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-contain-empty", tenantContainEmptyCluster)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -622,13 +636,13 @@ public void properties() throws Throwable { ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(response2, timeout(5000).times(1)).resume(captor.capture()); assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode()); - response = asyncRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); + response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false)); } @Test @SuppressWarnings("unchecked") public void brokers() throws Exception { - asyncRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder() + asynRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder() .serviceUrl("http://broker.messaging.use.example.com") .serviceUrlTls("https://broker.messaging.use.example.com:4443") .build())); @@ -640,12 +654,12 @@ public void brokers() throws Exception { Field uriField = PulsarWebResource.class.getDeclaredField("uri"); uriField.setAccessible(true); uriField.set(brokers, mockUri); - Object res = asyncRequests(ctx -> brokers.getActiveBrokers(ctx, "use")); + Object res = asynRequests(ctx -> brokers.getActiveBrokers(ctx, "use")); assertTrue(res instanceof Set); Set activeBrokers = (Set) res; assertEquals(activeBrokers.size(), 1); assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get())); - Object leaderBrokerRes = asyncRequests(ctx -> brokers.getLeaderBroker(ctx)); + Object leaderBrokerRes = asynRequests(ctx -> brokers.getLeaderBroker(ctx)); assertTrue(leaderBrokerRes instanceof BrokerInfo); BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes; assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get()); @@ -693,8 +707,8 @@ public void resourceQuotas() throws Exception { .allowedClusters(Collections.singleton(cluster)) .build(); ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl(cluster).build(); - asyncRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData )); - asyncRequests(ctx -> properties.createTenant(ctx, property, admin)); + asynRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData )); + asynRequests(ctx -> properties.createTenant(ctx, property, admin)); // customized bandwidth for this namespace double customizeBandwidth = 3000; @@ -862,4 +876,87 @@ public void test500Error() throws Exception { Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); Assert.assertTrue(((ErrorData)responseCaptor.getValue().getResponse().getEntity()).reason.contains("500 error contains error message")); } + + + static class TestAsyncResponse implements AsyncResponse { + + Object response; + Throwable e; + CountDownLatch latch = new CountDownLatch(1); + + @Override + public boolean resume(Object response) { + this.response = response; + latch.countDown(); + return true; + } + + @Override + public boolean resume(Throwable response) { + this.e = response; + latch.countDown(); + return true; + } + + @Override + public boolean cancel() { + return false; + } + + @Override + public boolean cancel(int retryAfter) { + return false; + } + + @Override + public boolean cancel(Date retryAfter) { + return false; + } + + @Override + public boolean isSuspended() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public boolean setTimeout(long time, TimeUnit unit) { + return false; + } + + @Override + public void setTimeoutHandler(TimeoutHandler handler) { + + } + + @Override + public Collection> register(Class callback) { + return null; + } + + @Override + public Map, Collection>> register(Class callback, Class... callbacks) { + return null; + } + + @Override + public Collection> register(Object callback) { + return null; + } + + @Override + public Map, Collection>> register(Object callback, Object... callbacks) { + return null; + } + + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index bf83d1702ecd2..55af39f32c1f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -204,8 +204,8 @@ public void cleanup() throws Exception { @Test public void testCreateNamespaces() throws Exception { try { - asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "other-colo", "my-namespace", - BundlesData.builder().build())); + namespaces.createNamespace(this.testTenant, "other-colo", "my-namespace", + BundlesData.builder().build()); fail("should have failed"); } catch (RestException e) { // Ok, cluster doesn't exist @@ -218,24 +218,24 @@ public void testCreateNamespaces() throws Exception { createTestNamespaces(nsnames, BundlesData.builder().build()); try { - asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "create-namespace-1", - BundlesData.builder().build())); + namespaces.createNamespace(this.testTenant, "use", "create-namespace-1", + BundlesData.builder().build()); fail("should have failed"); } catch (RestException e) { // Ok, namespace already exists } try { - asyncRequests(response -> namespaces.createNamespace(response,"non-existing-tenant", "use", "create-namespace-1", - BundlesData.builder().build())); + namespaces.createNamespace("non-existing-tenant", "use", "create-namespace-1", + BundlesData.builder().build()); fail("should have failed"); } catch (RestException e) { // Ok, tenant doesn't exist } try { - asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "create-namespace-#", - BundlesData.builder().build())); + namespaces.createNamespace(this.testTenant, "use", "create-namespace-#", + BundlesData.builder().build()); fail("should have failed"); } catch (RestException e) { // Ok, invalid namespace name @@ -247,7 +247,7 @@ public void testCreateNamespaces() throws Exception { && path.equals("/admin/policies/my-tenant/use/my-namespace-3"); }); try { - asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "my-namespace-3", BundlesData.builder().build())); + namespaces.createNamespace(this.testTenant, "use", "my-namespace-3", BundlesData.builder().build()); fail("should have failed"); } catch (RestException e) { // Ok @@ -264,24 +264,18 @@ public void testGetNamespaces() throws Exception { this.testLocalNamespaces.get(1).toString(), this.testLocalNamespaces.get(2).toString(), this.testGlobalNamespaces.get(0).toString()); expectedList.sort(null); - AsyncResponse response = mock(AsyncResponse.class); - namespaces.getTenantNamespaces(response, this.testTenant); - ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(captor.capture()); - List namespacesList = (List) captor.getValue(); - namespacesList.sort(null); - assertEquals(namespacesList, expectedList); + assertEquals(namespaces.getTenantNamespaces(this.testTenant), expectedList); try { // check the tenant name is valid - asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, this.testTenant + "/default")); + namespaces.getTenantNamespaces(this.testTenant + "/default"); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } try { - asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, "non-existing-tenant")); + namespaces.getTenantNamespaces("non-existing-tenant"); fail("should have failed"); } catch (RestException e) { // Ok, does not exist @@ -306,7 +300,7 @@ public void testGetNamespaces() throws Exception { tenantCache.invalidateAll(); store.invalidateAll(); try { - asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, this.testTenant)); + namespaces.getTenantNamespaces(this.testTenant); fail("should have failed"); } catch (RestException e) { // Ok @@ -328,46 +322,46 @@ public void testGetNamespaces() throws Exception { @Test(enabled = false) public void testGrantAndRevokePermissions() throws Exception { Policies expectedPolicies = new Policies(); - assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); - assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); + assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); namespaces.grantPermissionOnNamespace(this.testTenant, this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName(), "my-role", EnumSet.of(AuthAction.produce)); expectedPolicies.auth_policies.getNamespaceAuthentication().put("my-role", EnumSet.of(AuthAction.produce)); - assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); - assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); + assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); namespaces.grantPermissionOnNamespace(this.testTenant, this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName(), "other-role", EnumSet.of(AuthAction.consume)); expectedPolicies.auth_policies.getNamespaceAuthentication().put("other-role", EnumSet.of(AuthAction.consume)); - assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); - assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); + assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); namespaces.revokePermissionsOnNamespace(this.testTenant, this.testLocalCluster, this.testLocalNamespaces.get(0).getLocalName(), "my-role"); expectedPolicies.auth_policies.getNamespaceAuthentication().remove("my-role"); - assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies); - assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, - this.testLocalNamespaces.get(0).getLocalName())), expectedPolicies.auth_policies.getNamespaceAuthentication()); + assertEquals(namespaces.getPolicies(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies); + assertEquals(namespaces.getPermissions(this.testTenant, this.testLocalCluster, + this.testLocalNamespaces.get(0).getLocalName()), expectedPolicies.auth_policies.getNamespaceAuthentication()); // Non-existing namespaces try { - asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, this.testLocalCluster, "non-existing-namespace-1")); + namespaces.getPolicies(this.testTenant, this.testLocalCluster, "non-existing-namespace-1"); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } try { - asyncRequests(ctx -> namespaces.getPermissions(ctx, this.testTenant, this.testLocalCluster, "non-existing-namespace-1")); + namespaces.getPermissions(this.testTenant, this.testLocalCluster, "non-existing-namespace-1"); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -400,7 +394,7 @@ public void testGrantAndRevokePermissions() throws Exception { }); try { - asyncRequests(ctx -> namespaces.getPolicies(ctx, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName())); + namespaces.getPolicies(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()); fail("should have failed"); } catch (RestException e) { // Ok @@ -414,7 +408,7 @@ public void testGrantAndRevokePermissions() throws Exception { return true; }); try { - asyncRequests(ctx -> namespaces.getPermissions(ctx, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName())); + namespaces.getPermissions(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()); fail("should have failed"); } catch (RestException e) { // Ok @@ -765,7 +759,7 @@ public void testDeleteNamespaces() throws Exception { List nsList = Lists.newArrayList(this.testLocalNamespaces.get(1).toString(), this.testLocalNamespaces.get(2).toString()); nsList.sort(null); - assertEquals(asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, this.testTenant)), nsList); + assertEquals(namespaces.getTenantNamespaces(this.testTenant), nsList); testNs = this.testLocalNamespaces.get(1); // setup ownership to localhost @@ -985,16 +979,16 @@ public void testUnloadNamespaceWithBundles() throws Exception { private void createBundledTestNamespaces(String property, String cluster, String namespace, BundlesData bundle) throws Exception { - asyncRequests(ctx -> namespaces.createNamespace(ctx, property, cluster, namespace, bundle)); + namespaces.createNamespace(property, cluster, namespace, bundle); } private void createGlobalTestNamespaces(String property, String namespace, BundlesData bundle) throws Exception { - asyncRequests(ctx -> namespaces.createNamespace(ctx, property, "global", namespace, bundle)); + namespaces.createNamespace(property, "global", namespace, bundle); } private void createTestNamespaces(List nsnames, BundlesData bundle) throws Exception { for (NamespaceName nsName : nsnames) { - asyncRequests(ctx -> namespaces.createNamespace(ctx, nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), bundle)); + namespaces.createNamespace(nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), bundle); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index dd75f6eab3f9d..34cdce483bf32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -34,15 +34,11 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.bookkeeper.client.BookKeeper; @@ -75,9 +71,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.TimeoutHandler; - /** * Base class for all tests that need a Pulsar instance without a ZK and BK cluster. */ @@ -527,97 +520,5 @@ protected void setupDefaultTenantAndNamespace() throws Exception { } } - protected Object asyncRequests(Consumer function) throws Exception { - TestAsyncResponse ctx = new TestAsyncResponse(); - function.accept(ctx); - ctx.latch.await(); - if (ctx.e != null) { - throw (Exception) ctx.e; - } - return ctx.response; - } - - public static class TestAsyncResponse implements AsyncResponse { - - Object response; - Throwable e; - CountDownLatch latch = new CountDownLatch(1); - - @Override - public boolean resume(Object response) { - this.response = response; - latch.countDown(); - return true; - } - - @Override - public boolean resume(Throwable response) { - this.e = response; - latch.countDown(); - return true; - } - - @Override - public boolean cancel() { - return false; - } - - @Override - public boolean cancel(int retryAfter) { - return false; - } - - @Override - public boolean cancel(Date retryAfter) { - return false; - } - - @Override - public boolean isSuspended() { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return false; - } - - @Override - public boolean setTimeout(long time, TimeUnit unit) { - return false; - } - - @Override - public void setTimeoutHandler(TimeoutHandler handler) { - - } - - @Override - public Collection> register(Class callback) { - return null; - } - - @Override - public Map, Collection>> register(Class callback, Class... callbacks) { - return null; - } - - @Override - public Collection> register(Object callback) { - return null; - } - - @Override - public Map, Collection>> register(Object callback, Object... callbacks) { - return null; - } - - } - private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); } From 7f0456e97636cb07e9dd0f49e4096688618a85f3 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 18 May 2022 14:51:43 +0300 Subject: [PATCH 3/4] Revert "[improve][broker] Make some operation messageTTL methods in Namespaces async. (#15577)" This reverts commit 7c5067461f04a2a4125ca0945a5f7ca9dfcfa9b3. --- .../broker/admin/impl/NamespacesBase.java | 22 ++++----- .../pulsar/broker/admin/v1/Namespaces.java | 48 +++++-------------- .../pulsar/broker/admin/v2/Namespaces.java | 42 +++++----------- .../pulsar/broker/admin/NamespacesTest.java | 28 ----------- 4 files changed, 34 insertions(+), 106 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 208dc223f316f..3958e102536a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -797,18 +797,16 @@ protected void internalSetNamespaceReplicationClusters(List clusterIds) }); } - protected CompletableFuture internalSetNamespaceMessageTTLAsync(Integer messageTTL) { - return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.TTL, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenAccept(__ -> { - if (messageTTL != null && messageTTL < 0) { - throw new RestException(Status.PRECONDITION_FAILED, - "Invalid value for message TTL, message TTL must >= 0"); - } - }).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { - policies.message_ttl_in_seconds = messageTTL; - return policies; - })); + protected void internalSetNamespaceMessageTTL(Integer messageTTL) { + validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE); + validatePoliciesReadOnlyAccess(); + if (messageTTL != null && messageTTL < 0) { + throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); + } + updatePolicies(namespaceName, policies -> { + policies.message_ttl_in_seconds = messageTTL; + return policies; + }); } protected void internalSetSubscriptionExpirationTime(Integer expirationTime) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 99a9d1db53a38..62ef5c56a4b89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -348,20 +348,13 @@ public void setNamespaceReplicationClusters(@PathParam("property") String proper @ApiOperation(hidden = true, value = "Get the message TTL for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, - @PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public Integer getNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - validateNamespacePolicyOperationAsync(NamespaceName.get(property, namespace), PolicyName.TTL, - PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds)) - .exceptionally(ex -> { - log.error("Failed to get namespace message TTL for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.TTL, PolicyOperation.READ); + + Policies policies = getNamespacePolicies(namespaceName); + return policies.message_ttl_in_seconds; } @POST @@ -370,37 +363,22 @@ public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL") }) - public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - int messageTTL) { + public void setNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, int messageTTL) { validateNamespaceName(property, cluster, namespace); - internalSetNamespaceMessageTTLAsync(messageTTL) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalSetNamespaceMessageTTL(messageTTL); } @DELETE @Path("/{property}/{cluster}/{namespace}/messageTTL") - @ApiOperation(value = "Remove message TTL in seconds for namespace") + @ApiOperation(value = "Set message TTL in seconds for namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL") }) - public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, - @PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void removeNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - internalSetNamespaceMessageTTLAsync(null) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalSetNamespaceMessageTTL(null); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 3259023658647..53f0e4968bce4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -299,18 +299,13 @@ public void setNamespaceReplicationClusters(@PathParam("tenant") String tenant, @ApiOperation(value = "Get the message TTL for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + public Integer getNamespaceMessageTTL(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, namespace), PolicyName.TTL, - PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds)) - .exceptionally(ex -> { - log.error("Failed to get namespace message TTL for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.TTL, PolicyOperation.READ); + + Policies policies = getNamespacePolicies(namespaceName); + return policies.message_ttl_in_seconds; } @POST @@ -319,37 +314,22 @@ public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL") }) - public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @ApiParam(value = "TTL in seconds for the specified namespace", required = true) - int messageTTL) { + public void setNamespaceMessageTTL(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @ApiParam(value = "TTL in seconds for the specified namespace", required = true) int messageTTL) { validateNamespaceName(tenant, namespace); - internalSetNamespaceMessageTTLAsync(messageTTL) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalSetNamespaceMessageTTL(messageTTL); } @DELETE @Path("/{tenant}/{namespace}/messageTTL") - @ApiOperation(value = "Remove message TTL in seconds for namespace") + @ApiOperation(value = "Set message TTL in seconds for namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL")}) - public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, + public void removeNamespaceMessageTTL(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalSetNamespaceMessageTTLAsync(null) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalSetNamespaceMessageTTL(null); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 55af39f32c1f8..0eb87491a524c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -31,7 +31,6 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; @@ -1320,33 +1319,6 @@ public void close() { } } - @Test - public void testOperationNamespaceMessageTTL() throws Exception { - String namespace = "ttlnamespace"; - - asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster, - namespace, BundlesData.builder().build())); - - asyncRequests(response -> namespaces.setNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, - namespace, 100)); - - int namespaceMessageTTL = (Integer) asyncRequests(response -> namespaces.getNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, - namespace)); - assertEquals(100, namespaceMessageTTL); - - asyncRequests(response -> namespaces.removeNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, namespace)); - assertNull(asyncRequests(response -> namespaces.getNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, - namespace))); - - try { - asyncRequests(response -> namespaces.setNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, - namespace, -1)); - fail("should have failed"); - } catch (RestException e) { - assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); - } - } - @Test public void testSetOffloadThreshold() throws Exception { TopicName topicName = TopicName.get("persistent", this.testTenant, "offload", "offload-topic"); From 68b725bfc90f5fb3ef6a0093fec4222f3d3c43c1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 18 May 2022 14:52:11 +0300 Subject: [PATCH 4/4] Revert "[improve][broker] Make some operation deduplication methods in Namespaces async. (#15608)" This reverts commit 132ba4ad8e3808923b2c8b0a9fa5db5a8d413542. --- .../broker/admin/impl/NamespacesBase.java | 21 ++++++------ .../pulsar/broker/admin/v1/Namespaces.java | 13 ++------ .../pulsar/broker/admin/v2/Namespaces.java | 33 ++++--------------- 3 files changed, 19 insertions(+), 48 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 3958e102536a7..49a221946ee35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -912,13 +912,13 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons internalSetAutoSubscriptionCreation(asyncResponse, null); } - protected CompletableFuture internalModifyDeduplicationAsync(Boolean enableDeduplication) { - return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { - policies.deduplicationEnabled = enableDeduplication; - return policies; - })); + protected void internalModifyDeduplication(Boolean enableDeduplication) { + validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); + validatePoliciesReadOnlyAccess(); + updatePolicies(namespaceName, policies -> { + policies.deduplicationEnabled = enableDeduplication; + return policies; + }); } @SuppressWarnings("deprecation") @@ -2152,10 +2152,9 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) { } } - protected CompletableFuture internalGetDeduplicationAsync() { - return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenApply(policies -> policies.deduplicationEnabled); + protected Boolean internalGetDeduplication() { + validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ); + return getNamespacePolicies(namespaceName).deduplicationEnabled; } protected Integer internalGetMaxConsumersPerTopic() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 62ef5c56a4b89..c21c8b8910f5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -471,17 +471,10 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String prope @ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - boolean enableDeduplication) { + public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, boolean enableDeduplication) { validateNamespaceName(property, cluster, namespace); - internalModifyDeduplicationAsync(enableDeduplication) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalModifyDeduplication(enableDeduplication); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 53f0e4968bce4..e3f82b72d2ad8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -377,16 +377,9 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant, @ApiOperation(value = "Get broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalGetDeduplicationAsync() - .thenAccept(deduplication -> asyncResponse.resume(deduplication)) - .exceptionally(ex -> { - log.error("Failed to get broker deduplication config for namespace {}", namespace, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + return internalGetDeduplication(); } @POST @@ -394,19 +387,12 @@ public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam( @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, + public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @ApiParam(value = "Flag for disabling or enabling broker side deduplication " + "for all topics in the specified namespace", required = true) boolean enableDeduplication) { validateNamespaceName(tenant, namespace); - internalModifyDeduplicationAsync(enableDeduplication) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalModifyDeduplication(enableDeduplication); } @DELETE @@ -414,16 +400,9 @@ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar @ApiOperation(value = "Remove broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalModifyDeduplicationAsync(null) - .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { - log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalModifyDeduplication(null); } @GET