[fix][broker] Do not write replicated snapshot marker when the topic which is not enable replication #21495

Merged on Nov 14, 2023 (9 commits)

@liangyepianzhou liangyepianzhou commented Nov 1, 2023

Motivation

PIP 33 introduces the concept of replicated subscriptions. When a topic has a consumer (subscription) with replicated subscriptions enabled, the broker writes markers into the original topic. The markers are written even if no replication cluster is configured for the topic, which makes the topic's backlog keep growing.


Markers are written in the following two ways:

  1. A scheduled task writes a marker at a fixed time interval if new messages have been published.
    // ReplicatedSubscriptionsController.java
    public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
        this.topic = topic;
        this.localCluster = localCluster;
        timer = topic.getBrokerService().pulsar().getExecutor()
                .scheduleAtFixedRate(catchingAndLoggingThrowables(this::startNewSnapshot), 0,
                        topic.getBrokerService().pulsar().getConfiguration()
                                .getReplicatedSubscriptionsSnapshotFrequencyMillis(),
                        TimeUnit.MILLISECONDS);
    }

    // ReplicatedSubscriptionsSnapshotBuilder.java
    void start() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Starting new snapshot {} - Clusters: {}", controller.topic().getName(), snapshotId,
                    missingClusters);
        }
        startTimeMillis = clock.millis();
        controller.writeMarker(
                Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
    }
  2. Acknowledging a message triggers a check of whether the first snapshot has been written and the mark-delete position has moved; if so, a marker is written.
    public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
                    snapshot.getClustersList().stream()
                            .map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
                                    cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
                            .collect(Collectors.toList()));
        }
        Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
        for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
            ClusterMessageId cmid = snapshot.getClusterAt(i);
            clusterIds.put(cmid.getCluster(), cmid.getMessageId());
        }
        ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
        writeMarker(subscriptionUpdate);
    }

    private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
        // if replicator producer is already closed, restart it to send snapshot response
        Replicator replicator = topic.getReplicators().get(request.getSourceCluster());
        if (!replicator.isConnected()) {
            topic.startReplProducers();
        }
        // Send response containing the current last written message id. The response
        // marker we're publishing locally and then replicating will have a higher
        // message id.
        PositionImpl lastMsgId = (PositionImpl) topic.getLastPosition();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
        }
        ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
                request.getSnapshotId(),
                request.getSourceCluster(),
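
To make the backlog effect of the first path concrete, here is a minimal, self-contained sketch. It is not broker code: `MarkerBacklogSketch`, `Topic`, `snapshotTick`, and the string-based entries are hypothetical names invented for illustration, and the model simply assumes that markers are stored alongside ordinary messages and that the snapshot task never checks whether a remote cluster exists.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model for illustration only; these are not Pulsar classes.
class MarkerBacklogSketch {

    /** A topic whose stored entries are ordinary messages plus snapshot markers. */
    static final class Topic {
        private final List<String> entries = new ArrayList<>();
        private boolean publishedSinceLastSnapshot = false;

        void publish(String payload) {
            entries.add("msg:" + payload);
            publishedSinceLastSnapshot = true;
        }

        /**
         * Models one firing of the fixed-rate snapshot task.
         * Note what is missing: there is no check that any remote cluster is configured.
         */
        void snapshotTick() {
            if (publishedSinceLastSnapshot) {
                entries.add("marker:snapshot-request");
                publishedSinceLastSnapshot = false;
            }
        }

        /** In this model the backlog is simply the number of stored entries. */
        int backlog() {
            return entries.size();
        }

        long markers() {
            return entries.stream().filter(e -> e.startsWith("marker:")).count();
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        topic.publish("a");
        topic.snapshotTick();   // new message since last tick: marker written
        topic.snapshotTick();   // nothing new: no marker
        topic.publish("b");
        topic.snapshotTick();   // marker written again
        // prints "backlog = 4, markers = 2"
        System.out.println("backlog = " + topic.backlog() + ", markers = " + topic.markers());
    }
}
```

In this toy model, half of the stored entries are markers that nothing will ever replicate, which is the growth pattern the motivation describes.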

Modifications

Create or remove the topic's ReplicatedSubscriptionsController according to the topic policy.
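
The lifecycle this describes can be sketched as follows. This is an illustration under stated assumptions, not the code merged in this PR: `ControllerLifecycleSketch`, `Topic`, `Controller`, and `onReplicationClustersChanged` are hypothetical names, and the rule "replication is enabled when at least one cluster other than the local one is configured" is an assumption made for the example.

```java
import java.util.Set;

// Hypothetical, simplified model for illustration only; these are not Pulsar classes.
class ControllerLifecycleSketch {

    /** Stand-in for a component that periodically writes snapshot markers. */
    static final class Controller {
        private boolean closed = false;

        void close() {
            closed = true; // a real controller would also cancel its scheduled task here
        }

        boolean isClosed() {
            return closed;
        }
    }

    static final class Topic {
        private final String localCluster;
        private Controller controller; // null while replication is not enabled

        Topic(String localCluster) {
            this.localCluster = localCluster;
        }

        /** Called whenever the effective replication clusters of the topic change. */
        void onReplicationClustersChanged(Set<String> replicationClusters) {
            // Assumption: replication counts as enabled only if a remote cluster is configured.
            boolean enabled = replicationClusters.stream().anyMatch(c -> !c.equals(localCluster));
            if (enabled && controller == null) {
                controller = new Controller();      // create: markers may now be written
            } else if (!enabled && controller != null) {
                controller.close();                 // remove: stop writing markers
                controller = null;
            }
        }

        boolean hasController() {
            return controller != null;
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic("us-west");
        topic.onReplicationClustersChanged(Set.of("us-west"));
        System.out.println(topic.hasController()); // prints "false"
        topic.onReplicationClustersChanged(Set.of("us-west", "us-east"));
        System.out.println(topic.hasController()); // prints "true"
        topic.onReplicationClustersChanged(Set.of());
        System.out.println(topic.hasController()); // prints "false"
    }
}
```

Keeping the controller absent rather than present-but-idle means neither marker path can run for a topic without replication, since both paths go through the controller.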

Verifying this change

  • Make sure that the change passes the CI checks.


Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Nov 1, 2023
1. Add a dataProvider for topic policy and namespace policy.
2. Replace Awaitility with a 3 s sleep to check that the backlog does not change after replication is disabled.
3. Cover the cases where a consumer acknowledgment triggers writing a new marker.
1. Abstract the disable and enable replication cases into separate methods to facilitate testing.
2. Make sure there are two snapshots before testing the acknowledge-messages case.
3. Add a TODO: `fix replication can not be enabled at topic level`
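
The second commit note above swaps a wait-until-true check for a fixed wait, presumably because the test needs to show that the backlog does not change, which requires observing it over a whole window rather than returning on the first match. A minimal sketch of that idea in plain Java follows; `StableValueCheck` and `staysConstant` are hypothetical helpers written for this example, not part of the PR's test code or of any test library.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical test helper for illustration only.
class StableValueCheck {

    /**
     * Polls the supplied value for the whole window and reports whether it
     * ever differed from the first reading. Returns false if interrupted.
     */
    static boolean staysConstant(LongSupplier value, long windowMillis, long pollMillis) {
        long initial = value.getAsLong();
        long deadline = System.nanoTime() + windowMillis * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (value.getAsLong() != initial) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return value.getAsLong() == initial;
    }

    public static void main(String[] args) {
        AtomicLong backlog = new AtomicLong(5);
        // A backlog that nobody writes to stays constant for the whole window.
        System.out.println(staysConstant(backlog::get, 200, 20));
        // A backlog that grows on every read is detected as changing.
        System.out.println(staysConstant(backlog::incrementAndGet, 200, 20));
    }
}
```

In a test like the one described, the supplier would read the topic's backlog and the window would be the 3 s mentioned in the commit note.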
@liangyepianzhou
Contributor Author

The DataProvider of isTopicPolicyEnabled will be updated after the issue is fixed.

@liangyepianzhou liangyepianzhou merged commit 2322004 into apache:master Nov 14, 2023
52 checks passed
@liangyepianzhou liangyepianzhou deleted the replicatedsub branch November 14, 2023 12:35
liangyepianzhou added a commit that referenced this pull request Nov 17, 2023
…which is not enable replication (#21495)

[PIP 33](https://github.com/apache/pulsar/wiki/PIP-33%3A-Replicated-subscriptions) introduces the concept of `Replicated subscriptions`. When a topic has a consumer (subscription) with replicated subscriptions enabled, the broker writes markers into the original topic. The markers are written even if no replication cluster is configured for the topic, which makes the topic's backlog keep growing.

---
Markers are written in the following two ways:
1. A scheduled task writes a marker at a fixed time interval if new messages have been published.
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L78-L86
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java#L77-L86
2. Acknowledging a message triggers a check of whether the first snapshot has been written and the mark-delete position has moved; if so, a marker is written.
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L114-L150

Create or remove the topic's `ReplicatedSubscriptionsController` according to the topic policy.

(cherry picked from commit 2322004)
@Technoboy- Technoboy- added this to the 3.2.0 milestone Nov 20, 2023
@lhotari
Member

lhotari commented Nov 24, 2023

Please also review #16651. That's a long-outstanding PR that fixes issues with replicated snapshot marker messages.

Technoboy- pushed a commit that referenced this pull request Nov 28, 2023
…which is not enable replication (#21495)

nodece pushed a commit to ascentstream/pulsar that referenced this pull request Dec 2, 2023
…which is not enable replication (apache#21495)


(cherry picked from commit 2322004)
Technoboy- pushed a commit that referenced this pull request Dec 4, 2023
…which is not enable replication (#21495)

nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…which is not enable replication (apache#21495)

srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…which is not enable replication (apache#21495)

5 participants