Skip to content

Commit

Permalink
[improve] [pip] PIP-344 Correct the behavior of the public API pulsar…
Browse files Browse the repository at this point in the history
…Client.getPartitionsForTopic(topicName) (apache#22182)
  • Loading branch information
poorbarcode authored Mar 25, 2024
1 parent cba1600 commit afe4261
Showing 1 changed file with 127 additions and 0 deletions.
127 changes: 127 additions & 0 deletions pip/pip-344.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# PIP-344: Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName)

# Background knowledge

**Topic auto-creation**
- The partitioned topic auto-creation is dependent on `pulsarClient.getPartitionsForTopic`
- It triggers partitioned metadata creation by `pulsarClient.getPartitionsForTopic`
- And triggers the topic partition creation by producers' registration and consumers' registration.
- When calling `pulsarClient.getPartitionsForTopic(topicName)`, Pulsar will automatically create the partitioned topic metadata if it does not exist, either using `HttpLookupService` or `BinaryProtoLookupService`.

**Now `pulsarClient.getPartitionsForTopic`'s behavior**
| case | broker allow `auto-create` | param allow <br> `create if not exists` | non-partitioned topic | partitioned topic | current behavior |
| --- | --- | --- | --- | --- | --- |
| 1 | `true/false` | `true/false` | `exists: true` | | REST API: `partitions: 0`<br> Binary API: `partitions: 0` |
| 2 | `true/false` | `true/false` | | `exists: true` <br> `partitions: 3` | REST API: `partitions: 3`<br> Binary API: `partitions: 3` |
| 3 | `true` | `true` | | | REST API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `partitions: 3` <br> Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `partitions: 3` <br> |
| 4 | `true` | `false` | | | REST API: <br> &nbsp;&nbsp;- `create new: false` <br> &nbsp;&nbsp;- `partitions: 0` <br> Binary API: <br> &nbsp;&nbsp;not support <br> |
| 5 | `false` | `true` | | | REST API: <br> &nbsp;&nbsp;- `create new: false` <br> &nbsp;&nbsp;- `partitions: 0` <br> Binary API: <br> &nbsp;&nbsp;- `create new: false` <br> &nbsp;&nbsp;- `partitions: 0` <br> |

- Broker allows `auto-create`: see also the config `allowAutoTopicCreation` in `broker.conf`.
- Param allow <br> `create if not exists`
- Regarding the HTTP API `PersistentTopics.getPartitionedMetadata`, it is an optional param which named `checkAllowAutoCreation,` and the default value is `false`.
- Regarding the `pulsar-admin` API, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `false` and can not be set manually.
- Regarding the client API `HttpLookupService.getPartitionedTopicMetadata`, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `true` and can not be set manually.
- Regarding the client API `BinaryProtoLookupService.getPartitionedTopicMetadata`, it always tries to create partitioned metadata.
- `REST API & HTTP API`: Since there are only two implementations of the 4 ways to get partitioned metadata, we call HTTP API `PersistentTopics.getPartitionedMetadata`, `pulsar-admin`, and `HttpLookupService.getPartitionedTopicMetadata` HTTP API, and call `BinaryProtoLookupService.getPartitionedTopicMetadata` Binary API.

# Motivation

The param `create if not exists` of the Binary API is always `true.`

- For case 4 of `pulsarClient.getPartitionsForTopic`'s behavior, it always tries to create the partitioned metadata, but the API name is `getxxx`.
- For case 5 of `pulsarClient.getPartitionsForTopic`'s behavior, it returns a `0` partitioned metadata, but the topic does not exist. For the correct behavior of this case, we had discussed [here](https://github.com/apache/pulsar/issues/8813) before.
- BTW, [flink-connector-pulsar](https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L221-L227) is using this API to create partitioned topic metadata.

# Goals

- Regarding the case 4: Add a new API `PulsarClient.getPartitionsForTopic(String, boolean)` to support the feature that just get partitioned topic metadata and do not try to create one. See detail below.
- Regarding the case 5: Instead of returning a `0` partitioned metadata, respond to a not found error when calling `pulsarClient.getPartitionsForTopic(String)` if the topic does not exist.

# Detailed Design

## Public-facing Changes

When you call the public API `pulsarClient.getPartitionsForTopic`, pulsar will not create the partitioned metadata anymore.

### Public API
**LookupService.java**
```java

- CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);

/**
* 1. Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists; return "{partition: 0}" if a non-partitioned topic exists.
* 2. When {@param createIfAutoCreationEnabled} is "false," neither partitioned topic nor non-partitioned topic does not exist. You will get an {@link PulsarClientException.NotFoundException}.
* 2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
* 3. When {@param createIfAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. For the result, see case 1.
* @version 3.3.0
*/
+ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);
```

The behavior of the new API `LookupService.getPartitionedTopicMetadata(TopicName, boolean)`.

| case | client-side param: `createIfAutoCreationEnabled` | non-partitioned topic | partitioned topic | broker-side: topic auto-creation strategy | current behavior |
|------|--------------------------------------------------|-----------------------|-------------------------------------|--------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------|
| 1 | `true/false` | `exists: true` | | | REST/Binary API: `{partitions: 0}` |
| 2 | `true/false` | | `exists: true` <br> `partitions: 2` | | REST/Binary API: `{partitions: 2}` |
| 3 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `non-partitioned` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `{partitions: 0}` |
| 4 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `partitioned` <br> `defaultNumPartitions`: `2` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `{partitions: 2}` |
| 5 | `false` | | | `allowAutoTopicCreation`: `true` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |
| 6 | `true` | | | `allowAutoTopicCreation`: `false` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |


**PulsarClient.java**
```java
// This API existed before. Not change it, thus ensuring compatibility.
+ @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(TopicName, boolean)}.
- CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+ default CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
+ getPartitionsForTopic(topic, true);
+ }

/**
* 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists.
* 2. When {@param createIfAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned topic does not exist. You will get an {@link PulsarClientException.NotFoundException}.
* 2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
* 3. When {@param createIfAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. For the result, see case 1.
* @version 3.3.0
*/
CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean createIfAutoCreationEnabled);
```

The behavior of the new API `PulsarClient.getPartitionsForTopic(String, boolean)`.

| case | client-side param: `createIfAutoCreationEnabled` | non-partitioned topic | partitioned topic | broker-side: topic autp-creation strategy | current behavior |
|------|--------------------------------------------------|----------------------|-------------------------------------|--------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 | `true/false` | `exists: true` | | | REST/Binary API: `["{tenat}/{ns}/topic"]` |
| 2 | `true/false` | | `exists: true` <br> `partitions: 2` | | REST/Binary `API`: `["{tenat}/{ns}/topic-partition-0", "{tenat}/{ns}/topic-partition-1"]` |
| 3 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `non-partitioned` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `["{tenat}/{ns}/topic"]` |
| 4 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `partitioned` <br> `defaultNumPartitions`: `2` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `["{tenat}/{ns}/topic-partition-0", "{tenat}/{ns}/topic-partition-1"]` |
| 5 | `false` | | | `allowAutoTopicCreation`: `true` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |
| 5 | `true` | | | `allowAutoTopicCreation`: `false` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |



### Binary protocol

**CommandPartitionedTopicMetadata**
```
message CommandPartitionedTopicMetadata {
+ optional bool metadata_auto_creation_enabled = 6 [default = true];
}
```

**FeatureFlags**
```
message FeatureFlags {
+ optional bool supports_binary_api_get_partitioned_meta_with_param_created_false = 5 [default = false];
}
```

# Backward & Forward Compatibility

- Old version client and New version Broker: The client will call the old API.

- New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false.

0 comments on commit afe4261

Please sign in to comment.