-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Avert extensive time consumption during table view construction #21170
Conversation
…truction ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
@@ -235,20 +238,33 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) { | |||
AtomicLong messagesRead = new AtomicLong(); | |||
|
|||
CompletableFuture<Reader<T>> future = new CompletableFuture<>(); | |||
readAllExistingMessages(reader, future, startTime, messagesRead); | |||
reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering do we have a test to cover this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote a test, but I deleted it. This case is too extreme, and it might become wholly useless or unstable due to the testing machine and the configuration of the test data.
Do you have any ideas to test it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @Demogorgon314, I added a test by Mockito. Please take a look. Thanks.
@@ -235,20 +238,33 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) { | |||
AtomicLong messagesRead = new AtomicLong(); | |||
|
|||
CompletableFuture<Reader<T>> future = new CompletableFuture<>(); | |||
readAllExistingMessages(reader, future, startTime, messagesRead); | |||
reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> { | |||
Optional<TopicMessageId> optionalTopicMessageId = lastMessageIds.stream().max(Comparator.naturalOrder()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some things to concern:
First case: working on partitioned topics:
Imagine:
topic-partiton-1
- containing 1 million messagestopic-partition-2
- containing 1 message
When a producer writes a new message to topic-partition-2 and it gets to TableView, the method will be stopped.
Second case: work with compaction
- After the compaction, if the new client connects, the messages read first will have messageIds from the compacted ledger. Will compareTo work in that case?
Third case:
- Comparison of MessageId when delivered in batches (this should probably be OK now)
I think that the algorithm should store every last message from each partition (first issue). Then the stop mechanism should be based on the exact message delivery.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Second case: work with compaction
- After the compaction, if the new client connects, the messages read first will have messageIds from the compacted ledger. Will compareTo work in that case?
Messages read from a compacted topic by a normal client will be no different to messages read from a non-compacted topic, though there will be gaps in the message IDs. See https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the algorithm should store every last message from each partition (first issue). Then the stop mechanism should be based on the exact message delivery.
I totally agree. We only ensure that messages in the same partition are ordered, so the strategy of only judging the largest message ID is wrong. Thanks for your reminder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to consider:
If the compacted ledger is created after the ledger that holds the backlog, it may have the higher ledger Id, right?
- So you reed the latest message from the backlog
- You start reading from the beginning (compacted ledger, first message)
- The compacted ledger has higher ID, therefore the MessageId.equals won't work as expected
Is there a mechanism to prevent that?
if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) { | ||
maxMessageIds.remove(msg.getTopicName()); | ||
} | ||
if (maxMessageIds.size() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (maxMessageIds.size() == 0) { | |
if (maxMessageIds.isEmpty()) { |
Method readAllExistingMessagesMethod = TableViewImpl.class | ||
.getDeclaredMethod("readAllExistingMessages", Reader.class); | ||
readAllExistingMessagesMethod.setAccessible(true); | ||
CompletableFuture<Reader<?>> future = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we send more messages before invoking the readAllExistingMessages
method to ensure we did not receive the new messages?
…truction (#21270) Reopen #21170 ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
…truction (#21270) Reopen #21170 ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
…truction (#21270) Reopen #21170 ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
…truction (apache#21270) Reopen apache#21170 ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
…truction (apache#21270) Reopen apache#21170 ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
…truction (apache#21270) Reopen apache#21170 ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
Motivation
If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time.
Modification
In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: