Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make some methods of ClusterBase pure async. #15318

Merged
merged 4 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ClusterResources(MetadataStore store, int operationTimeoutSec) {
}

public CompletableFuture<Set<String>> listAsync() {
return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new HashSet<>(list));
return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(HashSet::new);
}

public Set<String> list() throws MetadataStoreException {
Expand All @@ -66,6 +66,15 @@ public void createCluster(String clusterName, ClusterData clusterData) throws Me
create(joinPath(BASE_CLUSTERS_PATH, clusterName), clusterData);
}

public CompletableFuture<Void> createClusterAsync(String clusterName, ClusterData clusterData) {
return createAsync(joinPath(BASE_CLUSTERS_PATH, clusterName), clusterData);
}

public CompletableFuture<Void> updateClusterAsync(String clusterName,
Function<ClusterData, ClusterData> modifyFunction) {
return setAsync(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction);
}

public void updateCluster(String clusterName, Function<ClusterData, ClusterData> modifyFunction)
throws MetadataStoreException {
set(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,15 @@ public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
return pulsar().getPulsarResources().getNamespaceResources().getPoliciesReadOnlyAsync()
.thenAccept(arePoliciesReadOnly -> {
if (arePoliciesReadOnly) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations");
} else {
// Do nothing, just log the message.
log.debug("Broker is allowed to make read-write operations");
if (log.isDebugEnabled()) {
log.debug("Broker is allowed to make read-write operations");
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.common.naming.Constants;
Expand All @@ -64,11 +64,12 @@
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClustersBase extends PulsarWebResource {
public class ClustersBase extends AdminResource {

@GET
@ApiOperation(
Expand All @@ -79,16 +80,18 @@ public class ClustersBase extends PulsarWebResource {
@ApiResponse(code = 200, message = "Return a list of clusters."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Set<String> getClusters() throws Exception {
try {
// Remove "global" cluster from returned list
Set<String> clusters = clusterResources().list().stream()
.filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster)).collect(Collectors.toSet());
return clusters;
} catch (Exception e) {
log.error("[{}] Failed to get clusters list", clientAppId(), e);
throw new RestException(e);
}
public void getClusters(@Suspended AsyncResponse asyncResponse) {
clusterResources().listAsync()
.thenApply(clusters -> clusters.stream()
// Remove "global" cluster from returned list
.filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster))
.collect(Collectors.toSet()))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get clusters {}", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand All @@ -104,26 +107,19 @@ public Set<String> getClusters() throws Exception {
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public ClusterData getCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();

try {
return clusterResources().getCluster(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
if (e instanceof RestException) {
throw (RestException) e;
} else {
throw new RestException(e);
}
}
public void getCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
validateSuperUserAccessAsync()
.thenCompose(__ -> clusterResources().getClusterAsync(cluster))
.thenAccept(clusterData -> {
asyncResponse.resume(clusterData
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")));
}).exceptionally(ex -> {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@PUT
Expand All @@ -140,10 +136,8 @@ public ClusterData getCluster(
@ApiResponse(code = 500, message = "Internal server error.")
})
public void createCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The cluster data",
Expand All @@ -158,27 +152,31 @@ public void createCluster(
+ "}"
)
)
)
ClusterDataImpl clusterData
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();

try {
NamedEntity.checkName(cluster);
if (clusterResources().getCluster(cluster).isPresent()) {
log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster);
throw new RestException(Status.CONFLICT, "Cluster already exists");
}
clusterResources().createCluster(cluster, clusterData);
log.info("[{}] Created cluster {}", clientAppId(), cluster);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), cluster, e);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid");
} catch (Exception e) {
log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
) ClusterDataImpl clusterData) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
NamedEntity.checkName(cluster);
return clusterResources().getClusterAsync(cluster);
}).thenCompose(clusterOpt -> {
if (clusterOpt.isPresent()) {
throw new RestException(Status.CONFLICT, "Cluster already exists");
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
}
return clusterResources().createClusterAsync(cluster, clusterData);
}).thenAccept(__ -> {
log.info("[{}] Created cluster {}", clientAppId(), cluster);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, ex);
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof IllegalArgumentException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cluster name is not valid"));
return null;
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -193,10 +191,8 @@ public void createCluster(
@ApiResponse(code = 500, message = "Internal server error.")
})
public void updateCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The cluster data",
Expand All @@ -211,22 +207,23 @@ public void updateCluster(
+ "}"
)
)
)
ClusterDataImpl clusterData
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();

try {
clusterResources().updateCluster(cluster, old -> clusterData);
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
) ClusterDataImpl clusterData) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> clusterData))
.thenAccept(__ -> {
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, ex);
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return null;
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down
Loading