Skip to content

Commit

Permalink
fix isAuthorized
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Oct 30, 2023
1 parent a00f624 commit 6caf1ec
Showing 1 changed file with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@
package org.apache.pulsar.websocket;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.base.Splitter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Subscribing for multi-topic.
Expand All @@ -36,6 +46,38 @@ public MultiTopicConsumerHandler(WebSocketService service, HttpServletRequest re
super(service, request, response);
}

@Override
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
try {
AuthenticationDataSubscription subscription = new AuthenticationDataSubscription(authenticationData,
this.subscription);
if (topics != null) {
List<String> topicNames = Splitter.on(",").splitToList(topics);
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (String topicName : topicNames) {
futures.add(service.getAuthorizationService()
.allowTopicOperationAsync(TopicName.get(topicName),
TopicOperation.CONSUME, authRole, subscription));
}
FutureUtil.waitForAll(futures)
.get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
return futures.stream().allMatch(f -> f.join());
} else {
return service.getAuthorizationService()
.allowTopicOperationAsync(topic, TopicOperation.CONSUME, authRole, subscription)
.get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
}
} catch (TimeoutException e) {
log.warn("Time-out {} sec while checking authorization on {} ",
service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic);
throw e;
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic,
e.getMessage());
throw e;
}
}

@Override
protected void extractTopicName(HttpServletRequest request) {
String uri = request.getRequestURI();
Expand Down Expand Up @@ -72,4 +114,6 @@ public String extractSubscription(HttpServletRequest request) {

return Codec.decode(parts.get(4));
}

private static final Logger log = LoggerFactory.getLogger(MultiTopicConsumerHandler.class);
}

0 comments on commit 6caf1ec

Please sign in to comment.