Skip to content

Commit

Permalink
Copy test from apache#15576
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Apr 4, 2024
1 parent d1b9d3b commit 85e8409
Showing 1 changed file with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,83 @@ public void testSubscriberPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSubscriberPermissionRequired() throws Exception {
log.info("-- Starting {} test --", methodName);

conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();

final String tenantRole = "tenant-role";
final String subscriptionRole = "sub-role";
final String subscriptionName = "sub";
final String namespace = "my-property/ns-sub-auth-req";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new ClientAuthentication("superUser");

clientAuthProviderSupportedRoles.add(subscriptionRole);

@Cleanup
PulsarAdmin superAdmin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());

Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
@Cleanup
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(tenantAdminAuthentication).build());

Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
@Cleanup
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(subAdminAuthentication).build());

Authentication authentication = new ClientAuthentication(subscriptionRole);

superAdmin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());

// Initialize cluster and configure namespace to require permission on subscription
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertFalse(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace), "Defaults to false.");
superAdmin.namespaces().setPermissionOnSubscriptionRequired(namespace, true);
tenantAdmin.topics().createNonPartitionedTopic(topicName);
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));
assertNull(superAdmin.namespaces().getPublishRate(namespace));
assertTrue(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace));
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(authentication));

// Cluster is initialized; the subscriptionRole has permission consume on the topic, but doesn't have
// explicit subscription permission. Verify that several operations which rely on subscription permission fail.
try {
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateTopicOperation for operation [RESET_CURSOR]"));
}
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Client is not authorized to subscribe"), e.getMessage());
}

// Grant the role permission.
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Set.of(subscriptionRole));

// Verify the role now has permission to consume (reset cursor second to avoid 404 on subscription)
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
consumer.close();
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down

0 comments on commit 85e8409

Please sign in to comment.