From 52438cc4e794cc3c95d9e503f58bd76ce5a18867 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Thu, 12 May 2022 23:48:27 -0500 Subject: [PATCH 1/8] Make Implicit Subscription Permission Configurable --- .../PulsarAuthorizationProvider.java | 14 +++-- .../broker/admin/impl/NamespacesBase.java | 18 +++++++ .../pulsar/broker/admin/v1/Namespaces.java | 43 ++++++++++++++++ .../pulsar/broker/admin/v2/Namespaces.java | 42 +++++++++++++++ .../pulsar/client/admin/Namespaces.java | 51 +++++++++++++++++++ .../common/policies/data/AuthPolicies.java | 9 ++++ .../client/admin/internal/NamespacesImpl.java | 49 ++++++++++++++++++ .../pulsar/admin/cli/CmdNamespaces.java | 43 ++++++++++++++++ .../admin/internal/data/AuthPoliciesImpl.java | 16 +++++- 9 files changed, 280 insertions(+), 5 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 77e7edb64d7c6..31dc0b17bdf86 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -107,16 +107,21 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) .thenCompose(policies -> { if (!policies.isPresent()) { + // TODO this case seems like it could bypass authorization checks. if (log.isDebugEnabled()) { log.debug("Policies node couldn't be found for topic : {}", topicName); } } else { if (isNotBlank(subscription)) { - // validate if role is authorized to access subscription. (skip validation if authorization - // list is empty) + // Reject request if role is unauthorized to access subscription. + // If implicitSubscriptionAuthentication is enabled, set of roles must be null or empty, or + // role must be in set of roles. Otherwise, role must be in the set of roles. Set roles = policies.get().auth_policies .getSubscriptionAuthentication().get(subscription); - if (roles != null && !roles.isEmpty() && !roles.contains(role)) { + boolean isUnauthorized = policies.get().auth_policies.isImplicitSubscriptionAuth() + ? (roles != null && !roles.isEmpty() && !roles.contains(role)) + : (roles == null || roles.isEmpty() || !roles.contains(role)); + if (isUnauthorized) { log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription); return CompletableFuture.completedFuture(false); } @@ -482,6 +487,8 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam case GET_TOPICS: case GET_BUNDLE: return allowConsumeOrProduceOpsAsync(namespaceName, role, authData); + // TODO these only require ability to consume on namespace; ignore namespace's subscription + // permission. case UNSUBSCRIBE: case CLEAR_BACKLOG: return allowTheSpecifiedActionOpsAsync( @@ -536,6 +543,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, return canLookupAsync(topicName, role, authData); case PRODUCE: return canProduceAsync(topicName, role, authData); + // TODO consume from single subscription lets role view all subscriptions on a topic case GET_SUBSCRIPTIONS: case CONSUME: case SUBSCRIBE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 49a221946ee35..ddd506d0cf0bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -698,6 +698,24 @@ protected void internalGrantPermissionOnNamespace(String role, Set a } } + protected boolean getImplicitPermissionOnSubscription() { + validateNamespaceOperation(namespaceName, NamespaceOperation.GET_PERMISSION); + Policies policies = getNamespacePolicies(namespaceName); + return policies.auth_policies.isImplicitSubscriptionAuth(); + } + + protected void internalSetImplicitPermissionOnSubscription(boolean isImplicitPermissionOnSubscription) { + if (isImplicitPermissionOnSubscription) { + validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); + } else { + validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION); + } + validatePoliciesReadOnlyAccess(); + updatePolicies(namespaceName, policies -> { + policies.auth_policies.setImplicitSubscriptionAuth(isImplicitPermissionOnSubscription); + return policies; + }); + } protected void internalGrantPermissionOnSubscription(String subscription, Set roles) { validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index c21c8b8910f5c..d0cfdc39ef983 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -314,6 +314,49 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert internalRevokePermissionsOnSubscription(subscription, role); } + @PUT + @Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription") + @ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a" + + " subscription.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 501, message = "Authorization is not enabled")}) + public void grantImplicitPermissionOnSubscription( + @PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, cluster, namespace); + internalSetImplicitPermissionOnSubscription(true); + } + + @DELETE + @Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription") + @ApiOperation(hidden = true, value = "Require a consumer's role to have explicit permission to consume from a" + + " subscription.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 501, message = "Authorization is not enabled")}) + public void revokeImplicitPermissionOnSubscription( + @PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, cluster, namespace); + internalSetImplicitPermissionOnSubscription(false); + } + + @GET + @Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription") + @ApiOperation(value = "Get permission on subscription required for namespace.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Namespace is not empty")}) + public boolean getImplicitPermissionOnSubscription(@PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, cluster, namespace); + return getImplicitPermissionOnSubscription(); + } + @GET @Path("/{property}/{cluster}/{namespace}/replication") @ApiOperation(hidden = true, value = "Get the replication clusters for a namespace.", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index e3f82b72d2ad8..9de009f3d0e9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -267,6 +267,48 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert internalRevokePermissionsOnSubscription(subscription, role); } + @PUT + @Path("/{property}/{namespace}/implicitPermissionOnSubscription") + @ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a" + + " subscription.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 501, message = "Authorization is not enabled")}) + public void grantImplicitPermissionOnSubscription( + @PathParam("property") String property, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, namespace); + internalSetImplicitPermissionOnSubscription(true); + } + + @DELETE + @Path("/{property}/{namespace}/implicitPermissionOnSubscription") + @ApiOperation(hidden = true, value = "Require a consumer's role to have explicit permission to consume from a" + + " subscription.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 501, message = "Authorization is not enabled")}) + public void revokeImplicitPermissionOnSubscription( + @PathParam("property") String property, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, namespace); + internalSetImplicitPermissionOnSubscription(false); + } + + @GET + @Path("/{property}/{namespace}/implicitPermissionOnSubscription") + @ApiOperation(value = "Get permission on subscription required for namespace.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Namespace is not empty")}) + public boolean getRequirePermissionOnSubscriptions(@PathParam("property") String property, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, namespace); + return getImplicitPermissionOnSubscription(); + } + @GET @Path("/{tenant}/{namespace}/replication") @ApiOperation(value = "Get the replication clusters for a namespace.", diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index f279f70839c1d..359fe0bd91ea5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -737,6 +737,57 @@ CompletableFuture grantPermissionOnSubscriptionAsync( */ CompletableFuture revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role); + /** + * Get whether a namespace allows implicit permission to consume from a subscription. + * + * @param namespace Pulsar namespace name + * @return + * @throws PulsarAdminException + */ + boolean getImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException; + + /** + * Get whether a namespace allows implicit permission to consume from a subscription. + * @param namespace Pulsar namespace name + * @return + */ + CompletableFuture getImplicitPermissionOnSubscriptionAsync(String namespace); + + /** + * Grant all roles implicit permission to consume from a subscription if no subscription permission is defined + * for that subscription in the namespace. + * @param namespace Pulsar namespace name + * @throws PulsarAdminException + */ + void grantImplicitPermissionOnSubscription(String namespace) + throws PulsarAdminException; + + /** + * Grant all roles implicit permission to consume from a subscription if no subscription permission is defined + * for that subscription in the namespace. + * @param namespace Pulsar namespace name + * @return + */ + CompletableFuture grantImplicitPermissionOnSubscriptionAsync(String namespace); + + /** + * Revoke implicit permission for any role to consume from a subscription if no subscription permission is defined + * for that subscription in the namespace. + * @param namespace Pulsar namespace name + * @throws PulsarAdminException + */ + void revokeImplicitPermissionOnSubscription(String namespace) + throws PulsarAdminException; + + /** + * Revoke implicit permission for any role to consume from a subscription if no subscription permission is defined + * for that subscription in the namespace. + * @param namespace Pulsar namespace name + * @return + */ + CompletableFuture revokeImplicitPermissionOnSubscriptionAsync(String namespace); + + /** * Get the replication clusters for a namespace. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java index 6ca81170d6946..77e2cab592b9e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java @@ -30,6 +30,14 @@ public interface AuthPolicies { Map>> getTopicAuthentication(); Map> getSubscriptionAuthentication(); + /** + * Whether an empty set of subscription authentication roles returned by {@link #getSubscriptionAuthentication()} + * implicitly grants permission to consume from the target subscription. + * @return + */ + boolean isImplicitSubscriptionAuth(); + void setImplicitSubscriptionAuth(boolean implicitSubscriptionAuth); + static Builder builder() { return ReflectionUtils.newBuilder("org.apache.pulsar.client.admin.internal.data.AuthPoliciesImpl"); } @@ -39,5 +47,6 @@ interface Builder { Builder namespaceAuthentication(Map> namespaceAuthentication); Builder topicAuthentication(Map>> topicAuthentication); Builder subscriptionAuthentication(Map> subscriptionAuthentication); + Builder implicitSubscriptionAuth(boolean implicitSubscriptionAuth); } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 2058be5c1993f..7951fa5991a32 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -425,6 +425,55 @@ public CompletableFuture revokePermissionOnSubscriptionAsync( return asyncDeleteRequest(path); } + @Override + public void grantImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException { + sync(() -> grantImplicitPermissionOnSubscriptionAsync(namespace)); + } + + @Override + public CompletableFuture grantImplicitPermissionOnSubscriptionAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription"); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override + public void revokeImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException { + sync(() -> revokeImplicitPermissionOnSubscriptionAsync(namespace)); + } + + @Override + public CompletableFuture revokeImplicitPermissionOnSubscriptionAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription"); + return asyncDeleteRequest(path); + } + + @Override + public boolean getImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException { + return sync(() -> getImplicitPermissionOnSubscriptionAsync(namespace)); + } + + @Override + public CompletableFuture getImplicitPermissionOnSubscriptionAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public List getNamespaceReplicationClusters(String namespace) throws PulsarAdminException { return sync(() -> getNamespaceReplicationClustersAsync(namespace)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 80d0ddf3aafeb..04b63efde160e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -290,6 +290,45 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = + "Get whether a namespace allows implicit permission to consume from a subscription.") + private class ImplicitSubscriptionPermission extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(getAdmin().namespaces().getImplicitPermissionOnSubscription(namespace)); + } + } + + @Parameters(commandDescription = "Grant all roles implicit permission to consume from a subscription if no " + + "subscription permission is defined for that subscription in the namespace.") + private class GrantImplicitSubscriptionPermission extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().grantImplicitPermissionOnSubscription(namespace); + } + } + + @Parameters(commandDescription = "Revoke implicit permission for any role to consume from a subscription if no " + + "subscription permission is defined for that subscription in the namespace.") + private class RevokeImplicitSubscriptionPermission extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().revokeImplicitPermissionOnSubscription(namespace); + } + } + @Parameters(commandDescription = "Get the permissions on a namespace") private class Permissions extends CliCommand { @Parameter(description = "tenant/namespace", required = true) @@ -2581,6 +2620,10 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("grant-subscription-permission", new GrantSubscriptionPermissions()); jcommander.addCommand("revoke-subscription-permission", new RevokeSubscriptionPermissions()); + jcommander.addCommand("implicit-subscription-permission", new ImplicitSubscriptionPermission()); + jcommander.addCommand("grant-implicit-subscription-permission", new GrantImplicitSubscriptionPermission()); + jcommander.addCommand("revoke-implicit-subscription-permission", new RevokeImplicitSubscriptionPermission()); + jcommander.addCommand("set-clusters", new SetReplicationClusters()); jcommander.addCommand("get-clusters", new GetReplicationClusters()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java index 166fe2d59d023..cc27e9e0cb666 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java @@ -42,6 +42,10 @@ public final class AuthPoliciesImpl implements AuthPolicies { @JsonProperty("subscription_auth_roles") private Map> subscriptionAuthentication = new TreeMap<>(); + // Default value is set in the builder + @JsonProperty(value = "implicit_subscription_auth") + private boolean implicitSubscriptionAuth; + public static AuthPolicies.Builder builder() { return new AuthPoliciesImplBuilder(); } @@ -51,6 +55,7 @@ public static class AuthPoliciesImplBuilder implements AuthPolicies.Builder { private Map> namespaceAuthentication = new TreeMap<>(); private Map>> topicAuthentication = new TreeMap<>();; private Map> subscriptionAuthentication = new TreeMap<>();; + private boolean implicitSubscriptionAuth = true; AuthPoliciesImplBuilder() { } @@ -73,14 +78,21 @@ public AuthPoliciesImplBuilder subscriptionAuthentication( return this; } + public AuthPoliciesImplBuilder implicitSubscriptionAuth(boolean implicitSubscriptionAuth) { + this.implicitSubscriptionAuth = implicitSubscriptionAuth; + return this; + } + public AuthPoliciesImpl build() { - return new AuthPoliciesImpl(namespaceAuthentication, topicAuthentication, subscriptionAuthentication); + return new AuthPoliciesImpl(namespaceAuthentication, topicAuthentication, subscriptionAuthentication, + implicitSubscriptionAuth); } public String toString() { return "AuthPoliciesImpl.AuthPoliciesImplBuilder(namespaceAuthentication=" + this.namespaceAuthentication + ", topicAuthentication=" + this.topicAuthentication + ", subscriptionAuthentication=" - + this.subscriptionAuthentication + ")"; + + this.subscriptionAuthentication + ", implicitSubscriptionAuth=" + + this.implicitSubscriptionAuth + ")"; } } } From 93c9940d8413e42abab1318fa5b0c11d39eb6c01 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 18 May 2022 00:01:50 -0500 Subject: [PATCH 2/8] Replace implicit with required in feature name --- .../PulsarAuthorizationProvider.java | 10 ++--- .../broker/admin/impl/NamespacesBase.java | 14 +++---- .../pulsar/broker/admin/v1/Namespaces.java | 36 +++++----------- .../pulsar/broker/admin/v2/Namespaces.java | 30 ++++---------- .../pulsar/client/admin/Namespaces.java | 41 ++++++------------- .../common/policies/data/AuthPolicies.java | 8 ++-- .../client/admin/internal/NamespacesImpl.java | 32 +++++---------- .../pulsar/admin/cli/CmdNamespaces.java | 38 ++++++++--------- .../admin/internal/data/AuthPoliciesImpl.java | 17 ++++---- 9 files changed, 84 insertions(+), 142 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 31dc0b17bdf86..2c66588dbf47f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -114,13 +114,13 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro } else { if (isNotBlank(subscription)) { // Reject request if role is unauthorized to access subscription. - // If implicitSubscriptionAuthentication is enabled, set of roles must be null or empty, or - // role must be in set of roles. Otherwise, role must be in the set of roles. + // If subscriptionAuthRequired is enabled, role must be in the set of roles. + // Otherwise, set of roles must be null or empty, or role must be in set of roles. Set roles = policies.get().auth_policies .getSubscriptionAuthentication().get(subscription); - boolean isUnauthorized = policies.get().auth_policies.isImplicitSubscriptionAuth() - ? (roles != null && !roles.isEmpty() && !roles.contains(role)) - : (roles == null || roles.isEmpty() || !roles.contains(role)); + boolean isUnauthorized = policies.get().auth_policies.isSubscriptionAuthRequired() + ? (roles == null || roles.isEmpty() || !roles.contains(role)) + : (roles != null && !roles.isEmpty() && !roles.contains(role)); if (isUnauthorized) { log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription); return CompletableFuture.completedFuture(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ddd506d0cf0bb..51c66ed92eece 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -698,21 +698,21 @@ protected void internalGrantPermissionOnNamespace(String role, Set a } } - protected boolean getImplicitPermissionOnSubscription() { + protected boolean getPermissionOnSubscriptionRequired() { validateNamespaceOperation(namespaceName, NamespaceOperation.GET_PERMISSION); Policies policies = getNamespacePolicies(namespaceName); - return policies.auth_policies.isImplicitSubscriptionAuth(); + return policies.auth_policies.isSubscriptionAuthRequired(); } - protected void internalSetImplicitPermissionOnSubscription(boolean isImplicitPermissionOnSubscription) { - if (isImplicitPermissionOnSubscription) { - validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); - } else { + protected void internalSetPermissionOnSubscriptionRequired(boolean permissionOnSubscriptionRequired) { + if (permissionOnSubscriptionRequired) { validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION); + } else { + validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); } validatePoliciesReadOnlyAccess(); updatePolicies(namespaceName, policies -> { - policies.auth_policies.setImplicitSubscriptionAuth(isImplicitPermissionOnSubscription); + policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired); return policies; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index d0cfdc39ef983..8eb2773856b47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -315,46 +315,32 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert } @PUT - @Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription") - @ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a" - + " subscription.") + @Path("/{property}/{cluster}/{namespace}/permissionOnSubscriptionRequired") + @ApiOperation(hidden = true, value = "Set whether a role requires explicit permission to consume from a " + + "subscription that has no subscription permission defined in the namespace.") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 501, message = "Authorization is not enabled")}) - public void grantImplicitPermissionOnSubscription( + public void setPermissionOnSubscriptionRequired( @PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { - validateNamespaceName(property, cluster, namespace); - internalSetImplicitPermissionOnSubscription(true); - } - - @DELETE - @Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription") - @ApiOperation(hidden = true, value = "Require a consumer's role to have explicit permission to consume from a" - + " subscription.") - @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), - @ApiResponse(code = 409, message = "Concurrent modification"), - @ApiResponse(code = 501, message = "Authorization is not enabled")}) - public void revokeImplicitPermissionOnSubscription( - @PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + @PathParam("namespace") String namespace, boolean permissionOnSubscriptionRequired) { validateNamespaceName(property, cluster, namespace); - internalSetImplicitPermissionOnSubscription(false); + internalSetPermissionOnSubscriptionRequired(permissionOnSubscriptionRequired); } @GET - @Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription") - @ApiOperation(value = "Get permission on subscription required for namespace.") + @Path("/{property}/{cluster}/{namespace}/permissionOnSubscriptionRequired") + @ApiOperation(value = "Get whether a role requires explicit permission to consume from a " + + "subscription that has no subscription permission defined in the namespace.") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty")}) - public boolean getImplicitPermissionOnSubscription(@PathParam("property") String property, + public boolean getPermissionOnSubscriptionRequired(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return getImplicitPermissionOnSubscription(); + return getPermissionOnSubscriptionRequired(); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 9de009f3d0e9b..dad31397242d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -268,45 +268,31 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert } @PUT - @Path("/{property}/{namespace}/implicitPermissionOnSubscription") + @Path("/{property}/{namespace}/permissionOnSubscriptionRequired") @ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a" + " subscription.") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 501, message = "Authorization is not enabled")}) - public void grantImplicitPermissionOnSubscription( + public void setPermissionOnSubscriptionRequired( @PathParam("property") String property, - @PathParam("namespace") String namespace) { - validateNamespaceName(property, namespace); - internalSetImplicitPermissionOnSubscription(true); - } - - @DELETE - @Path("/{property}/{namespace}/implicitPermissionOnSubscription") - @ApiOperation(hidden = true, value = "Require a consumer's role to have explicit permission to consume from a" - + " subscription.") - @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), - @ApiResponse(code = 409, message = "Concurrent modification"), - @ApiResponse(code = 501, message = "Authorization is not enabled")}) - public void revokeImplicitPermissionOnSubscription( - @PathParam("property") String property, - @PathParam("namespace") String namespace) { + @PathParam("namespace") String namespace, + boolean required) { validateNamespaceName(property, namespace); - internalSetImplicitPermissionOnSubscription(false); + internalSetPermissionOnSubscriptionRequired(required); } @GET - @Path("/{property}/{namespace}/implicitPermissionOnSubscription") + @Path("/{property}/{namespace}/permissionOnSubscriptionRequired") @ApiOperation(value = "Get permission on subscription required for namespace.") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty")}) - public boolean getRequirePermissionOnSubscriptions(@PathParam("property") String property, + public boolean getPermissionOnSubscriptionRequired(@PathParam("property") String property, @PathParam("namespace") String namespace) { validateNamespaceName(property, namespace); - return getImplicitPermissionOnSubscription(); + return getPermissionOnSubscriptionRequired(); } @GET diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 359fe0bd91ea5..d2a2028487c98 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -738,55 +738,40 @@ CompletableFuture grantPermissionOnSubscriptionAsync( CompletableFuture revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role); /** - * Get whether a namespace allows implicit permission to consume from a subscription. + * Get whether a role requires explicit permission to consume from a subscription that has no subscription + * permission defined in the namespace. * * @param namespace Pulsar namespace name * @return * @throws PulsarAdminException */ - boolean getImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException; + boolean getPermissionOnSubscriptionRequired(String namespace) throws PulsarAdminException; /** - * Get whether a namespace allows implicit permission to consume from a subscription. + * Get whether a role requires explicit permission to consume from a subscription that has no subscription + * permission defined in the namespace. * @param namespace Pulsar namespace name * @return */ - CompletableFuture getImplicitPermissionOnSubscriptionAsync(String namespace); + CompletableFuture getPermissionOnSubscriptionRequiredAsync(String namespace); /** - * Grant all roles implicit permission to consume from a subscription if no subscription permission is defined - * for that subscription in the namespace. + * Set whether a role requires explicit permission to consume from a subscription that has no subscription + * permission defined in the namespace. * @param namespace Pulsar namespace name * @throws PulsarAdminException */ - void grantImplicitPermissionOnSubscription(String namespace) + void setPermissionOnSubscriptionRequired(String namespace, boolean permissionOnSubscriptionRequired) throws PulsarAdminException; /** - * Grant all roles implicit permission to consume from a subscription if no subscription permission is defined - * for that subscription in the namespace. + * Set whether a role requires explicit permission to consume from a subscription that has no subscription + * permission defined in the namespace. * @param namespace Pulsar namespace name * @return */ - CompletableFuture grantImplicitPermissionOnSubscriptionAsync(String namespace); - - /** - * Revoke implicit permission for any role to consume from a subscription if no subscription permission is defined - * for that subscription in the namespace. - * @param namespace Pulsar namespace name - * @throws PulsarAdminException - */ - void revokeImplicitPermissionOnSubscription(String namespace) - throws PulsarAdminException; - - /** - * Revoke implicit permission for any role to consume from a subscription if no subscription permission is defined - * for that subscription in the namespace. - * @param namespace Pulsar namespace name - * @return - */ - CompletableFuture revokeImplicitPermissionOnSubscriptionAsync(String namespace); - + CompletableFuture setPermissionOnSubscriptionRequiredAsync(String namespace, + boolean permissionOnSubscriptionRequired); /** * Get the replication clusters for a namespace. diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java index 77e2cab592b9e..df1dfaf96a81e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java @@ -32,11 +32,11 @@ public interface AuthPolicies { /** * Whether an empty set of subscription authentication roles returned by {@link #getSubscriptionAuthentication()} - * implicitly grants permission to consume from the target subscription. + * requires explicit permission to consume from the target subscription. * @return */ - boolean isImplicitSubscriptionAuth(); - void setImplicitSubscriptionAuth(boolean implicitSubscriptionAuth); + boolean isSubscriptionAuthRequired(); + void setSubscriptionAuthRequired(boolean subscriptionAuthRequired); static Builder builder() { return ReflectionUtils.newBuilder("org.apache.pulsar.client.admin.internal.data.AuthPoliciesImpl"); @@ -47,6 +47,6 @@ interface Builder { Builder namespaceAuthentication(Map> namespaceAuthentication); Builder topicAuthentication(Map>> topicAuthentication); Builder subscriptionAuthentication(Map> subscriptionAuthentication); - Builder implicitSubscriptionAuth(boolean implicitSubscriptionAuth); + Builder subscriptionAuthRequired(boolean subscriptionAuthRequired); } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7951fa5991a32..fa0177dbd204c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -426,38 +426,28 @@ public CompletableFuture revokePermissionOnSubscriptionAsync( } @Override - public void grantImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException { - sync(() -> grantImplicitPermissionOnSubscriptionAsync(namespace)); - } - - @Override - public CompletableFuture grantImplicitPermissionOnSubscriptionAsync(String namespace) { - NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription"); - return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); - } - - @Override - public void revokeImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException { - sync(() -> revokeImplicitPermissionOnSubscriptionAsync(namespace)); + public void setPermissionOnSubscriptionRequired(String namespace, boolean permissionOnSubscriptionRequired) + throws PulsarAdminException { + sync(() -> setPermissionOnSubscriptionRequiredAsync(namespace, permissionOnSubscriptionRequired)); } @Override - public CompletableFuture revokeImplicitPermissionOnSubscriptionAsync(String namespace) { + public CompletableFuture setPermissionOnSubscriptionRequiredAsync(String namespace, + boolean permissionOnSubscriptionRequired) { NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription"); - return asyncDeleteRequest(path); + WebTarget path = namespacePath(ns, "permissionOnSubscriptionRequired"); + return asyncPutRequest(path, Entity.entity(permissionOnSubscriptionRequired, MediaType.APPLICATION_JSON)); } @Override - public boolean getImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException { - return sync(() -> getImplicitPermissionOnSubscriptionAsync(namespace)); + public boolean getPermissionOnSubscriptionRequired(String namespace) throws PulsarAdminException { + return sync(() -> getPermissionOnSubscriptionRequiredAsync(namespace)); } @Override - public CompletableFuture getImplicitPermissionOnSubscriptionAsync(String namespace) { + public CompletableFuture getPermissionOnSubscriptionRequiredAsync(String namespace) { NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription"); + WebTarget path = namespacePath(ns, "permissionOnSubscriptionRequired"); final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 04b63efde160e..5f59ffe059aea 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -291,41 +291,38 @@ void run() throws PulsarAdminException { } @Parameters(commandDescription = - "Get whether a namespace allows implicit permission to consume from a subscription.") - private class ImplicitSubscriptionPermission extends CliCommand { + "Get whether a namespace requires explicit permission to consume from a subscription when no permission is " + + "defined.") + private class GetSubscriptionPermissionRequired extends CliCommand { @Parameter(description = "tenant/namespace", required = true) private java.util.List params; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - print(getAdmin().namespaces().getImplicitPermissionOnSubscription(namespace)); + print(getAdmin().namespaces().getPermissionOnSubscriptionRequired(namespace)); } } - @Parameters(commandDescription = "Grant all roles implicit permission to consume from a subscription if no " - + "subscription permission is defined for that subscription in the namespace.") - private class GrantImplicitSubscriptionPermission extends CliCommand { + @Parameters(commandDescription = "Set whether a role requires explicit permission to consume from a subscription " + + "that has no subscription permission defined in the namespace.") + private class SetSubscriptionPermissionRequired extends CliCommand { @Parameter(description = "tenant/namespace", required = true) private java.util.List params; - @Override - void run() throws PulsarAdminException { - String namespace = validateNamespace(params); - getAdmin().namespaces().grantImplicitPermissionOnSubscription(namespace); - } - } + @Parameter(names = { "--enable", "-e" }, description = "Enable message encryption required") + private boolean enable = false; - @Parameters(commandDescription = "Revoke implicit permission for any role to consume from a subscription if no " - + "subscription permission is defined for that subscription in the namespace.") - private class RevokeImplicitSubscriptionPermission extends CliCommand { - @Parameter(description = "tenant/namespace", required = true) - private java.util.List params; + @Parameter(names = { "--disable", "-d" }, description = "Disable message encryption required") + private boolean disable = false; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - getAdmin().namespaces().revokeImplicitPermissionOnSubscription(namespace); + if (enable == disable) { + throw new ParameterException("Need to specify either --enable or --disable"); + } + getAdmin().namespaces().setPermissionOnSubscriptionRequired(namespace, enable); } } @@ -2620,9 +2617,8 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("grant-subscription-permission", new GrantSubscriptionPermissions()); jcommander.addCommand("revoke-subscription-permission", new RevokeSubscriptionPermissions()); - jcommander.addCommand("implicit-subscription-permission", new ImplicitSubscriptionPermission()); - jcommander.addCommand("grant-implicit-subscription-permission", new GrantImplicitSubscriptionPermission()); - jcommander.addCommand("revoke-implicit-subscription-permission", new RevokeImplicitSubscriptionPermission()); + jcommander.addCommand("get-subscription-permission-required", new GetSubscriptionPermissionRequired()); + jcommander.addCommand("set-subscription-permission-required", new SetSubscriptionPermissionRequired()); jcommander.addCommand("set-clusters", new SetReplicationClusters()); jcommander.addCommand("get-clusters", new GetReplicationClusters()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java index cc27e9e0cb666..ee28febc59990 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java @@ -42,9 +42,8 @@ public final class AuthPoliciesImpl implements AuthPolicies { @JsonProperty("subscription_auth_roles") private Map> subscriptionAuthentication = new TreeMap<>(); - // Default value is set in the builder - @JsonProperty(value = "implicit_subscription_auth") - private boolean implicitSubscriptionAuth; + @JsonProperty(value = "subscription_auth_required") + private boolean subscriptionAuthRequired; public static AuthPolicies.Builder builder() { return new AuthPoliciesImplBuilder(); @@ -55,7 +54,7 @@ public static class AuthPoliciesImplBuilder implements AuthPolicies.Builder { private Map> namespaceAuthentication = new TreeMap<>(); private Map>> topicAuthentication = new TreeMap<>();; private Map> subscriptionAuthentication = new TreeMap<>();; - private boolean implicitSubscriptionAuth = true; + private boolean subscriptionAuthRequired = false; AuthPoliciesImplBuilder() { } @@ -78,21 +77,21 @@ public AuthPoliciesImplBuilder subscriptionAuthentication( return this; } - public AuthPoliciesImplBuilder implicitSubscriptionAuth(boolean implicitSubscriptionAuth) { - this.implicitSubscriptionAuth = implicitSubscriptionAuth; + public AuthPoliciesImplBuilder subscriptionAuthRequired(boolean explicitSubscriptionAuth) { + this.subscriptionAuthRequired = explicitSubscriptionAuth; return this; } public AuthPoliciesImpl build() { return new AuthPoliciesImpl(namespaceAuthentication, topicAuthentication, subscriptionAuthentication, - implicitSubscriptionAuth); + subscriptionAuthRequired); } public String toString() { return "AuthPoliciesImpl.AuthPoliciesImplBuilder(namespaceAuthentication=" + this.namespaceAuthentication + ", topicAuthentication=" + this.topicAuthentication + ", subscriptionAuthentication=" - + this.subscriptionAuthentication + ", implicitSubscriptionAuth=" - + this.implicitSubscriptionAuth + ")"; + + this.subscriptionAuthentication + ", subscriptionAuthRequired=" + + this.subscriptionAuthRequired + ")"; } } } From e017e629e7bc49da1eada0e391f03ca7512debf8 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 18 May 2022 00:10:46 -0500 Subject: [PATCH 3/8] Replace PUT with POST --- .../main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java | 2 +- .../main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 2 +- .../org/apache/pulsar/client/admin/internal/NamespacesImpl.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 8eb2773856b47..966e64849ea6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -314,7 +314,7 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert internalRevokePermissionsOnSubscription(subscription, role); } - @PUT + @POST @Path("/{property}/{cluster}/{namespace}/permissionOnSubscriptionRequired") @ApiOperation(hidden = true, value = "Set whether a role requires explicit permission to consume from a " + "subscription that has no subscription permission defined in the namespace.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index dad31397242d8..4e699ad72c487 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -267,7 +267,7 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert internalRevokePermissionsOnSubscription(subscription, role); } - @PUT + @POST @Path("/{property}/{namespace}/permissionOnSubscriptionRequired") @ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a" + " subscription.") diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index fa0177dbd204c..08b690a63d2ce 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -436,7 +436,7 @@ public CompletableFuture setPermissionOnSubscriptionRequiredAsync(String n boolean permissionOnSubscriptionRequired) { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "permissionOnSubscriptionRequired"); - return asyncPutRequest(path, Entity.entity(permissionOnSubscriptionRequired, MediaType.APPLICATION_JSON)); + return asyncPostRequest(path, Entity.entity(permissionOnSubscriptionRequired, MediaType.APPLICATION_JSON)); } @Override From 879047fdf04a85be3cc4891ae600d2082498de84 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 18 May 2022 14:18:14 -0500 Subject: [PATCH 4/8] Add test --- .../AuthorizationProducerConsumerTest.java | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 00faf964bf7b1..65fe472f45b0a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -21,6 +21,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -364,6 +365,83 @@ public void testSubscriberPermission() throws Exception { log.info("-- Exiting {} test --", methodName); } + @Test + public void testSubscriberPermissionRequired() throws Exception { + log.info("-- Starting {} test --", methodName); + + conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); + setup(); + + final String tenantRole = "tenant-role"; + final String subscriptionRole = "sub-role"; + final String subscriptionName = "sub"; + final String namespace = "my-property/ns-sub-auth-req"; + final String topicName = "persistent://" + namespace + "/my-topic"; + Authentication adminAuthentication = new ClientAuthentication("superUser"); + + clientAuthProviderSupportedRoles.add(subscriptionRole); + + @Cleanup + PulsarAdmin superAdmin = spy( + PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build()); + + Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole); + @Cleanup + PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(tenantAdminAuthentication).build()); + + Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole); + @Cleanup + PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(subAdminAuthentication).build()); + + Authentication authentication = new ClientAuthentication(subscriptionRole); + + superAdmin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + + // Initialize cluster and configure namespace to require permission on subscription + superAdmin.tenants().createTenant("my-property", + new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test"))); + superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + assertFalse(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace), "Defaults to false."); + superAdmin.namespaces().setPermissionOnSubscriptionRequired(namespace, true); + tenantAdmin.topics().createNonPartitionedTopic(topicName); + tenantAdmin.topics().grantPermission(topicName, subscriptionRole, + Collections.singleton(AuthAction.consume)); + assertNull(superAdmin.namespaces().getPublishRate(namespace)); + assertTrue(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace)); + replacePulsarClient(PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(authentication)); + + // Cluster is initialized; the subscriptionRole has permission consume on the topic, but doesn't have + // explicit subscription permission. Verify that several operations which rely on subscription permission fail. + try { + sub1Admin.topics().resetCursor(topicName, subscriptionName, 0); + fail("should have failed with authorization exception"); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith( + "Unauthorized to validateTopicOperation for operation [RESET_CURSOR]")); + } + try { + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + fail("should have failed with authorization exception"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Client is not authorized to subscribe"), e.getMessage()); + } + + // Grant the role permission. + tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Set.of(subscriptionRole)); + + // Verify the role now has permission to consume (reset cursor second to avoid 404 on subscription) + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) + .subscribe(); + consumer.close(); + sub1Admin.topics().resetCursor(topicName, subscriptionName, 0); + + log.info("-- Exiting {} test --", methodName); + } + @Test public void testClearBacklogPermission() throws Exception { log.info("-- Starting {} test --", methodName); From ae4d425262d5eb43d87fd81055c71ade31fa5be2 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 18 May 2022 14:56:57 -0500 Subject: [PATCH 5/8] Make admin server operations completely async --- .../broker/admin/impl/NamespacesBase.java | 40 +++++++++++++------ .../pulsar/broker/admin/v1/Namespaces.java | 16 ++++---- .../pulsar/broker/admin/v2/Namespaces.java | 8 ++-- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 51c66ed92eece..d37667b05bc3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -698,23 +698,39 @@ protected void internalGrantPermissionOnNamespace(String role, Set a } } - protected boolean getPermissionOnSubscriptionRequired() { - validateNamespaceOperation(namespaceName, NamespaceOperation.GET_PERMISSION); - Policies policies = getNamespacePolicies(namespaceName); - return policies.auth_policies.isSubscriptionAuthRequired(); + protected void getPermissionOnSubscriptionRequired(AsyncResponse asyncResponse) { + validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PERMISSION) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName).thenApply(policies -> + asyncResponse.resume(Response.ok(policies.auth_policies.isSubscriptionAuthRequired()).build()) + )).exceptionally(ex -> { + log.error("[{}] Failed to get PermissionOnSubscriptionRequired", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } - protected void internalSetPermissionOnSubscriptionRequired(boolean permissionOnSubscriptionRequired) { + protected void internalSetPermissionOnSubscriptionRequired(AsyncResponse asyncResponse, + boolean permissionOnSubscriptionRequired) { + CompletableFuture isAuthorized; if (permissionOnSubscriptionRequired) { - validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION); + isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION); } else { - validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION); + isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION); } - validatePoliciesReadOnlyAccess(); - updatePolicies(namespaceName, policies -> { - policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired); - return policies; - }); + isAuthorized + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired); + return policies; + })).thenAccept(__ -> { + log.info("[{}] Updated PermissionOnSubscriptionRequired for namespace {} to {}", clientAppId(), + namespaceName, permissionOnSubscriptionRequired); + asyncResponse.resume(Response.ok().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to update PermissionOnSubscriptionRequired", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } protected void internalGrantPermissionOnSubscription(String subscription, Set roles) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 966e64849ea6d..d4462dac35676 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -323,10 +323,11 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 501, message = "Authorization is not enabled")}) public void setPermissionOnSubscriptionRequired( - @PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, boolean permissionOnSubscriptionRequired) { + @Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + boolean permissionOnSubscriptionRequired) { validateNamespaceName(property, cluster, namespace); - internalSetPermissionOnSubscriptionRequired(permissionOnSubscriptionRequired); + internalSetPermissionOnSubscriptionRequired(asyncResponse, permissionOnSubscriptionRequired); } @GET @@ -336,11 +337,12 @@ public void setPermissionOnSubscriptionRequired( @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty")}) - public boolean getPermissionOnSubscriptionRequired(@PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void getPermissionOnSubscriptionRequired(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return getPermissionOnSubscriptionRequired(); + getPermissionOnSubscriptionRequired(asyncResponse); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 4e699ad72c487..8b38fc15afcbc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -276,11 +276,12 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 501, message = "Authorization is not enabled")}) public void setPermissionOnSubscriptionRequired( + @Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("namespace") String namespace, boolean required) { validateNamespaceName(property, namespace); - internalSetPermissionOnSubscriptionRequired(required); + internalSetPermissionOnSubscriptionRequired(asyncResponse, required); } @GET @@ -289,10 +290,11 @@ public void setPermissionOnSubscriptionRequired( @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty")}) - public boolean getPermissionOnSubscriptionRequired(@PathParam("property") String property, + public void getPermissionOnSubscriptionRequired(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("namespace") String namespace) { validateNamespaceName(property, namespace); - return getPermissionOnSubscriptionRequired(); + getPermissionOnSubscriptionRequired(asyncResponse); } @GET From 96be50c31006be36d778413ea33359b94fde9fdc Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 10 Dec 2022 23:11:52 +0800 Subject: [PATCH 6/8] checkstyle Signed-off-by: tison --- .../broker/admin/impl/NamespacesBase.java | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 94f43457229f0..8cb94ba7195b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -378,8 +378,11 @@ private CompletableFuture precheckWhenDeleteNamespace(NamespaceName ns } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { replClusterUrl = new URL(replClusterData.getServiceUrlTls()); } else { - throw new RestException(Status.PRECONDITION_FAILED, + // CHECKSTYLE.OFF: LineLength + throw new RestException( + Status.PRECONDITION_FAILED, "The replication cluster does not provide TLS encrypted service"); + // CHECKSTYLE.ON: LineLength } } catch (MalformedURLException checkedEx) { throw new RestException(checkedEx); @@ -506,8 +509,8 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund } deleteTopicsFuture = FutureUtil.waitForAll(futures); } - return deleteTopicsFuture.thenCompose( - ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle)) + return deleteTopicsFuture.thenCompose(___ -> + pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle)) .thenRun(() -> pulsar().getBrokerService().getBundleStats() .remove(bundle.toString())); }); @@ -771,9 +774,10 @@ protected CompletableFuture internalSetAutoTopicCreationAsync( })); } - protected CompletableFuture internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride - autoSubscriptionCreationOverride) { - // Force to read the data s.t. the watch to the cache content is setup. + protected CompletableFuture internalSetAutoSubscriptionCreationAsync( + AutoSubscriptionCreationOverride autoSubscriptionCreationOverride + ) { + // Force to read the data s.t. the watch to the cache content is set up. return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) @@ -952,14 +956,18 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange) .thenCompose(flag -> { if (!flag) { - log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), - namespaceName, bundleRange); + log.info("[{}] Namespace bundle is not owned by any broker {}/{}", + clientAppId(), namespaceName, bundleRange); return CompletableFuture.completedFuture(null); } - return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, true) - .thenCompose(nsBundle -> - pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); + return validateNamespaceBundleOwnershipAsync( + namespaceName, + policies.bundles, + bundleRange, + authoritative, + true + ).thenCompose(nsBundle -> + pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); })); } From 3d69525002d51fae9afd00ac9c64f0263d2828cf Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 10 Dec 2022 23:18:25 +0800 Subject: [PATCH 7/8] Revert "checkstyle" This reverts commit 96be50c31006be36d778413ea33359b94fde9fdc. --- .../broker/admin/impl/NamespacesBase.java | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 8cb94ba7195b6..94f43457229f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -378,11 +378,8 @@ private CompletableFuture precheckWhenDeleteNamespace(NamespaceName ns } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) { replClusterUrl = new URL(replClusterData.getServiceUrlTls()); } else { - // CHECKSTYLE.OFF: LineLength - throw new RestException( - Status.PRECONDITION_FAILED, + throw new RestException(Status.PRECONDITION_FAILED, "The replication cluster does not provide TLS encrypted service"); - // CHECKSTYLE.ON: LineLength } } catch (MalformedURLException checkedEx) { throw new RestException(checkedEx); @@ -509,8 +506,8 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund } deleteTopicsFuture = FutureUtil.waitForAll(futures); } - return deleteTopicsFuture.thenCompose(___ -> - pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle)) + return deleteTopicsFuture.thenCompose( + ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle)) .thenRun(() -> pulsar().getBrokerService().getBundleStats() .remove(bundle.toString())); }); @@ -774,10 +771,9 @@ protected CompletableFuture internalSetAutoTopicCreationAsync( })); } - protected CompletableFuture internalSetAutoSubscriptionCreationAsync( - AutoSubscriptionCreationOverride autoSubscriptionCreationOverride - ) { - // Force to read the data s.t. the watch to the cache content is set up. + protected CompletableFuture internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride + autoSubscriptionCreationOverride) { + // Force to read the data s.t. the watch to the cache content is setup. return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) @@ -956,18 +952,14 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange) .thenCompose(flag -> { if (!flag) { - log.info("[{}] Namespace bundle is not owned by any broker {}/{}", - clientAppId(), namespaceName, bundleRange); + log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), + namespaceName, bundleRange); return CompletableFuture.completedFuture(null); } - return validateNamespaceBundleOwnershipAsync( - namespaceName, - policies.bundles, - bundleRange, - authoritative, - true - ).thenCompose(nsBundle -> - pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); + return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, true) + .thenCompose(nsBundle -> + pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); })); } From c6a4b3ae3f6bc920a36bffb7128615665d243322 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 10 Dec 2022 23:24:02 +0800 Subject: [PATCH 8/8] revert style changes Signed-off-by: tison --- .../broker/admin/impl/NamespacesBase.java | 500 +++++++++--------- 1 file changed, 249 insertions(+), 251 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 94f43457229f0..c301106917632 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -205,18 +205,18 @@ protected CompletableFuture internalDeleteNamespaceAsync(boolean force) { CompletableFuture preconditionCheck = precheckWhenDeleteNamespace(namespaceName, force); return preconditionCheck .thenCompose(policies -> { - if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)) { + if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); } return pulsar().getNamespaceService().getFullListOfTopics(namespaceName); }) .thenCompose(allTopics -> pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName) - .thenCompose(allPartitionedTopics -> { - List> topicsSum = new ArrayList<>(2); - topicsSum.add(allTopics); - topicsSum.add(allPartitionedTopics); - return CompletableFuture.completedFuture(topicsSum); - })) + .thenCompose(allPartitionedTopics -> { + List> topicsSum = new ArrayList<>(2); + topicsSum.add(allTopics); + topicsSum.add(allPartitionedTopics); + return CompletableFuture.completedFuture(topicsSum); + })) .thenCompose(topics -> { List allTopics = topics.get(0); ArrayList allUserCreatedTopics = new ArrayList<>(); @@ -258,7 +258,7 @@ protected CompletableFuture internalDeleteNamespaceAsync(boolean force) { } return namespaceResources().setPoliciesAsync(namespaceName, old -> { old.deleted = true; - return old; + return old; }).thenCompose(ignore -> { return internalDeleteTopicsAsync(allUserCreatedTopics); }).thenCompose(ignore -> { @@ -360,7 +360,7 @@ private CompletableFuture precheckWhenDeleteNamespace(NamespaceName ns // There are still more than one clusters configured for the global namespace throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " + nsName + ". There are still more than " - + "one replication clusters configured."); + + "one replication clusters configured."); } if (replicationClusters.size() == 1 && !policies.replication_clusters.contains(config().getClusterName())) { @@ -379,7 +379,7 @@ private CompletableFuture precheckWhenDeleteNamespace(NamespaceName ns replClusterUrl = new URL(replClusterData.getServiceUrlTls()); } else { throw new RestException(Status.PRECONDITION_FAILED, - "The replication cluster does not provide TLS encrypted service"); + "The replication cluster does not provide TLS encrypted service"); } } catch (MalformedURLException checkedEx) { throw new RestException(checkedEx); @@ -507,7 +507,7 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund deleteTopicsFuture = FutureUtil.waitForAll(futures); } return deleteTopicsFuture.thenCompose( - ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle)) + ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle)) .thenRun(() -> pulsar().getBrokerService().getBundleStats() .remove(bundle.toString())); }); @@ -515,41 +515,6 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund }); } - protected void getPermissionOnSubscriptionRequired(AsyncResponse asyncResponse) { - validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PERMISSION) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName).thenApply(policies -> - asyncResponse.resume(Response.ok(policies.auth_policies.isSubscriptionAuthRequired()).build()) - )).exceptionally(ex -> { - log.error("[{}] Failed to get PermissionOnSubscriptionRequired", clientAppId(), ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - - protected void internalSetPermissionOnSubscriptionRequired(AsyncResponse asyncResponse, - boolean permissionOnSubscriptionRequired) { - CompletableFuture isAuthorized; - if (permissionOnSubscriptionRequired) { - isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION); - } else { - isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION); - } - isAuthorized - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { - policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired); - return policies; - })).thenAccept(__ -> { - log.info("[{}] Updated PermissionOnSubscriptionRequired for namespace {} to {}", clientAppId(), - namespaceName, permissionOnSubscriptionRequired); - asyncResponse.resume(Response.ok().build()); - }).exceptionally(ex -> { - log.error("[{}] Failed to update PermissionOnSubscriptionRequired", clientAppId(), ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - protected CompletableFuture internalGrantPermissionOnNamespaceAsync(String role, Set actions) { AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); if (null != authService) { @@ -590,7 +555,7 @@ protected CompletableFuture internalGrantPermissionOnNamespaceAsync(String protected CompletableFuture internalGrantPermissionOnSubscriptionAsync(String subscription, - Set roles) { + Set roles) { AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); if (null != authService) { return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION) @@ -639,7 +604,7 @@ protected CompletableFuture internalRevokePermissionsOnNamespaceAsync(Stri } protected CompletableFuture internalRevokePermissionsOnSubscriptionAsync(String subscriptionName, - String role) { + String role) { AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); if (null != authService) { return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION) @@ -667,6 +632,7 @@ protected CompletableFuture> internalGetNamespaceReplicationClusters .thenApply(policies -> policies.replication_clusters); } + @SuppressWarnings("checkstyle:WhitespaceAfter") protected CompletableFuture internalSetNamespaceReplicationClusters(List clusterIds) { return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) @@ -755,7 +721,7 @@ protected CompletableFuture internalSetAutoTopicCreationAsync( + validateResult.getErrorInfo()); } if (Objects.equals(autoTopicCreationOverride.getTopicType(), - TopicType.PARTITIONED.toString())) { + TopicType.PARTITIONED.toString())){ if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) { throw new RestException(Status.NOT_ACCEPTABLE, @@ -772,15 +738,15 @@ protected CompletableFuture internalSetAutoTopicCreationAsync( } protected CompletableFuture internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride - autoSubscriptionCreationOverride) { + autoSubscriptionCreationOverride) { // Force to read the data s.t. the watch to the cache content is setup. return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(unused -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { - policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride; - return policies; - })) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(unused -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride; + return policies; + })) .thenAccept(r -> { if (autoSubscriptionCreationOverride != null) { String autoOverride = autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation() @@ -859,9 +825,9 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi try { getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> { LocalPolicies localPolicies = oldPolicies.map( - policies -> new LocalPolicies(policies.bundles, - bookieAffinityGroup, - policies.namespaceAntiAffinityGroup)) + policies -> new LocalPolicies(policies.bundles, + bookieAffinityGroup, + policies.namespaceAntiAffinityGroup)) .orElseGet(() -> new LocalPolicies(defaultBundle(), bookieAffinityGroup, null)); @@ -919,18 +885,18 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); }) .thenApply(__ -> - pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundle(namespaceName.toString(), bundleRange) + pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundle(namespaceName.toString(), bundleRange) ) .thenCompose(bundle -> - pulsar().getNamespaceService().isNamespaceBundleOwned(bundle) - .exceptionally(ex -> { - if (log.isDebugEnabled()) { - log.debug("Failed to validate cluster ownership for {}-{}, {}", - namespaceName.toString(), bundleRange, ex.getMessage(), ex); - } - return false; - }) + pulsar().getNamespaceService().isNamespaceBundleOwned(bundle) + .exceptionally(ex -> { + if (log.isDebugEnabled()) { + log.debug("Failed to validate cluster ownership for {}-{}, {}", + namespaceName.toString(), bundleRange, ex.getMessage(), ex); + } + return false; + }) ) .thenCompose(isOwnedByLocalCluster -> { if (!isOwnedByLocalCluster) { @@ -949,18 +915,18 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenCompose(policies -> - isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange) - .thenCompose(flag -> { - if (!flag) { - log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), - namespaceName, bundleRange); - return CompletableFuture.completedFuture(null); - } - return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, true) - .thenCompose(nsBundle -> - pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); - })); + isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange) + .thenCompose(flag -> { + if (!flag) { + log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), + namespaceName, bundleRange); + return CompletableFuture.completedFuture(null); + } + return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, true) + .thenCompose(nsBundle -> + pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); + })); } @SuppressWarnings("deprecation") @@ -1005,7 +971,7 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl return getNamespacePoliciesAsync(namespaceName) .thenCompose(policies -> validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, false)) + authoritative, false)) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)); }); @@ -1240,7 +1206,6 @@ protected CompletableFuture internalGetSubscribeRateAsync() { .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName())); } - protected CompletableFuture setBacklogQuotaAsync(BacklogQuotaType backlogQuotaType, BacklogQuota quota) { return namespaceResources().setPoliciesAsync(namespaceName, policies -> { @@ -1274,7 +1239,7 @@ protected void internalSetRetention(RetentionPolicies retention) { try { Policies policies = namespaceResources().getPolicies(namespaceName) .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Namespace policies does not exist")); + "Namespace policies does not exist")); if (!checkQuotas(policies, retention)) { log.warn("[{}] Failed to update retention configuration" + " for namespace {}: conflicts with backlog quota", @@ -1568,7 +1533,7 @@ protected void internalSetPolicies(String fieldName, Object value) { try { Policies policies = namespaceResources().getPolicies(namespaceName) .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Namespace policies does not exist")); + "Namespace policies does not exist")); Field field = Policies.class.getDeclaredField(fieldName); field.setAccessible(true); field.set(policies, value); @@ -1603,12 +1568,12 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { } try { - getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp) -> - lp.map(policies -> new LocalPolicies(policies.bundles, - policies.bookieAffinityGroup, - antiAffinityGroup)) - .orElseGet(() -> new LocalPolicies(defaultBundle(), - null, antiAffinityGroup)) + getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)-> + lp.map(policies -> new LocalPolicies(policies.bundles, + policies.bookieAffinityGroup, + antiAffinityGroup)) + .orElseGet(() -> new LocalPolicies(defaultBundle(), + null, antiAffinityGroup)) ); log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(), namespaceName, antiAffinityGroup); @@ -1639,10 +1604,10 @@ protected void internalRemoveNamespaceAntiAffinityGroup() { log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName); try { - getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> - new LocalPolicies(policies.bundles, - policies.bookieAffinityGroup, - null)); + getLocalPolicies().setLocalPolicies(namespaceName, (policies)-> + new LocalPolicies(policies.bundles, + policies.bookieAffinityGroup, + null)); log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName); } catch (Exception e) { log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e); @@ -1788,32 +1753,32 @@ private CompletableFuture validatePoliciesAsync(NamespaceName ns, Policies // Validate cluster names and permissions return policies.replication_clusters.stream() - .map(cluster -> validateClusterForTenantAsync(ns.getTenant(), cluster)) - .reduce(CompletableFuture.completedFuture(null), (a, b) -> a.thenCompose(ignore -> b)) - .thenAccept(__ -> { - if (policies.message_ttl_in_seconds != null && policies.message_ttl_in_seconds < 0) { - throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); - } + .map(cluster -> validateClusterForTenantAsync(ns.getTenant(), cluster)) + .reduce(CompletableFuture.completedFuture(null), (a, b) -> a.thenCompose(ignore -> b)) + .thenAccept(__ -> { + if (policies.message_ttl_in_seconds != null && policies.message_ttl_in_seconds < 0) { + throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); + } - if (policies.bundles != null && policies.bundles.getNumBundles() > 0) { - if (policies.bundles.getBoundaries() == null || policies.bundles.getBoundaries().size() == 0) { - policies.bundles = getBundles(policies.bundles.getNumBundles()); - } else { - policies.bundles = validateBundlesData(policies.bundles); - } + if (policies.bundles != null && policies.bundles.getNumBundles() > 0) { + if (policies.bundles.getBoundaries() == null || policies.bundles.getBoundaries().size() == 0) { + policies.bundles = getBundles(policies.bundles.getNumBundles()); } else { - int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); - policies.bundles = getBundles(defaultNumberOfBundles); + policies.bundles = validateBundlesData(policies.bundles); } + } else { + int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); + policies.bundles = getBundles(defaultNumberOfBundles); + } - if (policies.persistence != null) { - validatePersistencePolicies(policies.persistence); - } + if (policies.persistence != null) { + validatePersistencePolicies(policies.persistence); + } - if (policies.retention_policies != null) { - validateRetentionPolicies(policies.retention_policies); - } - }); + if (policies.retention_policies != null) { + validateRetentionPolicies(policies.retention_policies); + } + }); } protected void validateRetentionPolicies(RetentionPolicies retention) { @@ -1941,7 +1906,7 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag } } - protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) { + protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ validateSuperUserAccess(); validatePoliciesReadOnlyAccess(); if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { @@ -2267,12 +2232,12 @@ private void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) { } } - protected void internalRemoveMaxTopicsPerNamespace() { + protected void internalRemoveMaxTopicsPerNamespace() { validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); internalSetMaxTopicsPerNamespace(null); - } + } - protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { + protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); @@ -2281,45 +2246,45 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { "maxTopicsPerNamespace must be 0 or more"); } internalSetPolicies("max_topics_per_namespace", maxTopicsPerNamespace); - } - - protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.put(key, value); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); - } - - protected void internalSetProperties(Map properties, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.putAll(properties); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(), - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(), - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); - } - - protected void internalGetProperty(String key, AsyncResponse asyncResponse) { + } + + protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) { + validatePoliciesReadOnlyAccess(); + updatePoliciesAsync(namespaceName, policies -> { + policies.properties.put(key, value); + return policies; + }).thenAccept(v -> { + log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, + namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); + } + + protected void internalSetProperties(Map properties, AsyncResponse asyncResponse) { + validatePoliciesReadOnlyAccess(); + updatePoliciesAsync(namespaceName, policies -> { + policies.properties.putAll(properties); + return policies; + }).thenAccept(v -> { + log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(), + namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(), + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); + } + + protected void internalGetProperty(String key, AsyncResponse asyncResponse) { getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { asyncResponse.resume(policies.properties.get(key)); }).exceptionally(ex -> { @@ -2329,97 +2294,97 @@ protected void internalGetProperty(String key, AsyncResponse asyncResponse) { asyncResponse.resume(cause); return null; }); - } - - protected void internalGetProperties(AsyncResponse asyncResponse) { - getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { - asyncResponse.resume(policies.properties); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); - } - - protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - - AtomicReference oldVal = new AtomicReference<>(null); - updatePoliciesAsync(namespaceName, policies -> { - oldVal.set(policies.properties.remove(key)); - return policies; - }).thenAccept(v -> { - asyncResponse.resume(oldVal.get()); - log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key, - namespaceName); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); - } - - protected void internalClearProperties(AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - AtomicReference clearedCount = new AtomicReference<>(0); - updatePoliciesAsync(namespaceName, policies -> { - clearedCount.set(policies.properties.size()); - policies.properties.clear(); - return policies; - }).thenAccept(v -> { - asyncResponse.resume(Response.noContent().build()); - log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(), - namespaceName); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(), - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); - } - - private CompletableFuture updatePoliciesAsync(NamespaceName ns, Function updateFunction) { - CompletableFuture result = new CompletableFuture<>(); - namespaceResources().setPoliciesAsync(ns, updateFunction) - .thenAccept(v -> { - log.info("[{}] Successfully updated the policies on namespace {}", clientAppId(), namespaceName); - result.complete(null); - }) - .exceptionally(ex -> { - Throwable cause = ex.getCause(); - if (cause instanceof NotFoundException) { - result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist")); - } else if (cause instanceof BadVersionException) { - log.warn("[{}] Failed to update the replication clusters on" - + " namespace {} : concurrent modification", clientAppId(), namespaceName); - result.completeExceptionally(new RestException(Status.CONFLICT, "Concurrent modification")); - } else { - log.error("[{}] Failed to update namespace policies {}", clientAppId(), namespaceName, cause); - result.completeExceptionally(new RestException(cause)); - } - return null; - }); - return result; - } - - private void updatePolicies(NamespaceName ns, Function updateFunction) { - // Force to read the data s.t. the watch to the cache content is setup. - try { - updatePoliciesAsync(ns, updateFunction).get(namespaceResources().getOperationTimeoutSec(), - TimeUnit.SECONDS); - } catch (Exception e) { - Throwable cause = e.getCause(); - if (!(cause instanceof RestException)) { - throw new RestException(cause); - } else { - throw (RestException) cause; - } - } - } + } + + protected void internalGetProperties(AsyncResponse asyncResponse) { + getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { + asyncResponse.resume(policies.properties); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); + } + + protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) { + validatePoliciesReadOnlyAccess(); + + AtomicReference oldVal = new AtomicReference<>(null); + updatePoliciesAsync(namespaceName, policies -> { + oldVal.set(policies.properties.remove(key)); + return policies; + }).thenAccept(v -> { + asyncResponse.resume(oldVal.get()); + log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key, + namespaceName); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); + } + + protected void internalClearProperties(AsyncResponse asyncResponse) { + validatePoliciesReadOnlyAccess(); + AtomicReference clearedCount = new AtomicReference<>(0); + updatePoliciesAsync(namespaceName, policies -> { + clearedCount.set(policies.properties.size()); + policies.properties.clear(); + return policies; + }).thenAccept(v -> { + asyncResponse.resume(Response.noContent().build()); + log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(), + namespaceName); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(), + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); + } + + private CompletableFuture updatePoliciesAsync(NamespaceName ns, Function updateFunction) { + CompletableFuture result = new CompletableFuture<>(); + namespaceResources().setPoliciesAsync(ns, updateFunction) + .thenAccept(v -> { + log.info("[{}] Successfully updated the policies on namespace {}", clientAppId(), namespaceName); + result.complete(null); + }) + .exceptionally(ex -> { + Throwable cause = ex.getCause(); + if (cause instanceof NotFoundException) { + result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } else if (cause instanceof BadVersionException) { + log.warn("[{}] Failed to update the replication clusters on" + + " namespace {} : concurrent modification", clientAppId(), namespaceName); + result.completeExceptionally(new RestException(Status.CONFLICT, "Concurrent modification")); + } else { + log.error("[{}] Failed to update namespace policies {}", clientAppId(), namespaceName, cause); + result.completeExceptionally(new RestException(cause)); + } + return null; + }); + return result; + } + + private void updatePolicies(NamespaceName ns, Function updateFunction) { + // Force to read the data s.t. the watch to the cache content is setup. + try { + updatePoliciesAsync(ns, updateFunction).get(namespaceResources().getOperationTimeoutSec(), + TimeUnit.SECONDS); + } catch (Exception e) { + Throwable cause = e.getCause(); + if (!(cause instanceof RestException)) { + throw new RestException(cause); + } else { + throw (RestException) cause; + } + } + } protected void internalSetNamespaceResourceGroup(String rgName) { validateNamespacePolicyOperation(namespaceName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE); @@ -2489,7 +2454,6 @@ protected void internalSetReplicatorDispatchRate(AsyncResponse asyncResponse, Di return null; }); } - /** * Base method for getReplicatorDispatchRate v1 and v2. * Notion: don't re-use this logic. @@ -2511,7 +2475,6 @@ protected void internalGetReplicatorDispatchRate(AsyncResponse asyncResponse) { return null; }); } - /** * Base method for removeReplicatorDispatchRate v1 and v2. * Notion: don't re-use this logic. @@ -2598,4 +2561,39 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu return null; }); } + + protected void getPermissionOnSubscriptionRequired(AsyncResponse asyncResponse) { + validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PERMISSION) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName).thenApply(policies -> + asyncResponse.resume(Response.ok(policies.auth_policies.isSubscriptionAuthRequired()).build()) + )).exceptionally(ex -> { + log.error("[{}] Failed to get PermissionOnSubscriptionRequired", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + protected void internalSetPermissionOnSubscriptionRequired(AsyncResponse asyncResponse, + boolean permissionOnSubscriptionRequired) { + CompletableFuture isAuthorized; + if (permissionOnSubscriptionRequired) { + isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION); + } else { + isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION); + } + isAuthorized + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired); + return policies; + })).thenAccept(__ -> { + log.info("[{}] Updated PermissionOnSubscriptionRequired for namespace {} to {}", clientAppId(), + namespaceName, permissionOnSubscriptionRequired); + asyncResponse.resume(Response.ok().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to update PermissionOnSubscriptionRequired", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } }