Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][admin] PIP-219 Part-1 Add admin API for trimming topic #19094

Merged
merged 7 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case GET_BUNDLE:
return allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
case UNSUBSCRIBE:
case TRIM_TOPIC:
case CLEAR_BACKLOG:
return allowTheSpecifiedActionOpsAsync(
namespaceName, role, authData, AuthAction.consume);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4612,6 +4612,69 @@ protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean aut
return null;
});
}
protected CompletableFuture<Void> internalTrimTopic(AsyncResponse asyncResponse, boolean authoritative) {
if (!topicName.isPersistent()) {
log.info("[{}] Trim on a non-persistent topic {} is not allowed", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Trim on a non-persistent topic is not allowed"));
return null;
}
if (topicName.isPartitioned()) {
return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC).thenCompose((x)
-> trimNonPartitionedTopic(asyncResponse, topicName, authoritative));
}
return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC)
.thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return trimPartitionedTopic(asyncResponse, metadata);
}
return trimNonPartitionedTopic(asyncResponse, topicName, authoritative);
});
}

private CompletableFuture<Void> trimNonPartitionedTopic(AsyncResponse asyncResponse,
TopicName topicName, boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic persistentTopic)) {
log.info("[{}] Trim on a non-persistent topic {} is not allowed", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Trim on a non-persistent topic is not allowed"));
return CompletableFuture.completedFuture(null);
}
ManagedLedger managedLedger = persistentTopic.getManagedLedger();
if (managedLedger == null) {
asyncResponse.resume(null);
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> result = new CompletableFuture<>();
managedLedger.trimConsumedLedgersInBackground(result);
return result.whenComplete((res, e) -> {
if (e != null) {
asyncResponse.resume(e);
} else {
asyncResponse.resume(res);
}
});
});
}

private CompletableFuture<Void> trimPartitionedTopic(AsyncResponse asyncResponse,
PartitionedTopicMetadata metadata) {
List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().trimTopicAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to trim topic {}", clientAppId(), topicNamePartition, e);
throw new RestException(e);
}
}
return FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume);
}

protected CompletableFuture<DispatchRateImpl> internalGetDispatchRate(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3269,6 +3269,44 @@ public void getLastMessageId(
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/trim")
@ApiOperation(value = " Trim a topic")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or topic does not exist"),
@ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void trimTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("/{tenant}/{namespace}/{topic}/dispatchRate")
@ApiOperation(value = "Get dispatch rate configuration for specified topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -179,4 +184,74 @@ public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
assertEquals(messageIdAfterTrim, MessageId.earliest);

}

@Test
public void TestAdminTrimLedgers() throws Exception {
conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE / 2);
conf.setDefaultNumberOfNamespaceBundles(1);
super.baseSetup();
final String topicName = "persistent://prop/ns-abc/TestAdminTrimLedgers" + UUID.randomUUID();
final String subscriptionName = "my-sub";
final int maxEntriesPerLedger = 2;
final int partitionedNum = 3;

admin.topics().createPartitionedTopic(topicName, partitionedNum);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.producerName("producer-name")
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
for (int i = 0; i < partitionedNum; i++) {
String topic = TopicName.get(topicName).getPartition(i).toString();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
}
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(TopicName.get(topicName).getPartition(0).toString()).get();
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionSizeInMB(-1);
managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
int msgNum = 50;
for (int i = 0; i < msgNum; i++) {
producer.send(new byte[0]);
}
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
Assert.assertTrue(managedLedger.getLedgersInfoAsList().size() > 1);
for (int i = 0; i < msgNum; i++) {
Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(msg);
consumer.acknowledge(msg);
}
//consumed ledger should be cleaned
admin.topics().trimTopic(topicName);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1));

}

@Test
public void trimNonPersistentTopic() throws Exception {
super.baseSetup();
String topicName = "non-persistent://prop/ns-abc/trimNonPersistentTopic" + UUID.randomUUID();
int partitionedNum = 3;
admin.topics().createPartitionedTopic(topicName, partitionedNum);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.producerName("producer-name")
.create();
try {
admin.topics().trimTopic(topicName);
fail("should failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarAdminException.NotAllowedException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,19 @@ CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String
*/
CompletableFuture<Void> triggerCompactionAsync(String topic);

/**
* Trigger topic trimming.
* @param topic The topic to trim
* @throws PulsarAdminException
*/
void trimTopic(String topic) throws PulsarAdminException;

/**
* Trigger topic trimming asynchronously.
* @param topic The topic to trim
*/
CompletableFuture<Void> trimTopicAsync(String topic);

/**
* Check the status of an ongoing compaction for a topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,18 @@ public CompletableFuture<Void> triggerCompactionAsync(String topic) {
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public void trimTopic(String topic) throws PulsarAdminException {
sync(() -> trimTopicAsync(topic));
}

@Override
public CompletableFuture<Void> trimTopicAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "trim");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,9 @@ public void topics() throws Exception {
cmdTopics.run(split("analyze-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/clust/ns1/ds1", "sub1", Optional.empty());

cmdTopics.run(split("trim-topic persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).trimTopic("persistent://myprop/clust/ns1/ds1");

// jcommander is stateful, you cannot parse the same command twice
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b -p x=y,z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced());
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());

jcommander.addCommand("trim-topic", new TrimTopic());

initDeprecatedCommands();
}

Expand Down Expand Up @@ -3111,4 +3113,15 @@ void run() throws PulsarAdminException {
getAdmin().topics().setSchemaValidationEnforced(topic, enable);
}
}
@Parameters(commandDescription = "Trim a topic")
private class TrimTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
getAdmin().topics().trimTopic(topic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ public enum NamespaceOperation {
UNSUBSCRIBE,

PACKAGES,
TRIM_TOPIC,
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@ public enum TopicOperation {

SET_REPLICATED_SUBSCRIPTION_STATUS,
GET_REPLICATED_SUBSCRIPTION_STATUS,
TRIM_TOPIC,
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}