diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index 4429afe59395b..d3acd7bbe10b3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -43,7 +43,7 @@ public ClusterResources(MetadataStore store, int operationTimeoutSec) { } public CompletableFuture> listAsync() { - return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new HashSet<>(list)); + return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(HashSet::new); } public Set list() throws MetadataStoreException { @@ -66,6 +66,15 @@ public void createCluster(String clusterName, ClusterData clusterData) throws Me create(joinPath(BASE_CLUSTERS_PATH, clusterName), clusterData); } + public CompletableFuture createClusterAsync(String clusterName, ClusterData clusterData) { + return createAsync(joinPath(BASE_CLUSTERS_PATH, clusterName), clusterData); + } + + public CompletableFuture updateClusterAsync(String clusterName, + Function modifyFunction) { + return setAsync(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction); + } + public void updateCluster(String clusterName, Function modifyFunction) throws MetadataStoreException { set(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index b00226e3a2991..381f5a1f19a9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -142,11 +142,15 @@ public CompletableFuture validatePoliciesReadOnlyAccessAsync() { return pulsar().getPulsarResources().getNamespaceResources().getPoliciesReadOnlyAsync() .thenAccept(arePoliciesReadOnly -> { if (arePoliciesReadOnly) { - log.debug("Policies are read-only. Broker cannot do read-write operations"); + if (log.isDebugEnabled()) { + log.debug("Policies are read-only. Broker cannot do read-write operations"); + } throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations"); } else { // Do nothing, just log the message. - log.debug("Broker is allowed to make read-write operations"); + if (log.isDebugEnabled()) { + log.debug("Broker is allowed to make read-write operations"); + } } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 17bf1301e4597..6b984930ed7a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -47,8 +47,8 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources; -import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.common.naming.Constants; @@ -64,11 +64,12 @@ import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ClustersBase extends PulsarWebResource { +public class ClustersBase extends AdminResource { @GET @ApiOperation( @@ -79,16 +80,18 @@ public class ClustersBase extends PulsarWebResource { @ApiResponse(code = 200, message = "Return a list of clusters."), @ApiResponse(code = 500, message = "Internal server error.") }) - public Set getClusters() throws Exception { - try { - // Remove "global" cluster from returned list - Set clusters = clusterResources().list().stream() - .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster)).collect(Collectors.toSet()); - return clusters; - } catch (Exception e) { - log.error("[{}] Failed to get clusters list", clientAppId(), e); - throw new RestException(e); - } + public void getClusters(@Suspended AsyncResponse asyncResponse) { + clusterResources().listAsync() + .thenApply(clusters -> clusters.stream() + // Remove "global" cluster from returned list + .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster)) + .collect(Collectors.toSet())) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("[{}] Failed to get clusters {}", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -104,26 +107,19 @@ public Set getClusters() throws Exception { @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.") }) - public ClusterData getCluster( - @ApiParam( - value = "The cluster name", - required = true - ) - @PathParam("cluster") String cluster - ) { - validateSuperUserAccess(); - - try { - return clusterResources().getCluster(cluster) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); - } catch (Exception e) { - log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); - if (e instanceof RestException) { - throw (RestException) e; - } else { - throw new RestException(e); - } - } + public void getCluster(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) + @PathParam("cluster") String cluster) { + validateSuperUserAccessAsync() + .thenCompose(__ -> clusterResources().getClusterAsync(cluster)) + .thenAccept(clusterData -> { + asyncResponse.resume(clusterData + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"))); + }).exceptionally(ex -> { + log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @PUT @@ -140,10 +136,8 @@ public ClusterData getCluster( @ApiResponse(code = 500, message = "Internal server error.") }) public void createCluster( - @ApiParam( - value = "The cluster name", - required = true - ) + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster, @ApiParam( value = "The cluster data", @@ -158,27 +152,31 @@ public void createCluster( + "}" ) ) - ) - ClusterDataImpl clusterData - ) { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - - try { - NamedEntity.checkName(cluster); - if (clusterResources().getCluster(cluster).isPresent()) { - log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster); - throw new RestException(Status.CONFLICT, "Cluster already exists"); - } - clusterResources().createCluster(cluster, clusterData); - log.info("[{}] Created cluster {}", clientAppId(), cluster); - } catch (IllegalArgumentException e) { - log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), cluster, e); - throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid"); - } catch (Exception e) { - log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, e); - throw new RestException(e); - } + ) ClusterDataImpl clusterData) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> { + NamedEntity.checkName(cluster); + return clusterResources().getClusterAsync(cluster); + }).thenCompose(clusterOpt -> { + if (clusterOpt.isPresent()) { + throw new RestException(Status.CONFLICT, "Cluster already exists"); + } + return clusterResources().createClusterAsync(cluster, clusterData); + }).thenAccept(__ -> { + log.info("[{}] Created cluster {}", clientAppId(), cluster); + asyncResponse.resume(Response.ok().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, ex); + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof IllegalArgumentException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Cluster name is not valid")); + return null; + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -193,10 +191,8 @@ public void createCluster( @ApiResponse(code = 500, message = "Internal server error.") }) public void updateCluster( - @ApiParam( - value = "The cluster name", - required = true - ) + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster, @ApiParam( value = "The cluster data", @@ -211,22 +207,23 @@ public void updateCluster( + "}" ) ) - ) - ClusterDataImpl clusterData - ) { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); - - try { - clusterResources().updateCluster(cluster, old -> clusterData); - log.info("[{}] Updated cluster {}", clientAppId(), cluster); - } catch (NotFoundException e) { - log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster); - throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); - } catch (Exception e) { - log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, e); - throw new RestException(e); - } + ) ClusterDataImpl clusterData) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> clusterData)) + .thenAccept(__ -> { + log.info("[{}] Updated cluster {}", clientAppId(), cluster); + asyncResponse.resume(Response.ok().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, ex); + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist")); + return null; + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index f506feedd33ed..1dd1f871a9f71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -201,18 +201,19 @@ public void internalConfiguration() throws Exception { @Test public void clusters() throws Exception { - assertEquals(clusters.getClusters(), Lists.newArrayList()); - verify(clusters, never()).validateSuperUserAccess(); + assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); + verify(clusters, never()).validateSuperUserAccessAsync(); - clusters.createCluster("use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()); - verify(clusters, times(1)).validateSuperUserAccess(); + asynRequests(ctx -> clusters.createCluster(ctx, + "use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build())); // ensure to read from ZooKeeper directly //clusters.clustersListCache().clear(); - assertEquals(clusters.getClusters(), Lists.newArrayList("use")); + assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet("use")); // Check creating existing cluster try { - clusters.createCluster("use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()); + asynRequests(ctx -> clusters.createCluster(ctx, "use", + ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build())); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); @@ -226,14 +227,14 @@ public void clusters() throws Exception { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); } - assertEquals(clusters.getCluster("use"), ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()); - verify(clusters, times(4)).validateSuperUserAccess(); + assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")), + ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()); - clusters.updateCluster("use", ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build()); - verify(clusters, times(5)).validateSuperUserAccess(); + asynRequests(ctx -> clusters.updateCluster(ctx, "use", + ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build())); - assertEquals(clusters.getCluster("use"), ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build()); - verify(clusters, times(6)).validateSuperUserAccess(); + assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")), + ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build()); try { clusters.getNamespaceIsolationPolicies("use"); @@ -269,18 +270,17 @@ public void clusters() throws Exception { assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty()); clusters.deleteCluster("use"); - verify(clusters, times(13)).validateSuperUserAccess(); - assertEquals(clusters.getClusters(), Lists.newArrayList()); + assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); try { - clusters.getCluster("use"); + asynRequests(ctx -> clusters.getCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); } try { - clusters.updateCluster("use", ClusterDataImpl.builder().build()); + asynRequests(ctx -> clusters.updateCluster(ctx, "use", ClusterDataImpl.builder().build())); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 404); @@ -307,7 +307,7 @@ public void clusters() throws Exception { clusterCache.invalidateAll(); store.invalidateAll(); try { - clusters.getClusters(); + asynRequests(ctx -> clusters.getClusters(ctx)); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -318,7 +318,8 @@ public void clusters() throws Exception { && path.equals("/admin/clusters/test"); }); try { - clusters.createCluster("test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build()); + asynRequests(ctx -> clusters.createCluster(ctx, "test", + ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build())); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -331,7 +332,8 @@ public void clusters() throws Exception { clusterCache.invalidateAll(); store.invalidateAll(); try { - clusters.updateCluster("test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build()); + asynRequests(ctx -> clusters.updateCluster(ctx, "test", + ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build())); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -343,7 +345,7 @@ public void clusters() throws Exception { }); try { - clusters.getCluster("test"); + asynRequests(ctx -> clusters.getCluster(ctx, "test")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -377,7 +379,8 @@ public void clusters() throws Exception { // Check name validations try { - clusters.createCluster("bf@", ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build()); + asynRequests(ctx -> clusters.createCluster(ctx, "bf@", + ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build())); fail("should have filed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); @@ -385,7 +388,7 @@ public void clusters() throws Exception { // Check authentication and listener name try { - clusters.createCluster("auth", ClusterDataImpl.builder() + asynRequests(ctx -> clusters.createCluster(ctx, "auth", ClusterDataImpl.builder() .serviceUrl("http://dummy.web.example.com") .serviceUrlTls("") .brokerServiceUrl("http://dummy.messaging.example.com") @@ -393,14 +396,16 @@ public void clusters() throws Exception { .authenticationPlugin("authenticationPlugin") .authenticationParameters("authenticationParameters") .listenerName("listenerName") - .build()); - ClusterData cluster = clusters.getCluster("auth"); + .build())); + ClusterData cluster = (ClusterData) asynRequests(ctx -> clusters.getCluster(ctx, "auth")); assertEquals(cluster.getAuthenticationPlugin(), "authenticationPlugin"); assertEquals(cluster.getAuthenticationParameters(), "authenticationParameters"); assertEquals(cluster.getListenerName(), "listenerName"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } + verify(clusters, times(13)).validateSuperUserAccessAsync(); + verify(clusters, times(11)).validateSuperUserAccess(); } Object asynRequests(Consumer function) throws Exception { @@ -419,7 +424,7 @@ public void properties() throws Throwable { verify(properties, times(1)).validateSuperUserAccess(); // create local cluster - clusters.createCluster(configClusterName, ClusterDataImpl.builder().build()); + asynRequests(ctx -> clusters.createCluster(ctx, configClusterName, ClusterDataImpl.builder().build())); Set allowedClusters = Sets.newHashSet(); allowedClusters.add(configClusterName); @@ -633,10 +638,10 @@ public void properties() throws Throwable { @Test @SuppressWarnings("unchecked") public void brokers() throws Exception { - clusters.createCluster("use", ClusterDataImpl.builder() + asynRequests(ctx -> clusters.createCluster(ctx, "use", ClusterDataImpl.builder() .serviceUrl("http://broker.messaging.use.example.com") .serviceUrlTls("https://broker.messaging.use.example.com:4443") - .build()); + .build())); URI requestUri = new URI( "http://broker.messaging.use.example.com:8080/admin/brokers/use"); @@ -698,7 +703,7 @@ public void resourceQuotas() throws Exception { .allowedClusters(Collections.singleton(cluster)) .build(); ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl(cluster).build(); - clusters.createCluster(cluster, clusterData ); + asynRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData )); asynRequests(ctx -> properties.createTenant(ctx, property, admin)); // customized bandwidth for this namespace