Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make some methods of ClusterBase pure async. #15437

Merged
merged 10 commits into from
May 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -397,31 +396,49 @@ private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Map<String, ? extends NamespaceIsolationData> getNamespaceIsolationPolicies(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) throws Exception {
validateSuperUserAccess();
if (!clusterResources().clusterExists(cluster)) {
throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
}
public void getNamespaceIsolationPolicies(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster
) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validateClusterExistAsync(cluster, Status.NOT_FOUND))
.thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getIsolationDataPolicies(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to Namespace isolation data map
return nsIsolationPolicies.getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
throw new RestException(e);
}
/**
* Verify that the cluster exists.
* For compatibility to avoid breaking changes, we can specify a REST status code when it doesn't exist.
* @param cluster Cluster name
* @param notExistStatus REST status code
*/
private CompletableFuture<Void> validateClusterExistAsync(String cluster, Status notExistStatus) {
return clusterResources().clusterExistsAsync(cluster)
.thenAccept(clusterExist -> {
if (!clusterExist) {
throw new RestException(notExistStatus, "Cluster " + cluster + " does not exist.");
}
});
}

private CompletableFuture<Map<String, NamespaceIsolationDataImpl>> internalGetNamespaceIsolationPolicies(
String cluster) {
return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster)
.thenApply(namespaceIsolationPolicies -> {
if (!namespaceIsolationPolicies.isPresent()) {
throw new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
}
return namespaceIsolationPolicies.get().getPolicies();
});
}


@GET
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(
Expand All @@ -435,40 +452,28 @@ private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public NamespaceIsolationData getNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The name of the namespace isolation policy",
required = true
)
public void getNamespaceIsolationPolicy(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster,
@ApiParam(value = "The name of the namespace isolation policy", required = true)
@PathParam("policyName") String policyName
) throws Exception {
validateSuperUserAccess();
validateClusterExists(cluster);

try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getIsolationDataPolicies(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to Namespace isolation data map
if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}",
clientAppId(), policyName, cluster);
throw new RestException(Status.NOT_FOUND,
"Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
}
return nsIsolationPolicies.getPolicies().get(policyName);
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster, e);
throw new RestException(e);
}
) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
.thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
.thenAccept(policies -> {
// construct the response to Namespace isolation data map
if (!policies.containsKey(policyName)) {
throw new RestException(Status.NOT_FOUND,
"Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
}
asyncResponse.resume(policies.get(policyName));
}).exceptionally(ex -> {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}",
clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand All @@ -485,53 +490,44 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
public void getBrokersWithNamespaceIsolationPolicy(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
validateSuperUserAccess();
validateClusterExists(cluster);

Set<String> availableBrokers;
Map<String, ? extends NamespaceIsolationData> nsPolicies;
try {
availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
} catch (Exception e) {
log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
try {
Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies()
.getIsolationDataPolicies(cluster);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
}
nsPolicies = nsPoliciesResult.get().getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
throw new RestException(e);
}
return availableBrokers.stream().map(broker -> {
BrokerNamespaceIsolationData.Builder brokerIsolationData = BrokerNamespaceIsolationData.builder()
.brokerName(broker);
if (nsPolicies != null) {
List<String> namespaceRegexes = new ArrayList<>();
nsPolicies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
namespaceRegexes.addAll(policyData.getNamespaces());
if (nsPolicyImpl.isPrimaryBroker(broker)) {
brokerIsolationData.primary(true);
}
}
validateSuperUserAccessAsync()
.thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
.thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
.thenCompose(availableBrokers -> internalGetNamespaceIsolationPolicies(cluster)
.thenApply(policies -> availableBrokers.stream()
.map(broker -> internalGetBrokerNsIsolationData(broker, policies))
.collect(Collectors.toList())))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

brokerIsolationData.namespaceRegex(namespaceRegexes);
}
private BrokerNamespaceIsolationData internalGetBrokerNsIsolationData(
String broker,
Map<String, NamespaceIsolationDataImpl> policies) {
BrokerNamespaceIsolationData.Builder brokerIsolationData =
BrokerNamespaceIsolationData.builder().brokerName(broker);
if (policies == null) {
return brokerIsolationData.build();
}).collect(Collectors.toList());
}
List<String> namespaceRegexes = new ArrayList<>();
policies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
namespaceRegexes.addAll(policyData.getNamespaces());
brokerIsolationData.primary(nsPolicyImpl.isPrimaryBroker(broker));
brokerIsolationData.policyName(name);
}
});
brokerIsolationData.namespaceRegex(namespaceRegexes);
return brokerIsolationData.build();
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,10 @@ public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle)
}

public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpacesStatusAsync() {
return getLocalNamespaceIsolationPoliciesAsync()
.thenCompose(namespaceIsolationPolicies -> {
return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
.thenCompose(namespaceIsolationPolicies -> {
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
return FutureUtil.waitForAll(futures)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,7 @@ public void brokerNamespaceIsolationPolicies() throws Exception {
assertEquals(brokerIsolationDataList.get(0).getBrokerName(), brokerAddress);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().size(), 1);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().get(0), namespaceRegex);
assertEquals(brokerIsolationDataList.get(0).getPolicyName(), policyName1);

BrokerNamespaceIsolationDataImpl brokerIsolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
.getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public void internalConfiguration() throws Exception {
}

@Test
@SuppressWarnings("unchecked")
public void clusters() throws Exception {
assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
verify(clusters, never()).validateSuperUserAccessAsync();
Expand Down Expand Up @@ -239,7 +240,7 @@ public void clusters() throws Exception {
ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());

try {
clusters.getNamespaceIsolationPolicies("use");
asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
Expand All @@ -259,7 +260,7 @@ public void clusters() throws Exception {
.build();
AsyncResponse response = mock(AsyncResponse.class);
clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData);
clusters.getNamespaceIsolationPolicies("use");
asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));

try {
asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
Expand All @@ -269,7 +270,8 @@ public void clusters() throws Exception {
}

clusters.deleteNamespaceIsolationPolicy("use", "policy1");
assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty());
assertTrue(((Map<String, NamespaceIsolationDataImpl>) asynRequests(ctx ->
clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty());

asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
Expand All @@ -289,7 +291,7 @@ public void clusters() throws Exception {
}

try {
clusters.getNamespaceIsolationPolicies("use");
asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
Expand Down Expand Up @@ -406,8 +408,8 @@ public void clusters() throws Exception {
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
verify(clusters, times(18)).validateSuperUserAccessAsync();
verify(clusters, times(6)).validateSuperUserAccess();
verify(clusters, times(22)).validateSuperUserAccessAsync();
verify(clusters, times(2)).validateSuperUserAccess();
}

Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ public void clusterNamespaceIsolationPolicies() throws PulsarAdminException {
List<BrokerNamespaceIsolationData> isoList = admin.clusters().getBrokersWithNamespaceIsolationPolicy("use");
assertEquals(isoList.size(), 1);
assertTrue(isoList.get(0).isPrimary());
assertEquals(isoList.get(0).getPolicyName(), policyName1);

// verify update of primary
nsPolicyData1.getPrimary().remove(0);
Expand Down