Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment: Revert PRs 15518 and 15603 #70

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, Policies policies) {
return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return super.exists(path) && super.getChildren(path).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.resources;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -79,7 +78,7 @@ public CompletableFuture<Void> updateTenantAsync(String tenantName, Function<Ten
}

public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
}

public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
Expand Down Expand Up @@ -111,41 +110,6 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
return namespaces;
}

public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
// this will return a cluster in v1 and a namespace in v2
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
.thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
.thenCompose(children -> {
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(tenant, key).toString();
// if the length is 0 then this is probably a leftover cluster from namespace
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
// need to add it to the list
return getAsync(joinPath(BASE_POLICIES_PATH, namespace))
.thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace)
: new ArrayList<String>())
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof MetadataStoreException
.ContentDeserializationException) {
return new ArrayList<>();
}
throw FutureUtil.wrapToCompletionException(ex);
});
} else {
CompletableFuture<List<String>> ret = new CompletableFuture();
ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns)
.toString()).collect(Collectors.toList()));
return ret;
}
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> {
namespaces.addAll(m);
return namespaces;
}))));
}

public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
Expand Down Expand Up @@ -321,11 +323,6 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
if (policies.get().is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
}
return CompletableFuture.completedFuture(policies.get());
});
} else {
Expand Down Expand Up @@ -537,11 +534,6 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
}
}

protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDomain topicDomain) {
return namespaceResources().getPartitionedTopicResources()
.listPartitionedTopicsAsync(namespaceName, topicDomain);
}

protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
try {
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
Expand Down Expand Up @@ -739,6 +731,19 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn
return future;
}

protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
if (realCause instanceof WebApplicationException) {
asyncResponse.resume(realCause);
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
} else {
asyncResponse.resume(new RestException(realCause));
}
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,54 +103,60 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NamespacesBase extends AdminResource {

protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
if (tenant == null) {
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null"));
}
protected List<String> internalGetTenantNamespaces(String tenant) {
checkNotNull(tenant, "Tenant should not be null");
try {
NamedEntity.checkName(tenant);
} catch (IllegalArgumentException e) {
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
}
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);

try {
if (!tenantResources().tenantExists(tenant)) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}

return tenantResources().getListOfNamespaces(tenant);
} catch (Exception e) {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
throw new RestException(e);
}
return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
.thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
.thenCompose(existed -> {
if (!existed) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}
return tenantResources().getListOfNamespacesAsync(tenant);
});
}

protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> validatePolicies(namespaceName, policies))
.thenCompose(__ -> {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
return tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
.thenAccept(namespaces -> {
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :"
+ namespaceName.getTenant());
}
});
}
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
protected void internalCreateNamespace(Policies policies) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
validatePoliciesReadOnlyAccess();
validatePolicies(namespaceName, policies);

try {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
List<String> namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant());
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
}
}
namespaceResources().createPolicies(namespaceName, policies);
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
} catch (AlreadyExistsException e) {
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Namespace already exists");
} catch (Exception e) {
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}

protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
Expand Down Expand Up @@ -791,18 +797,16 @@ protected void internalSetNamespaceReplicationClusters(List<String> clusterIds)
});
}

protected CompletableFuture<Void> internalSetNamespaceMessageTTLAsync(Integer messageTTL) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.TTL, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> {
if (messageTTL != null && messageTTL < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"Invalid value for message TTL, message TTL must >= 0");
}
}).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.message_ttl_in_seconds = messageTTL;
return policies;
}));
protected void internalSetNamespaceMessageTTL(Integer messageTTL) {
validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (messageTTL != null && messageTTL < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
}
updatePolicies(namespaceName, policies -> {
policies.message_ttl_in_seconds = messageTTL;
return policies;
});
}

protected void internalSetSubscriptionExpirationTime(Integer expirationTime) {
Expand Down Expand Up @@ -908,13 +912,13 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons
internalSetAutoSubscriptionCreation(asyncResponse, null);
}

protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean enableDeduplication) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.deduplicationEnabled = enableDeduplication;
return policies;
}));
protected void internalModifyDeduplication(Boolean enableDeduplication) {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
updatePolicies(namespaceName, policies -> {
policies.deduplicationEnabled = enableDeduplication;
return policies;
});
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -2148,10 +2152,9 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) {
}
}

protected CompletableFuture<Boolean> internalGetDeduplicationAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.deduplicationEnabled);
protected Boolean internalGetDeduplication() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).deduplicationEnabled;
}

protected Integer internalGetMaxConsumersPerTopic() {
Expand Down
Loading