Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-167][Authorization] Make it Configurable to Require Subscription Permission #15576

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,21 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(policies -> {
if (!policies.isPresent()) {
// TODO this case seems like it could bypass authorization checks.
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
}
} else {
if (isNotBlank(subscription)) {
// validate if role is authorized to access subscription. (skip validation if authorization
// list is empty)
// Reject request if role is unauthorized to access subscription.
// If subscriptionAuthRequired is enabled, role must be in the set of roles.
// Otherwise, set of roles must be null or empty, or role must be in set of roles.
Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
boolean isUnauthorized = policies.get().auth_policies.isSubscriptionAuthRequired()
? (roles == null || roles.isEmpty() || !roles.contains(role))
: (roles != null && !roles.isEmpty() && !roles.contains(role));
if (isUnauthorized) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
return CompletableFuture.completedFuture(false);
}
Expand Down Expand Up @@ -483,6 +488,8 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case GET_TOPICS:
case GET_BUNDLE:
return allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
// TODO these only require ability to consume on namespace; ignore namespace's subscription
// permission.
case UNSUBSCRIBE:
case CLEAR_BACKLOG:
return allowTheSpecifiedActionOpsAsync(
Expand Down Expand Up @@ -537,6 +544,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
return canLookupAsync(topicName, role, authData);
case PRODUCE:
return canProduceAsync(topicName, role, authData);
// TODO consume from single subscription lets role view all subscriptions on a topic
case GET_SUBSCRIPTIONS:
case CONSUME:
case SUBSCRIBE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2561,4 +2561,39 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
return null;
});
}

protected void getPermissionOnSubscriptionRequired(AsyncResponse asyncResponse) {
validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PERMISSION)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName).thenApply(policies ->
asyncResponse.resume(Response.ok(policies.auth_policies.isSubscriptionAuthRequired()).build())
)).exceptionally(ex -> {
log.error("[{}] Failed to get PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalSetPermissionOnSubscriptionRequired(AsyncResponse asyncResponse,
boolean permissionOnSubscriptionRequired) {
CompletableFuture<Void> isAuthorized;
if (permissionOnSubscriptionRequired) {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
} else {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION);
}
isAuthorized
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired);
return policies;
})).thenAccept(__ -> {
log.info("[{}] Updated PermissionOnSubscriptionRequired for namespace {} to {}", clientAppId(),
namespaceName, permissionOnSubscriptionRequired);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
log.error("[{}] Failed to update PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,37 @@ public void revokePermissionOnSubscription(@Suspended AsyncResponse asyncRespons
});
}

@POST
@Path("/{property}/{cluster}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(hidden = true, value = "Set whether a role requires explicit permission to consume from a "
+ "subscription that has no subscription permission defined in the namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void setPermissionOnSubscriptionRequired(
@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
boolean permissionOnSubscriptionRequired) {
validateNamespaceName(property, cluster, namespace);
internalSetPermissionOnSubscriptionRequired(asyncResponse, permissionOnSubscriptionRequired);
}

@GET
@Path("/{property}/{cluster}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(value = "Get whether a role requires explicit permission to consume from a "
+ "subscription that has no subscription permission defined in the namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty")})
public void getPermissionOnSubscriptionRequired(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
getPermissionOnSubscriptionRequired(asyncResponse);
}

@GET
@Path("/{property}/{cluster}/{namespace}/replication")
@ApiOperation(hidden = true, value = "Get the replication clusters for a namespace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,36 @@ public void revokePermissionOnSubscription(@Suspended AsyncResponse asyncRespons
});
}

@POST
@Path("/{property}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a"
+ " subscription.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void setPermissionOnSubscriptionRequired(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("namespace") String namespace,
boolean required) {
validateNamespaceName(property, namespace);
internalSetPermissionOnSubscriptionRequired(asyncResponse, required);
}

@GET
@Path("/{property}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(value = "Get permission on subscription required for namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty")})
public void getPermissionOnSubscriptionRequired(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
getPermissionOnSubscriptionRequired(asyncResponse);
}

@GET
@Path("/{tenant}/{namespace}/replication")
@ApiOperation(value = "Get the replication clusters for a namespace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -365,6 +366,83 @@ public void testSubscriberPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSubscriberPermissionRequired() throws Exception {
log.info("-- Starting {} test --", methodName);

conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();

final String tenantRole = "tenant-role";
final String subscriptionRole = "sub-role";
final String subscriptionName = "sub";
final String namespace = "my-property/ns-sub-auth-req";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new ClientAuthentication("superUser");

clientAuthProviderSupportedRoles.add(subscriptionRole);

@Cleanup
PulsarAdmin superAdmin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());

Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
@Cleanup
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(tenantAdminAuthentication).build());

Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
@Cleanup
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(subAdminAuthentication).build());

Authentication authentication = new ClientAuthentication(subscriptionRole);

superAdmin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());

// Initialize cluster and configure namespace to require permission on subscription
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertFalse(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace), "Defaults to false.");
superAdmin.namespaces().setPermissionOnSubscriptionRequired(namespace, true);
tenantAdmin.topics().createNonPartitionedTopic(topicName);
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));
assertNull(superAdmin.namespaces().getPublishRate(namespace));
assertTrue(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace));
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(authentication));

// Cluster is initialized; the subscriptionRole has permission consume on the topic, but doesn't have
// explicit subscription permission. Verify that several operations which rely on subscription permission fail.
try {
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateTopicOperation for operation [RESET_CURSOR]"));
}
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Client is not authorized to subscribe"), e.getMessage());
}

// Grant the role permission.
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Set.of(subscriptionRole));

// Verify the role now has permission to consume (reset cursor second to avoid 404 on subscription)
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
consumer.close();
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,42 @@ CompletableFuture<Void> grantPermissionOnSubscriptionAsync(
*/
CompletableFuture<Void> revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role);

/**
* Get whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
*
* @param namespace Pulsar namespace name
* @return
* @throws PulsarAdminException
*/
boolean getPermissionOnSubscriptionRequired(String namespace) throws PulsarAdminException;

/**
* Get whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
* @param namespace Pulsar namespace name
* @return
*/
CompletableFuture<Boolean> getPermissionOnSubscriptionRequiredAsync(String namespace);

/**
* Set whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
* @param namespace Pulsar namespace name
* @throws PulsarAdminException
*/
void setPermissionOnSubscriptionRequired(String namespace, boolean permissionOnSubscriptionRequired)
throws PulsarAdminException;

/**
* Set whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
* @param namespace Pulsar namespace name
* @return
*/
CompletableFuture<Void> setPermissionOnSubscriptionRequiredAsync(String namespace,
boolean permissionOnSubscriptionRequired);

/**
* Get the replication clusters for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public interface AuthPolicies {
Map<String, Map<String, Set<AuthAction>>> getTopicAuthentication();
Map<String, Set<String>> getSubscriptionAuthentication();

/**
* Whether an empty set of subscription authentication roles returned by {@link #getSubscriptionAuthentication()}
* requires explicit permission to consume from the target subscription.
* @return
*/
boolean isSubscriptionAuthRequired();
void setSubscriptionAuthRequired(boolean subscriptionAuthRequired);

static Builder builder() {
return ReflectionUtils.newBuilder("org.apache.pulsar.client.admin.internal.data.AuthPoliciesImpl");
}
Expand All @@ -39,5 +47,6 @@ interface Builder {
Builder namespaceAuthentication(Map<String, Set<AuthAction>> namespaceAuthentication);
Builder topicAuthentication(Map<String, Map<String, Set<AuthAction>>> topicAuthentication);
Builder subscriptionAuthentication(Map<String, Set<String>> subscriptionAuthentication);
Builder subscriptionAuthRequired(boolean subscriptionAuthRequired);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,45 @@ public CompletableFuture<Void> revokePermissionOnSubscriptionAsync(
return asyncDeleteRequest(path);
}

@Override
public void setPermissionOnSubscriptionRequired(String namespace, boolean permissionOnSubscriptionRequired)
throws PulsarAdminException {
sync(() -> setPermissionOnSubscriptionRequiredAsync(namespace, permissionOnSubscriptionRequired));
}

@Override
public CompletableFuture<Void> setPermissionOnSubscriptionRequiredAsync(String namespace,
boolean permissionOnSubscriptionRequired) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "permissionOnSubscriptionRequired");
return asyncPostRequest(path, Entity.entity(permissionOnSubscriptionRequired, MediaType.APPLICATION_JSON));
}

@Override
public boolean getPermissionOnSubscriptionRequired(String namespace) throws PulsarAdminException {
return sync(() -> getPermissionOnSubscriptionRequiredAsync(namespace));
}

@Override
public CompletableFuture<Boolean> getPermissionOnSubscriptionRequiredAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "permissionOnSubscriptionRequired");
final CompletableFuture<Boolean> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Boolean>() {
@Override
public void completed(Boolean enabled) {
future.complete(enabled);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException {
return sync(() -> getNamespaceReplicationClustersAsync(namespace));
Expand Down
Loading