Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23600

Merged
merged 6 commits into from
Jan 2, 2025

Conversation

thetumbled
Copy link
Member

@thetumbled thetumbled commented Nov 14, 2024

Implementation PR for PIP-393: #23601.

Motivation

There are many issues with the current implementation of Negative Acknowledgement in Pulsar:

  • the memory occupation is high.
  • the code execution efficiency is low.
  • the redelivery time is not accurate.
  • multiple negative ack for messages in the same entry(batch) will interfere with each other.
    All of these problem is severe and need to be solved.

Memory occupation is high

After the improvement of #23582, we have reduce half more memory occupation
of NegativeAcksTracker by replacing HashMap with ConcurrentLongLongPairHashMap. With 100w entry, the memory occupation decrease from 178Mb to 64Mb. With 1kw entry, the memory occupation decrease from 1132Mb to 512Mb.
The average memory occupation of each entry decrease from 1132MB/10000000=118byte to 512MB/10000000=53byte.

But it is not enough. Assuming that we negative ack message 1w/s, assigning 1h redelivery delay for each message,
the memory occupation of NegativeAcksTracker will be 3600*10000*53/1024/1024/1024=1.77GB, if the delay is 5h,
the required memory is 3600*10000*53/1024/1024/1024*5=8.88GB, which increase too fast.

Code execution efficiency is low

Currently, each time the timer task is triggered, it will iterate all the entries in NegativeAcksTracker.nackedMessages,
which is unnecessary. We can sort entries by timestamp and only iterate the entries that need to be redelivered.

Redelivery time is not accurate

Currently, the redelivery time is controlled by the timerIntervalNanos, which is 1/3 of the negativeAckRedeliveryDelay.
That means, if the negativeAckRedeliveryDelay is 1h, the check interval time will be 20min, which is unacceptable.

Multiple negative ack for messages in the same entry(batch) will interfere with each other

Currently, NegativeAcksTracker#nackedMessages map (ledgerId, entryId) to timestamp, which means multiple nacks from messages in the same batch share single one timestamp.
If we let msg1 redelivered 10s later, then let msg2 redelivered 20s later, these two messages are delivered 20s later together. msg1 will not be redelivered 10s later as the timestamp recorded in NegativeAcksTracker#nackedMessages is overrode by the second nack call.

we can reproduce this problem with test code below:

Consumer consumer = client.newConsumer()
                .topic("persistent://public/default/testNack")
                .subscriptionName("sub2")
                .subscriptionType(SubscriptionType.Shared)
                .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS) // fixed delay with 20s.
                .subscribe();
        // receive first message and nack it.
        Message msg = consumer.receive();
        MessageIdAdv batchMessageId = (MessageIdAdv) msg.getMessageId();
        int batchIndex = batchMessageId.getBatchIndex();
        log.info("Message received, timestamp:{}, message id:{}, batch index:{}", getTime(), batchMessageId, batchIndex);
        consumer.negativeAcknowledge(msg);
        
        // receive the secode message and sleep for 10s, then nack it.
        msg = consumer.receive();
        batchMessageId = (MessageIdAdv) msg.getMessageId();
        batchIndex = batchMessageId.getBatchIndex();
        log.info("Message received, timestamp:{}, message id:{}, batch index:{}", getTime(), batchMessageId, batchIndex);
        Thread.sleep(10000);
        consumer.negativeAcknowledge(msg);

We expect the second message redelivered 10s later than the first message, as it call nack 10s later than the first one.
However, we will receive two messages together.
image

You can also reproduce this problem with the test code in this PR: org.apache.pulsar.client.impl.NegativeAcksTest#testNegativeAcksWithBatch

Modifications

Refactor the NegativeAcksTracker to solve the above problems.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: thetumbled#64

@thetumbled
Copy link
Member Author

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@thetumbled Fastutil could be a better source of space efficient map data structures. I believe that there's a templating solution where it's possible to generate code for efficient implementations. In this case, is there a need for the data structure to be concurrent? Following a single writer principle could result in simpler and more performant designs. One way to address message passing from other threads to a single writer thread is to use message passing queues from JCTools which we already use in Pulsar. Just some food for thought.

@thetumbled
Copy link
Member Author

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@thetumbled Fastutil could be a better source of space efficient map data structures. I believe that there's a templating solution where it's possible to generate code for efficient implementations. In this case, is there a need for the data structure to be concurrent? Following a single writer principle could result in simpler and more performant designs. One way to address message passing from other threads to a single writer thread is to use message passing queues from JCTools which we already use in Pulsar. Just some food for thought.

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency.
Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.

@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

@thetumbled
Copy link
Member Author

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency. Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.
@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

It is a good point, i will test it.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency. Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.
@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

It is a good point, i will test it.

@thetumbled In certain cases when tracking existence (true/false), it's worth considering to use space efficient bit maps. In Pulsar, we use the RoaringBitmap library.
I think that it should be used for storing nacks.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

I think that it should be used for storing nacks.

I guess it's not applicable in this case.

@thetumbled I checked the NegativeAcksTracker class and it seems that the actual key is (ledgerId, entryId).
The partitionIndex and timestamp are part of the value.
partitionIndex doesn't have to be a long value.

It's easy to implement (ledgerId, entryId) as map of maps.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

This is a very poor solution in the current implementation:

nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to covert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});

There should be a separate datastructure (a list or queue) which contains the entries in timestamp order. The benefit of that is that iterating could stop after the timestamp condition no longer holds.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

@thetumbled It looks like there's no need for a map data structure in the first place. That's completely unnecessary for implementing NegativeAcksTracker

@thetumbled
Copy link
Member Author

@thetumbled It looks like there's no need for a map data structure in the first place. That's completely unnecessary for implementing NegativeAcksTracker

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

Fastutil contains multiple PriorityQueue implementations: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/PriorityQueue.html.
For example, this would work: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectArrayPriorityQueue.html
or this one: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectHeapPriorityQueue.html

@thetumbled thetumbled changed the title [fix][client] fix multiple nack from messages in the same batch interfere each other. [improve][client] PIP-393: Improve performance of Negative Acknowledgement Nov 15, 2024
@thetumbled
Copy link
Member Author

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

Fastutil contains multiple PriorityQueue implementations: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/PriorityQueue.html. For example, this would work: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectArrayPriorityQueue.html or this one: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectHeapPriorityQueue.html

I propose a pip to fix several issues with nack tracker, with a new data structure :

Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();

This PR become the implementation PR for PIP-393: #23601.
I will implement PIP-393 soon.

pom.xml Outdated Show resolved Hide resolved
pulsar-client/pom.xml Outdated Show resolved Hide resolved
@thetumbled
Copy link
Member Author

There are some issues with the licence.

Run src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz
it.unimi.dsi-fastutil-core-8.5.14.jar unaccounted for in LICENSE

It looks like there are issues with the LICENSE/NOTICE.

Should we replace fastutil-core with fastutil? @lhotari

@lhotari
Copy link
Member

lhotari commented Nov 18, 2024

Should we replace fastutil-core with fastutil?

@thetumbled That's fine to complete the experiment. For a proper solution there will need to be a way to avoid the shaded jar files growing significantly. The proper solution would require a separate module for fastutil which minimizes the number of classes. I tried to explain that in my previous comment, #23600 (comment) . Minimizing all classes would most likely be a breaking change. Since you cannot specify minimizeJar for a specific dependency in maven-shade-plugin configuration, it's necessary to have a separate intermediate module for minimizing just fastutil.

@lhotari lhotari changed the title [improve][pip] PIP-393: Improve performance of Negative Acknowledgement [improve][client] PIP-393: Improve performance of Negative Acknowledgement Nov 26, 2024
@thetumbled
Copy link
Member Author

@thetumbled Please don't include [pip] in the title of the implementation PR. It's only for the PIP document PR.

Thanks for reminder.

@thetumbled thetumbled changed the title [improve][client] PIP-393: Improve performance of Negative Acknowledgement [improve][client] PIP-393: Improve performance of Negative Acknowledgement Part1 Dec 5, 2024
@thetumbled
Copy link
Member Author

Hi, fellows in pulsar communtiy. Could you help to move this feature forward, thanks.
@lhotari @nodece @Technoboy- @codelipenghui @dao-jun @poorbarcode @congbobo184

@thetumbled
Copy link
Member Author

Failed to execute goal org.owasp:dependency-check-maven:10.0.2:aggregate (default) on project pulsar:
Error:
Error: One or more dependencies were identified with vulnerabilities that have a CVSS score greater than or equal to '7.0':
Error:
Error: alluxio-core-common-2.9.3.jar: CVE-2023-38889(9.8)
Error: azure-identity-1.10.4.jar: CVE-2023-36415(8.8)
Error: clickhouse-jdbc-0.4.6-all.jar/META-INF/maven/com.google.protobuf/protobuf-java/pom.xml: CVE-2024-7254(8.699999809265137)
Error: clickhouse-jdbc-0.4.6-all.jar/META-INF/maven/com.squareup.okio/okio/pom.xml: CVE-2023-3635(7.5)
Error: clickhouse-jdbc-0.4.6-all.jar: CVE-2022-44010(7.5)
Error: grpc-core-1.37.0.jar: CVE-2023-44487(7.5), CVE-2023-4785(7.5), CVE-2023-33953(7.5)
Error: grpc-core-1.56.1.jar: CVE-2023-44487(7.5), CVE-2023-33953(7.5)
Error: grpc-protobuf-1.37.0.jar: CVE-2023-44487(7.5), CVE-2023-4785(7.5), CVE-2023-33953(7.5)
Error: grpc-protobuf-1.56.1.jar: CVE-2023-44487(7.5), CVE-2023-33953(7.5)
Error: json-20201115.jar: CVE-2023-5072(7.5), CVE-2022-45688(7.5)
Error: kafka-clients-2.8.1.jar: CVE-2024-31141(7.099999904632568)
Error: kafka-clients-3.4.0.jar: CVE-2024-31141(7.099999904632568)
Error: kaml-0.20.0.jar: CVE-2023-28118(7.5)
Error: lucene-core-9.11.1.jar: CVE-2024-45772(8.0)
Error: mockserver-netty-no-dependencies-5.14.0.jar: CVE-2021-32827(9.6)
Error: mysql-connector-java-8.0.30.jar: CVE-2023-22102(8.3)
Error: nimbus-jose-jwt-9.30.2.jar: CVE-2023-52428(7.5)

Do you know the reason why this error throw out? @lhotari

@lhotari
Copy link
Member

lhotari commented Dec 24, 2024

Failed to execute goal org.owasp:dependency-check-maven:10.0.2:aggregate (default) on project pulsar:
Error:
Error: One or more dependencies were identified with vulnerabilities that have a CVSS score greater than or equal to '7.0':
Error:
Error: alluxio-core-common-2.9.3.jar: CVE-2023-38889(9.8)
Error: azure-identity-1.10.4.jar: CVE-2023-36415(8.8)
Error: clickhouse-jdbc-0.4.6-all.jar/META-INF/maven/com.google.protobuf/protobuf-java/pom.xml: CVE-2024-7254(8.699999809265137)
Error: clickhouse-jdbc-0.4.6-all.jar/META-INF/maven/com.squareup.okio/okio/pom.xml: CVE-2023-3635(7.5)
Error: clickhouse-jdbc-0.4.6-all.jar: CVE-2022-44010(7.5)
Error: grpc-core-1.37.0.jar: CVE-2023-44487(7.5), CVE-2023-4785(7.5), CVE-2023-33953(7.5)
Error: grpc-core-1.56.1.jar: CVE-2023-44487(7.5), CVE-2023-33953(7.5)
Error: grpc-protobuf-1.37.0.jar: CVE-2023-44487(7.5), CVE-2023-4785(7.5), CVE-2023-33953(7.5)
Error: grpc-protobuf-1.56.1.jar: CVE-2023-44487(7.5), CVE-2023-33953(7.5)
Error: json-20201115.jar: CVE-2023-5072(7.5), CVE-2022-45688(7.5)
Error: kafka-clients-2.8.1.jar: CVE-2024-31141(7.099999904632568)
Error: kafka-clients-3.4.0.jar: CVE-2024-31141(7.099999904632568)
Error: kaml-0.20.0.jar: CVE-2023-28118(7.5)
Error: lucene-core-9.11.1.jar: CVE-2024-45772(8.0)
Error: mockserver-netty-no-dependencies-5.14.0.jar: CVE-2021-32827(9.6)
Error: mysql-connector-java-8.0.30.jar: CVE-2023-22102(8.3)
Error: nimbus-jose-jwt-9.30.2.jar: CVE-2023-52428(7.5)

Do you know the reason why this error throw out? @lhotari

@thetumbled We haven't addressed these issues. The OWASP dependency check has been failing for quite some time. Most of these issues are in Pulsar IO Connectors. For GRPC, there has been discussions on the dev mailing list.

@thetumbled
Copy link
Member Author

Failed to execute goal org.owasp:dependency-check-maven:10.0.2:aggregate (default) on project pulsar:
Error:
Error: One or more dependencies were identified with vulnerabilities that have a CVSS score greater than or equal to '7.0':
Error:
Error: alluxio-core-common-2.9.3.jar: CVE-2023-38889(9.8)
Error: azure-identity-1.10.4.jar: CVE-2023-36415(8.8)
Error: clickhouse-jdbc-0.4.6-all.jar/META-INF/maven/com.google.protobuf/protobuf-java/pom.xml: CVE-2024-7254(8.699999809265137)
Error: clickhouse-jdbc-0.4.6-all.jar/META-INF/maven/com.squareup.okio/okio/pom.xml: CVE-2023-3635(7.5)
Error: clickhouse-jdbc-0.4.6-all.jar: CVE-2022-44010(7.5)
Error: grpc-core-1.37.0.jar: CVE-2023-44487(7.5), CVE-2023-4785(7.5), CVE-2023-33953(7.5)
Error: grpc-core-1.56.1.jar: CVE-2023-44487(7.5), CVE-2023-33953(7.5)
Error: grpc-protobuf-1.37.0.jar: CVE-2023-44487(7.5), CVE-2023-4785(7.5), CVE-2023-33953(7.5)
Error: grpc-protobuf-1.56.1.jar: CVE-2023-44487(7.5), CVE-2023-33953(7.5)
Error: json-20201115.jar: CVE-2023-5072(7.5), CVE-2022-45688(7.5)
Error: kafka-clients-2.8.1.jar: CVE-2024-31141(7.099999904632568)
Error: kafka-clients-3.4.0.jar: CVE-2024-31141(7.099999904632568)
Error: kaml-0.20.0.jar: CVE-2023-28118(7.5)
Error: lucene-core-9.11.1.jar: CVE-2024-45772(8.0)
Error: mockserver-netty-no-dependencies-5.14.0.jar: CVE-2021-32827(9.6)
Error: mysql-connector-java-8.0.30.jar: CVE-2023-22102(8.3)
Error: nimbus-jose-jwt-9.30.2.jar: CVE-2023-52428(7.5)

Do you know the reason why this error throw out? @lhotari

@thetumbled We haven't addressed these issues. The OWASP dependency check has been failing for quite some time. Most of these issues are in Pulsar IO Connectors. For GRPC, there has been discussions on the dev mailing list.

Can we proceed to review this pr without handing these issues? and one more approve is needed, could you help to review it? thanks.

@lhotari
Copy link
Member

lhotari commented Dec 27, 2024

Can we proceed to review this pr without handing these issues? and one more approve is needed, could you help to review it? thanks.

Yes, Owasp check failure doesn't block merging of the PR.

@thetumbled thetumbled requested a review from lhotari January 2, 2025 06:26
@codecov-commenter
Copy link

codecov-commenter commented Jan 2, 2025

Codecov Report

Attention: Patch coverage is 88.23529% with 6 lines in your changes missing coverage. Please review.

Project coverage is 74.22%. Comparing base (bbc6224) to head (a03f415).
Report is 826 commits behind head on master.

Files with missing lines Patch % Lines
...apache/pulsar/client/impl/NegativeAcksTracker.java 88.00% 3 Missing and 3 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #23600      +/-   ##
============================================
+ Coverage     73.57%   74.22%   +0.65%     
+ Complexity    32624    32193     -431     
============================================
  Files          1877     1853      -24     
  Lines        139502   143450    +3948     
  Branches      15299    16290     +991     
============================================
+ Hits         102638   106479    +3841     
+ Misses        28908    28594     -314     
- Partials       7956     8377     +421     
Flag Coverage Δ
inttests 26.70% <7.84%> (+2.12%) ⬆️
systests 23.23% <7.84%> (-1.10%) ⬇️
unittests 73.75% <88.23%> (+0.91%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 79.91% <100.00%> (+2.34%) ⬆️
...ar/client/impl/conf/ConsumerConfigurationData.java 72.72% <ø> (-19.83%) ⬇️
...apache/pulsar/client/impl/NegativeAcksTracker.java 86.74% <88.00%> (-11.30%) ⬇️

... and 1024 files with indirect coverage changes

@lhotari
Copy link
Member

lhotari commented Jan 2, 2025

@thetumbled I'll push the shading configuration changes directly to this PR.

@lhotari
Copy link
Member

lhotari commented Jan 2, 2025

@thetumbled I added 2 commits to this PR to address the shading. The first commit abcfe5c adds a solution for minimizing the number of classes that are included from fastutil to the shaded jars. The second commit a03f415 addresses RoaringBitmap shading which was missing.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari lhotari changed the title [improve][client] PIP-393: Improve performance of Negative Acknowledgement Part1 [improve][client] PIP-393: Improve performance of Negative Acknowledgement Jan 2, 2025
@lhotari lhotari added this to the 4.1.0 milestone Jan 2, 2025
@lhotari lhotari merged commit d377bc9 into apache:master Jan 2, 2025
53 of 54 checks passed
lhotari pushed a commit that referenced this pull request Jan 2, 2025
…ement (#23600)

Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit d377bc9)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants