Skip to content

Commit

Permalink
[feat][admin] PIP-219 Part-1 Add admin API for trimming topic (#19094)
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 authored Jan 19, 2023
1 parent a9b6519 commit 29c244a
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case GET_BUNDLE:
return allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
case UNSUBSCRIBE:
case TRIM_TOPIC:
case CLEAR_BACKLOG:
return allowTheSpecifiedActionOpsAsync(
namespaceName, role, authData, AuthAction.consume);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4612,6 +4612,69 @@ protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean aut
return null;
});
}
protected CompletableFuture<Void> internalTrimTopic(AsyncResponse asyncResponse, boolean authoritative) {
if (!topicName.isPersistent()) {
log.info("[{}] Trim on a non-persistent topic {} is not allowed", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Trim on a non-persistent topic is not allowed"));
return null;
}
if (topicName.isPartitioned()) {
return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC).thenCompose((x)
-> trimNonPartitionedTopic(asyncResponse, topicName, authoritative));
}
return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC)
.thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return trimPartitionedTopic(asyncResponse, metadata);
}
return trimNonPartitionedTopic(asyncResponse, topicName, authoritative);
});
}

private CompletableFuture<Void> trimNonPartitionedTopic(AsyncResponse asyncResponse,
TopicName topicName, boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic persistentTopic)) {
log.info("[{}] Trim on a non-persistent topic {} is not allowed", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Trim on a non-persistent topic is not allowed"));
return CompletableFuture.completedFuture(null);
}
ManagedLedger managedLedger = persistentTopic.getManagedLedger();
if (managedLedger == null) {
asyncResponse.resume(null);
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> result = new CompletableFuture<>();
managedLedger.trimConsumedLedgersInBackground(result);
return result.whenComplete((res, e) -> {
if (e != null) {
asyncResponse.resume(e);
} else {
asyncResponse.resume(res);
}
});
});
}

private CompletableFuture<Void> trimPartitionedTopic(AsyncResponse asyncResponse,
PartitionedTopicMetadata metadata) {
List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().trimTopicAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to trim topic {}", clientAppId(), topicNamePartition, e);
throw new RestException(e);
}
}
return FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume);
}

protected CompletableFuture<DispatchRateImpl> internalGetDispatchRate(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3269,6 +3269,44 @@ public void getLastMessageId(
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/trim")
@ApiOperation(value = " Trim a topic")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or topic does not exist"),
@ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void trimTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("/{tenant}/{namespace}/{topic}/dispatchRate")
@ApiOperation(value = "Get dispatch rate configuration for specified topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -179,4 +184,74 @@ public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
assertEquals(messageIdAfterTrim, MessageId.earliest);

}

@Test
public void TestAdminTrimLedgers() throws Exception {
conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE / 2);
conf.setDefaultNumberOfNamespaceBundles(1);
super.baseSetup();
final String topicName = "persistent://prop/ns-abc/TestAdminTrimLedgers" + UUID.randomUUID();
final String subscriptionName = "my-sub";
final int maxEntriesPerLedger = 2;
final int partitionedNum = 3;

admin.topics().createPartitionedTopic(topicName, partitionedNum);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.producerName("producer-name")
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
for (int i = 0; i < partitionedNum; i++) {
String topic = TopicName.get(topicName).getPartition(i).toString();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
}
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(TopicName.get(topicName).getPartition(0).toString()).get();
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionSizeInMB(-1);
managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
int msgNum = 50;
for (int i = 0; i < msgNum; i++) {
producer.send(new byte[0]);
}
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
Assert.assertTrue(managedLedger.getLedgersInfoAsList().size() > 1);
for (int i = 0; i < msgNum; i++) {
Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(msg);
consumer.acknowledge(msg);
}
//consumed ledger should be cleaned
admin.topics().trimTopic(topicName);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1));

}

@Test
public void trimNonPersistentTopic() throws Exception {
super.baseSetup();
String topicName = "non-persistent://prop/ns-abc/trimNonPersistentTopic" + UUID.randomUUID();
int partitionedNum = 3;
admin.topics().createPartitionedTopic(topicName, partitionedNum);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.producerName("producer-name")
.create();
try {
admin.topics().trimTopic(topicName);
fail("should failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarAdminException.NotAllowedException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,19 @@ CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String
*/
CompletableFuture<Void> triggerCompactionAsync(String topic);

/**
* Trigger topic trimming.
* @param topic The topic to trim
* @throws PulsarAdminException
*/
void trimTopic(String topic) throws PulsarAdminException;

/**
* Trigger topic trimming asynchronously.
* @param topic The topic to trim
*/
CompletableFuture<Void> trimTopicAsync(String topic);

/**
* Check the status of an ongoing compaction for a topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,18 @@ public CompletableFuture<Void> triggerCompactionAsync(String topic) {
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public void trimTopic(String topic) throws PulsarAdminException {
sync(() -> trimTopicAsync(topic));
}

@Override
public CompletableFuture<Void> trimTopicAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "trim");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,9 @@ public void topics() throws Exception {
cmdTopics.run(split("analyze-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/clust/ns1/ds1", "sub1", Optional.empty());

cmdTopics.run(split("trim-topic persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).trimTopic("persistent://myprop/clust/ns1/ds1");

// jcommander is stateful, you cannot parse the same command twice
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b -p x=y,z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced());
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());

jcommander.addCommand("trim-topic", new TrimTopic());

initDeprecatedCommands();
}

Expand Down Expand Up @@ -3111,4 +3113,15 @@ void run() throws PulsarAdminException {
getAdmin().topics().setSchemaValidationEnforced(topic, enable);
}
}
@Parameters(commandDescription = "Trim a topic")
private class TrimTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getAdmin().topics().trimTopic(topic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ public enum NamespaceOperation {
UNSUBSCRIBE,

PACKAGES,
TRIM_TOPIC,
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@ public enum TopicOperation {

SET_REPLICATED_SUBSCRIPTION_STATUS,
GET_REPLICATED_SUBSCRIPTION_STATUS,
TRIM_TOPIC,
}

0 comments on commit 29c244a

Please sign in to comment.