From 5fedf63a436792636e4f58b444b2e0dc3dab8746 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Mon, 16 May 2022 16:14:37 +0800 Subject: [PATCH] [improve][broker] Make some methods in TenantsBase async. (#15603) --- .../pulsar/broker/admin/AdminResource.java | 15 - .../pulsar/broker/admin/impl/TenantsBase.java | 345 +++++++----------- .../pulsar/broker/web/PulsarWebResource.java | 27 ++ .../apache/pulsar/broker/admin/AdminTest.java | 12 +- 4 files changed, 169 insertions(+), 230 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 3a396411c63d6..7f77568b8bb42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -40,10 +40,8 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; @@ -741,19 +739,6 @@ private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyn return future; } - protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) { - Throwable realCause = FutureUtil.unwrapCompletionException(exception); - if (realCause instanceof WebApplicationException) { - asyncResponse.resume(realCause); - } else if (realCause instanceof BrokerServiceException.NotAllowedException) { - asyncResponse.resume(new RestException(Status.CONFLICT, realCause)); - } else if (realCause instanceof PulsarAdminException) { - asyncResponse.resume(new RestException(((PulsarAdminException) realCause))); - } else { - asyncResponse.resume(new RestException(realCause)); - } - } - protected CompletableFuture getSchemaCompatibilityStrategyAsync() { return validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index 4f1bad2973618..045cafbfd5389 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin.impl; +import static org.apache.pulsar.common.naming.Constants.GLOBAL_CLUSTER; import com.google.common.collect.Lists; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -45,8 +46,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.naming.Constants; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -64,23 +64,18 @@ public class TenantsBase extends PulsarWebResource { @ApiResponse(code = 404, message = "Tenant doesn't exist")}) public void getTenants(@Suspended final AsyncResponse asyncResponse) { final String clientAppId = clientAppId(); - try { - validateSuperUserAccess(); - } catch (Exception e) { - asyncResponse.resume(e); - return; - } - tenantResources().listTenantsAsync().whenComplete((tenants, e) -> { - if (e != null) { - log.error("[{}] Failed to get tenants list", clientAppId, e); - asyncResponse.resume(new RestException(e)); - return; - } - // deep copy the tenants to avoid concurrent sort exception - List deepCopy = new ArrayList<>(tenants); - deepCopy.sort(null); - asyncResponse.resume(deepCopy); - }); + validateSuperUserAccessAsync() + .thenCompose(__ -> tenantResources().listTenantsAsync()) + .thenAccept(tenants -> { + // deep copy the tenants to avoid concurrent sort exception + List deepCopy = new ArrayList<>(tenants); + deepCopy.sort(null); + asyncResponse.resume(deepCopy); + }).exceptionally(ex -> { + log.error("[{}] Failed to get tenants list", clientAppId, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -91,22 +86,20 @@ public void getTenants(@Suspended final AsyncResponse asyncResponse) { public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) { final String clientAppId = clientAppId(); - try { - validateSuperUserAccess(); - } catch (Exception e) { - asyncResponse.resume(e); - } - - tenantResources().getTenantAsync(tenant).whenComplete((tenantInfo, e) -> { - if (e != null) { - log.error("[{}] Failed to get Tenant {}", clientAppId, e.getMessage()); - asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get Tenant")); - return; - } - boolean response = tenantInfo.isPresent() ? asyncResponse.resume(tenantInfo.get()) - : asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant does not exist")); - return; - }); + validateSuperUserAccessAsync() + .thenCompose(__ -> tenantResources().getTenantAsync(tenant)) + .thenApply(tenantInfo -> { + if (!tenantInfo.isPresent()) { + throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); + } + return tenantInfo.get(); + }) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get tenant admin {}", clientAppId, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @PUT @@ -120,58 +113,44 @@ public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse, public void createTenant(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, @ApiParam(value = "TenantInfo") TenantInfoImpl tenantInfo) { - final String clientAppId = clientAppId(); try { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - validateClusters(tenantInfo); NamedEntity.checkName(tenant); } catch (IllegalArgumentException e) { - log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId(), tenant, e); + log.warn("[{}] Failed to create tenant with invalid name {}", clientAppId, tenant, e); asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid")); return; - } catch (Exception e) { - asyncResponse.resume(e); - return; } - - tenantResources().listTenantsAsync().whenComplete((tenants, e) -> { - if (e != null) { - log.error("[{}] Failed to create tenant ", clientAppId, e.getCause()); - asyncResponse.resume(new RestException(e)); - return; - } - - int maxTenants = pulsar().getConfiguration().getMaxTenants(); - // Due to the cost of distributed locks, no locks are added here. - // In a concurrent scenario, the threshold will be exceeded. - if (maxTenants > 0) { - if (tenants != null && tenants.size() >= maxTenants) { - asyncResponse.resume( - new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants")); - return; - } - } - tenantResources().tenantExistsAsync(tenant).thenAccept(exist -> { - if (exist) { - asyncResponse.resume(new RestException(Status.CONFLICT, "Tenant already exist")); - return; - } - tenantResources().createTenantAsync(tenant, tenantInfo).thenAccept((r) -> { - log.info("[{}] Created tenant {}", clientAppId(), tenant); + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> validateClustersAsync(tenantInfo)) + .thenCompose(__ -> tenantResources().listTenantsAsync()) + .thenAccept(tenants -> { + int maxTenants = pulsar().getConfiguration().getMaxTenants(); + // Due to the cost of distributed locks, no locks are added here. + // In a concurrent scenario, the threshold will be exceeded. + if (maxTenants > 0) { + if (tenants != null && tenants.size() >= maxTenants) { + throw new RestException(Status.PRECONDITION_FAILED, "Exceed the maximum number of tenants"); + } + } + }) + .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant)) + .thenAccept(exist -> { + if (exist) { + throw new RestException(Status.CONFLICT, "Tenant already exist"); + } + }) + .thenCompose(__ -> tenantResources().createTenantAsync(tenant, tenantInfo)) + .thenAccept(__ -> { + log.info("[{}] Created tenant {}", clientAppId, tenant); asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { + }) + .exceptionally(ex -> { log.error("[{}] Failed to create tenant {}", clientAppId, tenant, ex); - asyncResponse.resume(new RestException(ex)); + resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); - }).exceptionally(ex -> { - log.error("[{}] Failed to create tenant {}", clientAppId(), tenant, ex); - asyncResponse.resume(new RestException(ex)); - return null; - }); - }); } @POST @@ -186,42 +165,28 @@ public void createTenant(@Suspended final AsyncResponse asyncResponse, public void updateTenant(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, @ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) { - try { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - validateClusters(newTenantAdmin); - } catch (Exception e) { - asyncResponse.resume(e); - return; - } - - final String clientAddId = clientAppId(); - tenantResources().getTenantAsync(tenant).thenAccept(tenantAdmin -> { - if (!tenantAdmin.isPresent()) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found")); - return; - } - TenantInfo oldTenantAdmin = tenantAdmin.get(); - Set newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters()); - canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters).thenApply(r -> { - tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin).thenAccept(done -> { - log.info("Successfully updated tenant info {}", tenant); + final String clientAppId = clientAppId(); + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> validateClustersAsync(newTenantAdmin)) + .thenCompose(__ -> tenantResources().getTenantAsync(tenant)) + .thenCompose(tenantAdmin -> { + if (!tenantAdmin.isPresent()) { + throw new RestException(Status.NOT_FOUND, "Tenant " + tenant + " not found"); + } + TenantInfo oldTenantAdmin = tenantAdmin.get(); + Set newClusters = new HashSet<>(newTenantAdmin.getAllowedClusters()); + return canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(), newClusters); + }) + .thenCompose(__ -> tenantResources().updateTenantAsync(tenant, old -> newTenantAdmin)) + .thenAccept(__ -> { + log.info("[{}] Successfully updated tenant info {}", clientAppId, tenant); asyncResponse.resume(Response.noContent().build()); }).exceptionally(ex -> { - log.warn("Failed to update tenant {}", tenant, ex.getCause()); - asyncResponse.resume(new RestException(ex)); + log.warn("[{}] Failed to update tenant {}", clientAppId, tenant, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); - return null; - }).exceptionally(nsEx -> { - asyncResponse.resume(nsEx.getCause()); - return null; - }); - }).exceptionally(ex -> { - log.error("[{}] Failed to get tenant {}", clientAddId, tenant, ex.getCause()); - asyncResponse.resume(new RestException(ex)); - return null; - }); } @DELETE @@ -234,127 +199,89 @@ public void updateTenant(@Suspended final AsyncResponse asyncResponse, public void deleteTenant(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant, @QueryParam("force") @DefaultValue("false") boolean force) { - try { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - } catch (Exception e) { - asyncResponse.resume(e); - return; - } - internalDeleteTenant(asyncResponse, tenant, force); + final String clientAppId = clientAppId(); + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> internalDeleteTenant(tenant, force)) + .thenAccept(__ -> { + log.info("[{}] Deleted tenant {}", clientAppId, tenant); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to delete tenant {}", clientAppId, tenant, cause); + if (cause instanceof IllegalStateException) { + asyncResponse.resume(new RestException(Status.CONFLICT, cause)); + } else { + resumeAsyncResponseExceptionally(asyncResponse, cause); + } + return null; + }); } - protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant, boolean force) { - if (force) { - internalDeleteTenantForcefully(asyncResponse, tenant); - } else { - internalDeleteTenant(asyncResponse, tenant); - } + protected CompletableFuture internalDeleteTenant(String tenant, boolean force) { + return force ? internalDeleteTenantAsyncForcefully(tenant) : internalDeleteTenantAsync(tenant); } - protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant) { - tenantResources().tenantExistsAsync(tenant).thenApply(exists -> { - if (!exists) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant doesn't exist")); - return null; - } - - return hasActiveNamespace(tenant) - .thenCompose(ignore -> tenantResources().deleteTenantAsync(tenant)) - .thenCompose(ignore -> pulsar().getPulsarResources().getTopicResources() - .clearTenantPersistence(tenant)) - .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources() - .deleteTenantAsync(tenant)) - .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources() + protected CompletableFuture internalDeleteTenantAsync(String tenant) { + return tenantResources().tenantExistsAsync(tenant) + .thenAccept(exists -> { + if (!exists) { + throw new RestException(Status.NOT_FOUND, "Tenant doesn't exist"); + } + }) + .thenCompose(__ -> hasActiveNamespace(tenant)) + .thenCompose(__ -> tenantResources().deleteTenantAsync(tenant)) + .thenCompose(__ -> pulsar().getPulsarResources().getTopicResources().clearTenantPersistence(tenant)) + .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources().deleteTenantAsync(tenant)) + .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources() .getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant)) - .thenCompose(ignore -> pulsar().getPulsarResources().getLocalPolicies() + .thenCompose(__ -> pulsar().getPulsarResources().getLocalPolicies() .deleteLocalPoliciesTenantAsync(tenant)) - .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources() - .deleteBundleDataTenantAsync(tenant)) - .whenComplete((ignore, ex) -> { - if (ex != null) { - log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, ex); - if (ex.getCause() instanceof IllegalStateException) { - asyncResponse.resume(new RestException(Status.CONFLICT, ex.getCause())); - } else { - asyncResponse.resume(new RestException(ex)); - } - } else { - log.info("[{}] Deleted tenant {}", clientAppId(), tenant); - asyncResponse.resume(Response.noContent().build()); - } - }); - }); + .thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources() + .deleteBundleDataTenantAsync(tenant)); } - protected void internalDeleteTenantForcefully(AsyncResponse asyncResponse, String tenant) { + protected CompletableFuture internalDeleteTenantAsyncForcefully(String tenant) { if (!pulsar().getConfiguration().isForceDeleteTenantAllowed()) { - asyncResponse.resume( + return FutureUtil.failedFuture( new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of tenants")); - return; - } - - List namespaces; - try { - namespaces = tenantResources().getListOfNamespaces(tenant); - } catch (Exception e) { - log.error("[{}] Failed to get namespaces list of {}", clientAppId(), tenant, e); - asyncResponse.resume(new RestException(e)); - return; } - - final List> futures = Lists.newArrayList(); - try { - for (String namespace : namespaces) { - futures.add(pulsar().getAdminClient().namespaces().deleteNamespaceAsync(namespace, true)); - } - } catch (Exception e) { - log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e); - asyncResponse.resume(new RestException(e)); - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - if (exception.getCause() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); - } else { - log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, exception); - asyncResponse.resume(new RestException(exception.getCause())); - } - return null; - } - - // delete tenant normally - internalDeleteTenant(asyncResponse, tenant); - - asyncResponse.resume(Response.noContent().build()); - return null; - }); + return tenantResources().getListOfNamespacesAsync(tenant) + .thenApply(namespaces -> { + final List> futures = Lists.newArrayList(); + try { + PulsarAdmin adminClient = pulsar().getAdminClient(); + for (String namespace : namespaces) { + futures.add(adminClient.namespaces().deleteNamespaceAsync(namespace, true)); + } + } catch (Exception e) { + log.error("[{}] Failed to force delete namespaces {}", clientAppId(), namespaces, e); + throw new RestException(e); + } + return futures; + }) + .thenCompose(futures -> FutureUtil.waitForAll(futures)) + .thenCompose(__ -> internalDeleteTenantAsync(tenant)); } - private void validateClusters(TenantInfo info) { + private CompletableFuture validateClustersAsync(TenantInfo info) { // empty cluster shouldn't be allowed if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c)) .collect(Collectors.toSet()).isEmpty() || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) { log.warn("[{}] Failed to validate due to clusters are empty", clientAppId()); - throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty")); } - - List nonexistentClusters; - try { - Set availableClusters = clusterResources().list(); + return clusterResources().listAsync().thenAccept(availableClusters -> { Set allowedClusters = info.getAllowedClusters(); - nonexistentClusters = allowedClusters.stream().filter( - cluster -> !(availableClusters.contains(cluster) || Constants.GLOBAL_CLUSTER.equals(cluster))) + List nonexistentClusters = allowedClusters.stream() + .filter(cluster -> !(availableClusters.contains(cluster) || GLOBAL_CLUSTER.equals(cluster))) .collect(Collectors.toList()); - } catch (Exception e) { - log.error("[{}] Failed to get available clusters", clientAppId(), e); - throw new RestException(e); - } - if (nonexistentClusters.size() > 0) { - log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters); - throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist"); - } + if (nonexistentClusters.size() > 0) { + log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters); + throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist"); + } + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 38c61d3688d77..e491f78c9e23e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -40,6 +40,7 @@ import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -64,6 +65,8 @@ import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.common.naming.Constants; @@ -1042,6 +1045,17 @@ public void validatePoliciesReadOnlyAccess() { } } + public CompletableFuture validatePoliciesReadOnlyAccessAsync() { + return namespaceResources().getPoliciesReadOnlyAsync().thenAccept(readOnly -> { + if (readOnly) { + if (log.isDebugEnabled()) { + log.debug("Policies are read-only. Broker cannot do read-write operations"); + } + throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations"); + } + }); + } + protected CompletableFuture hasActiveNamespace(String tenant) { return tenantResources().hasActiveNamespace(tenant); } @@ -1183,4 +1197,17 @@ public T sync(Supplier> supplier) { throw new RestException(ex); } } + + protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) { + Throwable realCause = FutureUtil.unwrapCompletionException(exception); + if (realCause instanceof WebApplicationException) { + asyncResponse.resume(realCause); + } else if (realCause instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(Status.CONFLICT, realCause)); + } else if (realCause instanceof PulsarAdminException) { + asyncResponse.resume(new RestException(((PulsarAdminException) realCause))); + } else { + asyncResponse.resume(new RestException(realCause)); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 583a87275bda4..23c50f3879e6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -411,7 +411,7 @@ public void clusters() throws Exception { public void properties() throws Throwable { Object response = asyncRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList()); - verify(properties, times(1)).validateSuperUserAccess(); + verify(properties, times(1)).validateSuperUserAccessAsync(); // create local cluster asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build())); @@ -423,22 +423,22 @@ public void properties() throws Throwable { .allowedClusters(allowedClusters) .build(); response = asyncRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo)); - verify(properties, times(2)).validateSuperUserAccess(); + verify(properties, times(2)).validateSuperUserAccessAsync(); response = asyncRequests(ctx -> properties.getTenants(ctx)); assertEquals(response, Lists.newArrayList("test-property")); - verify(properties, times(3)).validateSuperUserAccess(); + verify(properties, times(3)).validateSuperUserAccessAsync(); response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertEquals(response, tenantInfo); - verify(properties, times(4)).validateSuperUserAccess(); + verify(properties, times(4)).validateSuperUserAccessAsync(); final TenantInfoImpl newPropertyAdmin = TenantInfoImpl.builder() .adminRoles(Sets.newHashSet("role1", "other-role")) .allowedClusters(allowedClusters) .build(); response = asyncRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin)); - verify(properties, times(5)).validateSuperUserAccess(); + verify(properties, times(5)).validateSuperUserAccessAsync(); // Wait for updateTenant to take effect Thread.sleep(100); @@ -447,7 +447,7 @@ public void properties() throws Throwable { assertEquals(response, newPropertyAdmin); response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, "test-property")); assertNotSame(response, tenantInfo); - verify(properties, times(7)).validateSuperUserAccess(); + verify(properties, times(7)).validateSuperUserAccessAsync(); // Check creating existing property try {