Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make some methods in NamespacesBase async. #15518

Merged
merged 7 commits into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, Policies policies) {
return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return super.exists(path) && super.getChildren(path).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resources;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -78,7 +79,7 @@ public CompletableFuture<Void> updateTenantAsync(String tenantName, Function<Ten
}

public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
}

public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
Expand Down Expand Up @@ -110,6 +111,41 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
return namespaces;
}

public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
// this will return a cluster in v1 and a namespace in v2
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
.thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
.thenCompose(children -> {
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(tenant, key).toString();
// if the length is 0 then this is probably a leftover cluster from namespace
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
// need to add it to the list
return getAsync(joinPath(BASE_POLICIES_PATH, namespace))
.thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace)
: new ArrayList<String>())
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof MetadataStoreException
.ContentDeserializationException) {
return new ArrayList<>();
}
throw FutureUtil.wrapToCompletionException(ex);
});
} else {
CompletableFuture<List<String>> ret = new CompletableFuture();
ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns)
.toString()).collect(Collectors.toList()));
return ret;
}
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> {
namespaces.addAll(m);
return namespaces;
}))));
}

public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
if (policies.get().is_allow_auto_update_schema == null) {
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
}
return CompletableFuture.completedFuture(policies.get());
});
} else {
Expand Down Expand Up @@ -534,6 +539,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
}
}

protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDomain topicDomain) {
return namespaceResources().getPartitionedTopicResources()
.listPartitionedTopicsAsync(namespaceName, topicDomain);
}

protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
try {
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,60 +103,54 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NamespacesBase extends AdminResource {

protected List<String> internalGetTenantNamespaces(String tenant) {
checkNotNull(tenant, "Tenant should not be null");
protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
if (tenant == null) {
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null"));
}
try {
NamedEntity.checkName(tenant);
} catch (IllegalArgumentException e) {
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
}
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);

try {
if (!tenantResources().tenantExists(tenant)) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}

return tenantResources().getListOfNamespaces(tenant);
} catch (Exception e) {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
throw new RestException(e);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
}
return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
.thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
.thenCompose(existed -> {
if (!existed) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}
return tenantResources().getListOfNamespacesAsync(tenant);
});
}

protected void internalCreateNamespace(Policies policies) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
validatePoliciesReadOnlyAccess();
validatePolicies(namespaceName, policies);

try {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
List<String> namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant());
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
}
}
namespaceResources().createPolicies(namespaceName, policies);
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
} catch (AlreadyExistsException e) {
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Namespace already exists");
} catch (Exception e) {
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> validatePolicies(namespaceName, policies))
.thenCompose(__ -> {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
return tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
.thenAccept(namespaces -> {
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :"
+ namespaceName.getTenant());
}
});
}
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
}

protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
Expand Down
Loading