Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker]PIP-180 ShadowTopic - Part III - Add shadowTopics in TopicPolicy #17242

Merged
merged 1 commit into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -5382,4 +5383,65 @@ protected CompletableFuture<Void> internalRemoveEntryFilters(boolean isGlobal) {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
}));
}

protected CompletableFuture<Void> validateShadowTopics(List<String> shadowTopics) {
List<CompletableFuture<Void>> futures = new ArrayList<>(shadowTopics.size());
for (String shadowTopic : shadowTopics) {
try {
TopicName shadowTopicName = TopicName.get(shadowTopic);
if (!shadowTopicName.isPersistent()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Only persistent topic can be set as shadow topic"));
}
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
.thenAccept(isExists -> {
if (!isExists) {
throw new RestException(Status.PRECONDITION_FAILED,
"Shadow topic [" + shadowTopic + "] not exists.");
}
}));
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new RestException(Status.FORBIDDEN,
"Invalid shadow topic name: " + shadowTopic));
}
}
return FutureUtil.waitForAll(futures);
}

protected CompletableFuture<Void> internalSetShadowTopic(List<String> shadowTopics) {
if (!topicName.isPersistent()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Only persistent source topic is supported with shadow topics."));
}
if (CollectionUtils.isEmpty(shadowTopics)) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Cannot specify empty shadow topics, please use remove command instead."));
}
return validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> validateShadowTopics(shadowTopics))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setShadowTopics(shadowTopics);
return pulsar().getTopicPoliciesService().
updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalDeleteShadowTopics() {
return validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(shadowTopicName -> getTopicPoliciesAsyncWithRetry(topicName))
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
List<String> shadowTopics = topicPolicies.getShadowTopics();
if (CollectionUtils.isEmpty(shadowTopics)) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.setShadowTopics(null);
return pulsar().getTopicPoliciesService().
updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4363,5 +4363,84 @@ public void removeEntryFilters(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/shadowTopics")
@ApiOperation(value = "Get the shadow topic list for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry")})
public void getShadowTopics(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC,
PolicyOperation.READ))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenAccept(op -> asyncResponse.resume(op.map(TopicPolicies::getShadowTopics).orElse(null)))
.exceptionally(ex -> {
handleTopicPolicyException("getShadowTopics", ex, asyncResponse);
return null;
});
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/shadowTopics")
@ApiOperation(value = "Set shadow topic list for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
})
public void setShadowTopics(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "List of shadow topics", required = true) List<String> shadowTopics) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetShadowTopic(shadowTopics))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setShadowTopic", ex, asyncResponse);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/shadowTopics")
@ApiOperation(value = "Delete shadow topics for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
})
public void deleteShadowTopics(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalDeleteShadowTopics())
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteShadowTopic", ex, asyncResponse);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Slf4j
@Test(groups = "broker-admin")
Expand Down Expand Up @@ -3059,4 +3060,38 @@ public void testMaxMessageSizeWithChunking() throws Exception {
producer.send(new byte[2000]);
}

@Test(timeOut = 30000)
public void testShadowTopics() throws Exception {
final String sourceTopic = "persistent://" + myNamespace + "/source-test-" + UUID.randomUUID();
final String shadowTopic1 = "persistent://" + myNamespace + "/shadow-test1-" + UUID.randomUUID();
final String shadowTopic2 = "persistent://" + myNamespace + "/shadow-test2-" + UUID.randomUUID();

pulsarClient.newProducer().topic(sourceTopic).create().close();

Awaitility.await().untilAsserted(() ->
Assert.assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(sourceTopic))));

//shadow topic must exist
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, ()->
admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic1)));

//shadow topic must be persistent topic
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, ()->
admin.topics().setShadowTopics(sourceTopic,
Lists.newArrayList("non-persistent://" + myNamespace + "/shadow-test1-" + UUID.randomUUID())));

pulsarClient.newProducer().topic(shadowTopic1).create().close();
pulsarClient.newProducer().topic(shadowTopic2).create().close();

admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic1));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(admin.topics().getShadowTopics(sourceTopic),
Lists.newArrayList(shadowTopic1)));
admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic1, shadowTopic2));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(admin.topics().getShadowTopics(sourceTopic),
Lists.newArrayList(shadowTopic1, shadowTopic2)));

admin.topics().removeShadowTopics(sourceTopic);
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getShadowTopics(sourceTopic)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4335,4 +4335,48 @@ CompletableFuture<Message<byte[]>> examineMessageAsync(String topic, String init
* @param topic topic name
*/
CompletableFuture<Void> setSchemaValidationEnforcedAsync(String topic, boolean enable);

/**
* Set shadow topic list for a source topic.
*
* @param sourceTopic source topic name
* @param shadowTopics list of shadow topic name
*/
void setShadowTopics(String sourceTopic, List<String> shadowTopics) throws PulsarAdminException;

/**
* Remove all shadow topics for a source topic.
*
* @param sourceTopic source topic name
*/
void removeShadowTopics(String sourceTopic) throws PulsarAdminException;

/**
* Get shadow topic list of the source topic.
*
* @param sourceTopic source topic name
* @return shadow topic list
*/
List<String> getShadowTopics(String sourceTopic) throws PulsarAdminException;

/**
* Set shadow topic list for a source topic asynchronously.
*
* @param sourceTopic source topic name
*/
CompletableFuture<Void> setShadowTopicsAsync(String sourceTopic, List<String> shadowTopics);

/**
* Remove all shadow topics for a source topic asynchronously.
*
* @param sourceTopic source topic name
*/
CompletableFuture<Void> removeShadowTopicsAsync(String sourceTopic);

/**
* Get shadow topic list of the source topic asynchronously.
*
* @param sourceTopic source topic name
*/
CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2714,5 +2714,41 @@ public CompletableFuture<Void> removeReplicationClustersAsync(String topic) {
return asyncDeleteRequest(path);
}

@Override
public void setShadowTopics(String sourceTopic, List<String> shadowTopics) throws PulsarAdminException {
sync(() -> setShadowTopicsAsync(sourceTopic, shadowTopics));
}

@Override
public void removeShadowTopics(String sourceTopic) throws PulsarAdminException {
sync(() -> removeShadowTopicsAsync(sourceTopic));
}

@Override
public List<String> getShadowTopics(String sourceTopic) throws PulsarAdminException {
return sync(() -> getShadowTopicsAsync(sourceTopic));
}

@Override
public CompletableFuture<Void> setShadowTopicsAsync(String sourceTopic, List<String> shadowTopics) {
TopicName tn = validateTopic(sourceTopic);
WebTarget path = topicPath(tn, "shadowTopics");
return asyncPutRequest(path, Entity.entity(shadowTopics, MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Void> removeShadowTopicsAsync(String sourceTopic) {
TopicName tn = validateTopic(sourceTopic);
WebTarget path = topicPath(tn, "shadowTopics");
return asyncDeleteRequest(path);
}

@Override
public CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic) {
TopicName tn = validateTopic(sourceTopic);
WebTarget path = topicPath(tn, "shadowTopics");
return asyncGetRequest(path, new FutureCallback<List<String>>(){});
}

private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,16 @@ public boolean matches(Long timestamp) {

cmdTopics.run(split("remove-replication-clusters persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeReplicationClusters("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-shadow-topics persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getShadowTopics("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("set-shadow-topics persistent://myprop/clust/ns1/ds1 -t test"));
verify(mockTopics).setShadowTopics("persistent://myprop/clust/ns1/ds1", Lists.newArrayList("test"));

cmdTopics.run(split("remove-shadow-topics persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeShadowTopics("persistent://myprop/clust/ns1/ds1");

}

private static LedgerInfo newLedger(long id, long entries, long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-replication-clusters", new SetReplicationClusters());
jcommander.addCommand("remove-replication-clusters", new RemoveReplicationClusters());

jcommander.addCommand("get-shadow-topics", new GetShadowTopics());
jcommander.addCommand("set-shadow-topics", new SetShadowTopics());
jcommander.addCommand("remove-shadow-topics", new RemoveShadowTopics());

jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced());
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());

Expand Down Expand Up @@ -1653,6 +1657,47 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get the shadow topics for a topic")
private class GetShadowTopics extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopics().getShadowTopics(persistentTopic));
}
}

@Parameters(commandDescription = "Set the shadow topics for a topic")
private class SetShadowTopics extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--topics",
"-t" }, description = "Shadow topic list (comma separated values)", required = true)
private String shadowTopics;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
List<String> topics = Lists.newArrayList(shadowTopics.split(","));
getTopics().setShadowTopics(persistentTopic, topics);
}
}

@Parameters(commandDescription = "Remove the shadow topics for a topic")
private class RemoveShadowTopics extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopics().removeShadowTopics(persistentTopic);
}
}

@Parameters(commandDescription = "Get the delayed delivery policy for a topic")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ public enum PolicyName {
TTL,
MAX_TOPICS,
RESOURCEGROUP,
ENTRY_FILTERS
ENTRY_FILTERS,
SHADOW_TOPIC
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class TopicPolicies {
@Builder.Default
private List<SubType> subscriptionTypesEnabled = new ArrayList<>();
private List<String> replicationClusters;
private List<String> shadowTopics;
private Boolean isGlobal = false;
private PersistencePolicies persistence;
private RetentionPolicies retentionPolicies;
Expand Down