Skip to content

Commit

Permalink
Make some methods in NamespacesBase async.
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed May 10, 2022
1 parent d7a4f7f commit a553d73
Show file tree
Hide file tree
Showing 9 changed files with 410 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, Policies policies) {
return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return super.exists(path) && super.getChildren(path).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public CompletableFuture<Void> updateTenantAsync(String tenantName, Function<Ten
}

public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
}

public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
Expand Down Expand Up @@ -110,6 +110,40 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
return namespaces;
}

public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
List<String> namespaces = new ArrayList<>();
// this will return a cluster in v1 and a namespace in v2
CompletableFuture<List<CompletableFuture<Void>>> result = getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
.thenApply(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
.thenCompose(children -> {
CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(tenant, key).toString();
// if the length is 0 then this is probably a leftover cluster from namespace
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
// need to add it to the list
ret = ret.thenCompose(__ -> getAsync(joinPath(BASE_POLICIES_PATH, namespace))
.thenAccept(opt -> opt.map(k -> namespaces.add(namespace)))
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof MetadataStoreException
.ContentDeserializationException) {
return null;
}
throw FutureUtil.wrapToCompletionException(ex);
}));
} else {
children.forEach(ns -> {
namespaces.add(NamespaceName.get(tenant, key, ns).toString());
});
}
return ret;
})).collect(Collectors.toList())
);
return result.thenCompose(futures -> FutureUtil.waitForAll(futures)).thenApply(__ -> namespaces);
}

public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
}
}

protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDomain topicDomain) {
return namespaceResources().getPartitionedTopicResources()
.listPartitionedTopicsAsync(namespaceName, topicDomain);
}

protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
try {
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,60 +103,55 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NamespacesBase extends AdminResource {

protected List<String> internalGetTenantNamespaces(String tenant) {
checkNotNull(tenant, "Tenant should not be null");
protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
if (tenant == null) {
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null"));
}
try {
NamedEntity.checkName(tenant);
} catch (IllegalArgumentException e) {
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
}
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);

try {
if (!tenantResources().tenantExists(tenant)) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}

return tenantResources().getListOfNamespaces(tenant);
} catch (Exception e) {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
throw new RestException(e);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
}
return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
.thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
.thenCompose(existed -> {
if (!existed) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}
return tenantResources().getListOfNamespacesAsync(tenant);
});
}

protected void internalCreateNamespace(Policies policies) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
validatePoliciesReadOnlyAccess();
validatePolicies(namespaceName, policies);

try {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
List<String> namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant());
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
}
}
namespaceResources().createPolicies(namespaceName, policies);
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
} catch (AlreadyExistsException e) {
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Namespace already exists");
} catch (Exception e) {
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> validatePolicies(namespaceName, policies))
.thenCompose(__ -> {
CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
ret = tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
.thenAccept(namespaces -> {
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :"
+ namespaceName.getTenant());
}
});
}
return ret;
})
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
}

protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
Expand Down
Loading

0 comments on commit a553d73

Please sign in to comment.