From 9fbb92a69b5f392212e086cbac8cd24c49fecb29 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 20 Nov 2023 16:34:31 +0800 Subject: [PATCH] [improve][pip] PIP-303: Add optional parameters for getPartitionedStats (#21228) --- pip/pip-303.md | 224 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 pip/pip-303.md diff --git a/pip/pip-303.md b/pip/pip-303.md new file mode 100644 index 0000000000000..53861631cf9d0 --- /dev/null +++ b/pip/pip-303.md @@ -0,0 +1,224 @@ + +# Motivation + +When a topic has a large number of producers or consumers (over 1k), querying the `pulsarAdmin.topics().getPartitionedStats()` interface is slow and the response size is also large. +As a result, it's essential to give users the option of querying producer and consumer information. + + + +# Goals + +## In Scope + +Add the API for `org.apache.pulsar.client.admin.Topics` +```java +CompletableFuture getPartitionedStatsAsync( + String topic, boolean perPartition, GetStatsOptions getStatsOptions); + +CompletableFuture getStatsAsync(String topic, GetStatsOptions getStatsOptions); +``` + + + +## Out of Scope + +None. + + +# High Level Design + +Implement the `getPartitionedStatsAsync` method, and add the `excludePublishers` and `excludeConsumers` parameters to `{tenant}/{namespace}/{topic}/partitioned-stats` API in `PersistentTopics` and `NonPersistentTopics`. + +# Detailed Design + +## Design & Implementation Details + + +Add two fields for `org.apache.pulsar.client.admin.GetStatsOptions` +```java +@Data +@Builder +public class GetStatsOptions { + /** + * Whether to exclude publishers. + */ + private final boolean excludePublishers; + + /** + * Whether to exclude consumers. + */ + private final boolean excludeConsumers; + +} +``` + +Implement the `getPartitionedStatsAsync` and `getStatsAsync` interface for `org.apache.pulsar.client.admin.internal.TopicsImpl` +```java +@Override +public CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition, GetStatsOptions getStatsOptions){ + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "partitioned-stats"); + path = path.queryParam("perPartition", perPartition) + .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog()) + .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize()) + .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog()); + .queryParam("excludePublishers", getStatsOptions.isExcludePublishers()) + .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers()); +} + +@Override +public CompletableFuture getStatsAsync(String topic, GetStatsOptions getStatsOptions){ + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "stats") + .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog()) + .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize()) + .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog()); + .queryParam("excludePublishers",getStatsOptions.isExcludePublishers()) + .queryParam("excludeConsumers",getStatsOptions.isExcludeConsumers()); +} +``` + +Add the `excludePublishers` and `excludeConsumers` parameters to `{tenant}/{namespace}/{topic}/partitioned-stats` API +```java +@GET +@Path("{tenant}/{namespace}/{topic}/partitioned-stats") +public void getPartitionedStats( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Get per partition stats") + @QueryParam("perPartition") @DefaultValue("true") boolean perPartition, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "If return precise backlog or imprecise backlog") + @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog, + @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful " + + "not to use when there's heavy traffic.") + @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize, + @ApiParam(value = "If return the earliest time in backlog") + @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, + @ApiParam(value = "If exclude the publishers") + @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers, + @ApiParam(value = "If exclude the consumers") + @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) + +``` + +Add the `excludePublishers` and `excludeConsumers` parameters to `{tenant}/{namespace}/{topic}/stats` API +```java +@GET +@Path("{tenant}/{namespace}/{topic}/stats") +public void getStats( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "If return precise backlog or imprecise backlog") + @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog, + @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful " + + "not to use when there's heavy traffic.") + @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize, + @ApiParam(value = "If return time of the earliest message in backlog") + @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, + @ApiParam(value = "If exclude the publishers"), + @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers, + @ApiParam(value = "If exclude the consumers") + @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers) +``` + +Add a new method for `org.apache.pulsar.broker.service.Topic` +```java +CompletableFuture asyncGetStats(GetStatsOptions getStatsOptions); + +@Data +@Builder +public class GetStatsOptions { + /** + * Set to true to get precise backlog, Otherwise get imprecise backlog. + */ + private final boolean getPreciseBacklog; + + /** + * Whether to get backlog size for each subscription. + */ + private final boolean subscriptionBacklogSize; + + /** + * Whether to get the earliest time in backlog. + */ + private final boolean getEarliestTimeInBacklog; + + /** + * Whether to exclude publishers. + */ + private final boolean excludePublishers; + + /** + * Whether to exclude consumers. + */ + private final boolean excludeConsumers; + +} +``` + +Add the following logic in `org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncGetStats` and `org.apache.pulsar.broker.service.persistent.PersistentSubscription.getStats`: + +```java + if (!excludePublishers){ + stats.addPublisher(publisherStats); + } + + if (!excludeConsumers){ + subStats.consumers.add(consumerStats); + } +``` + +## Public-facing Changes + + +### Public API + +### Binary protocol + +### Configuration + +### CLI + +### Metrics + + + +# Monitoring + + + +# Security Considerations + + +# Backward & Forward Compatibility + +## Revert + + +## Upgrade + +# Alternatives + +# General Notes + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/c92043zq6lyrsd5z1hnln48mx858n7vj +* Mailing List voting thread: https://lists.apache.org/thread/hjw3y7h5vd0x7st6zslj3btjcd6yf1lx