
[fix][client] Avert extensive time consumption during table view construction #21170

Closed
wants to merge 3 commits into from

Conversation

liangyepianzhou (Contributor)

Motivation

If a topic continuously receives a large volume of new messages, reading every existing message in the topic to build a TableView can take an excessive amount of time.

Modification

When constructing the TableView, first fetch the last message ID of the topic. Once a message with that ID has been read, the construction is considered complete instead of chasing newly produced messages.
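
A minimal sketch of that stop condition, assuming a `Reader<T>` over the topic and the `Reader#getLastMessageIdsAsync` API that the diff below relies on. The class and helper names (`TableViewConstructionSketch`, `readUntil`, `handleMessage`) are illustrative placeholders, not the actual `TableViewImpl` code; as the review below points out, reducing the snapshot to a single maximum ID is only safe for a non-partitioned topic.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TopicMessageId;

class TableViewConstructionSketch<T> {

    // Read the existing messages until the "last message ID" captured at construction
    // time has been reached, then complete the future instead of following the tail.
    CompletableFuture<Reader<T>> readExistingMessages(Reader<T> reader) {
        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
            // Snapshot taken once, before reading. Collapsing it to a single maximum ID
            // is only valid for a non-partitioned topic (see the review comments below).
            Optional<TopicMessageId> maxId = lastMessageIds.stream().max(Comparator.naturalOrder());
            readUntil(reader, maxId.orElse(null), future);
        });
        return future;
    }

    private void readUntil(Reader<T> reader, TopicMessageId maxId, CompletableFuture<Reader<T>> future) {
        if (maxId == null) {
            // Empty topic: nothing to replay.
            future.complete(reader);
            return;
        }
        reader.readNextAsync().thenAccept(msg -> {
            handleMessage(msg);
            if (msg.getMessageId().compareTo(maxId) >= 0) {
                // Reached the construction-time snapshot: construction is done.
                future.complete(reader);
            } else {
                readUntil(reader, maxId, future);
            }
        });
    }

    private void handleMessage(Message<T> msg) {
        // Placeholder for applying the message to the TableView's key/value map.
    }
}
```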

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

…truction

@@ -235,20 +238,33 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
        AtomicLong messagesRead = new AtomicLong();

        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
        readAllExistingMessages(reader, future, startTime, messagesRead);
        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
Member

I am wondering, do we have a test to cover this?

Contributor Author

I wrote a test, but I deleted it. The case is too extreme, and the test could easily become useless or unstable depending on the test machine and the configuration of the test data.
Do you have any ideas on how to test it?

Contributor Author

Hi @Demogorgon314, I added a test using Mockito. Please take a look. Thanks.

@@ -235,20 +238,33 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
        AtomicLong messagesRead = new AtomicLong();

        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
        readAllExistingMessages(reader, future, startTime, messagesRead);
        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
            Optional<TopicMessageId> optionalTopicMessageId = lastMessageIds.stream().max(Comparator.naturalOrder());

Some things to consider:

First case: working with partitioned topics.
Imagine:

  • topic-partition-1 - containing 1 million messages
  • topic-partition-2 - containing 1 message

When a producer writes a new message to topic-partition-2 and it reaches the TableView, the method can stop prematurely, even though topic-partition-1 has not been fully read.

Second case: working with compaction

  • After compaction, if a new client connects, the messages read first will have message IDs from the compacted ledger. Will compareTo work in that case?

Third case:

  • Comparison of MessageIds when delivered in batches (this should probably be OK now)

I think the algorithm should store the last message ID of every partition (first issue), and the stop mechanism should be based on exactly which messages have been delivered (see the sketch below).
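
A rough sketch of that suggestion, adapting the snippet shown earlier under Modification: instead of a single maximum ID, keep one last message ID per partition and drop each entry once that partition catches up. Names remain illustrative, and it assumes `Message#getTopicName` returns the same partition name that `TopicMessageId#getOwnerTopic` reports; the actual change in this PR follows the same idea, as the later diff fragment shows.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TopicMessageId;

class PerPartitionStopSketch<T> {

    // One entry per partition: that partition's last message ID at construction time.
    private final Map<String, TopicMessageId> maxMessageIds = new ConcurrentHashMap<>();

    CompletableFuture<Reader<T>> readExistingMessages(Reader<T> reader) {
        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
            lastMessageIds.forEach(id -> maxMessageIds.put(id.getOwnerTopic(), id));
            if (maxMessageIds.isEmpty()) {
                // Empty topic: nothing to replay.
                future.complete(reader);
            } else {
                readNext(reader, future);
            }
        });
        return future;
    }

    private void readNext(Reader<T> reader, CompletableFuture<Reader<T>> future) {
        reader.readNextAsync().thenAccept(msg -> {
            handleMessage(msg);
            // A partition stops being tracked once its construction-time last ID is reached.
            TopicMessageId maxId = maxMessageIds.get(msg.getTopicName());
            if (maxId != null && msg.getMessageId().compareTo(maxId) >= 0) {
                maxMessageIds.remove(msg.getTopicName());
            }
            if (maxMessageIds.isEmpty()) {
                // Every partition has reached its last ID: construction is done.
                future.complete(reader);
            } else {
                readNext(reader, future);
            }
        });
    }

    private void handleMessage(Message<T> msg) {
        // Placeholder for applying the message to the TableView's key/value map.
    }
}
```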

Contributor Author

Second case: working with compaction

  • After compaction, if a new client connects, the messages read first will have message IDs from the compacted ledger. Will compareTo work in that case?

Messages read from a compacted topic by a normal client are no different from messages read from a non-compacted topic, though there will be gaps in the message IDs. See https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction

Contributor Author

I think the algorithm should store the last message ID of every partition (first issue), and the stop mechanism should be based on exactly which messages have been delivered.

I totally agree. Ordering is only guaranteed for messages within the same partition, so checking only the single largest message ID is wrong. Thanks for the reminder.

Just to consider:
If the compacted ledger is created after the ledger that holds the backlog, it may have a higher ledger ID, right?

  • So you read the latest message ID from the backlog
  • You start reading from the beginning (compacted ledger, first message)
  • The compacted ledger has the higher ID, therefore MessageId.equals won't work as expected

Is there a mechanism to prevent that?

        // Stop tracking this partition once the last message ID captured at
        // construction time has been reached.
        if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) {
            maxMessageIds.remove(msg.getTopicName());
        }
        if (maxMessageIds.size() == 0) {
Member

Suggested change
-        if (maxMessageIds.size() == 0) {
+        if (maxMessageIds.isEmpty()) {

        Method readAllExistingMessagesMethod = TableViewImpl.class
                .getDeclaredMethod("readAllExistingMessages", Reader.class);
        readAllExistingMessagesMethod.setAccessible(true);
        CompletableFuture<Reader<?>> future =
Member

Should we send more messages before invoking the readAllExistingMessages method to ensure we did not receive the new messages?
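
One way to exercise that concern end to end (a hypothetical integration-style sketch, not the Mockito test added in this PR; the topic name, broker URL, message counts, and timeout are illustrative) is to keep a producer writing continuously while the TableView is being built and assert that construction still completes within a bounded time:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

public class TableViewConstructionUnderLoadExample {

    public static void main(String[] args) throws Exception {
        String topic = "persistent://public/default/tableview-construction-under-load";
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(topic)
                     .create()) {

            // Pre-populate the topic so there is a backlog to replay.
            for (int i = 0; i < 1_000; i++) {
                producer.newMessage().key("key-" + (i % 10)).value("value-" + i).send();
            }

            // Keep producing while the TableView is constructed, simulating a topic
            // that never stops receiving writes.
            ExecutorService writer = Executors.newSingleThreadExecutor();
            writer.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    producer.newMessage().key("hot-key").value("hot-value").send();
                }
                return null;
            });

            // Without a stop condition based on the last message IDs captured at
            // construction time, this call could chase the tail indefinitely; with it,
            // construction should complete within the timeout.
            try (TableView<String> tableView = client.newTableView(Schema.STRING)
                    .topic(topic)
                    .createAsync()
                    .get(30, TimeUnit.SECONDS)) {
                System.out.println("TableView built with " + tableView.size() + " keys");
            } finally {
                writer.shutdownNow();
            }
        }
    }
}
```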

liangyepianzhou closed this by deleting the head repository Sep 27, 2023
liangyepianzhou added a commit that referenced this pull request Nov 6, 2023
…truction (#21270)

Reopen #21170
Technoboy- pushed a commit that referenced this pull request Nov 10, 2023
…truction (#21270)
Technoboy- pushed a commit that referenced this pull request Nov 10, 2023
…truction (#21270)
nborisov pushed a commit to nborisov/pulsar that referenced this pull request Nov 13, 2023
…truction (apache#21270)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…truction (apache#21270)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…truction (apache#21270)
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants