Skip to content

Commit

Permalink
[improve][pip] PIP-303: Add optional parameters for getPartitionedSta…
Browse files Browse the repository at this point in the history
…ts (apache#21228)
  • Loading branch information
crossoverJie authored Nov 20, 2023
1 parent e1318b2 commit 9fbb92a
Showing 1 changed file with 224 additions and 0 deletions.
224 changes: 224 additions & 0 deletions pip/pip-303.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@

# Motivation

When a topic has a large number of producers or consumers (over 1k), querying the `pulsarAdmin.topics().getPartitionedStats()` interface is slow and the response size is also large.
As a result, it's essential to give users the option of querying producer and consumer information.



# Goals

## In Scope

Add the API for `org.apache.pulsar.client.admin.Topics`
```java
CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
String topic, boolean perPartition, GetStatsOptions getStatsOptions);

CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions);
```



## Out of Scope

None.


# High Level Design

Implement the `getPartitionedStatsAsync` method, and add the `excludePublishers` and `excludeConsumers` parameters to `{tenant}/{namespace}/{topic}/partitioned-stats` API in `PersistentTopics` and `NonPersistentTopics`.

# Detailed Design

## Design & Implementation Details


Add two fields for `org.apache.pulsar.client.admin.GetStatsOptions`
```java
@Data
@Builder
public class GetStatsOptions {
/**
* Whether to exclude publishers.
*/
private final boolean excludePublishers;

/**
* Whether to exclude consumers.
*/
private final boolean excludeConsumers;

}
```

Implement the `getPartitionedStatsAsync` and `getStatsAsync` interface for `org.apache.pulsar.client.admin.internal.TopicsImpl`
```java
@Override
public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, boolean perPartition, GetStatsOptions getStatsOptions){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-stats");
path = path.queryParam("perPartition", perPartition)
.queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
.queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
.queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog());
.queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
.queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
}

@Override
public CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "stats")
.queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
.queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
.queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog());
.queryParam("excludePublishers",getStatsOptions.isExcludePublishers())
.queryParam("excludeConsumers",getStatsOptions.isExcludeConsumers());
}
```

Add the `excludePublishers` and `excludeConsumers` parameters to `{tenant}/{namespace}/{topic}/partitioned-stats` API
```java
@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
public void getPartitionedStats(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Get per partition stats")
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If exclude the publishers")
@QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
@ApiParam(value = "If exclude the consumers")
@QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)

```

Add the `excludePublishers` and `excludeConsumers` parameters to `{tenant}/{namespace}/{topic}/stats` API
```java
@GET
@Path("{tenant}/{namespace}/{topic}/stats")
public void getStats(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If exclude the publishers"),
@QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
@ApiParam(value = "If exclude the consumers")
@QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
```

Add a new method for `org.apache.pulsar.broker.service.Topic`
```java
CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);

@Data
@Builder
public class GetStatsOptions {
/**
* Set to true to get precise backlog, Otherwise get imprecise backlog.
*/
private final boolean getPreciseBacklog;

/**
* Whether to get backlog size for each subscription.
*/
private final boolean subscriptionBacklogSize;

/**
* Whether to get the earliest time in backlog.
*/
private final boolean getEarliestTimeInBacklog;

/**
* Whether to exclude publishers.
*/
private final boolean excludePublishers;

/**
* Whether to exclude consumers.
*/
private final boolean excludeConsumers;

}
```

Add the following logic in `org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncGetStats` and `org.apache.pulsar.broker.service.persistent.PersistentSubscription.getStats`:

```java
if (!excludePublishers){
stats.addPublisher(publisherStats);
}

if (!excludeConsumers){
subStats.consumers.add(consumerStats);
}
```

## Public-facing Changes


### Public API

### Binary protocol

### Configuration

### CLI

### Metrics



# Monitoring



# Security Considerations


# Backward & Forward Compatibility

## Revert


## Upgrade

# Alternatives

# General Notes

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/c92043zq6lyrsd5z1hnln48mx858n7vj
* Mailing List voting thread: https://lists.apache.org/thread/hjw3y7h5vd0x7st6zslj3btjcd6yf1lx

0 comments on commit 9fbb92a

Please sign in to comment.