
Fix getting stuck when adding producers #12202

Conversation

wuzhanpeng
Contributor

Motivation

In our production environment, when a broker receives a large number of PRODUCER requests in a short period of time, we have observed that the broker can run into a loop-waiting problem while handling these requests. In severe cases this causes the broker to get stuck and a huge amount of other requests to time out.

To simplify the description of the problem, assume that multiple producers send PRODUCER requests to the same topic at the same time. As can be seen in AbstractTopic#addProducer, the addProducer path of each PRODUCER request can be broken down into three steps:

  1. acquire the lock in thread#1
  2. load data from zk in thread#2 with a timeout (i.e. AbstractTopic#isProducersExceeded inside internalAddProducer)
  3. return to thread#1 to release the lock

Note that these three steps run serially.

Assume that the thread pool processing these steps (in practice ForkJoinPool.commonPool()) has a core size of only 1, and that only one of the simultaneous PRODUCER requests succeeds in acquiring the lock (AbstractTopic#lock); the remaining threads must wait in the pool's submission queue. Unfortunately, there is a high probability that thread#2 is also placed in that queue waiting to be scheduled. In this situation, the thread#1 that acquired the lock cannot complete because it is waiting for thread#2, while the other threads that did not acquire the lock are waiting for thread#1 to release it. Nothing can make progress until thread#2 times out and throws an exception.
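
To make the interleaving concrete, here is a minimal, self-contained Java sketch of the pattern (purely illustrative, not Pulsar code; all names are made up). It uses an explicit single-threaded executor to stand in for the saturated common pool assumed above: the task holding the lock submits the "zk read" to the same pool and blocks on it, so the inner task can never run and the outer one finishes only when the timeout fires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PoolStarvationSketch {
    public static void main(String[] args) throws Exception {
        // A single-threaded pool stands in for the "core size 1" assumption above.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        // thread#1: acquires the lock, then waits for the "zk read" below.
        Future<?> addProducer = pool.submit(() -> {
            lock.writeLock().lock();
            try {
                // thread#2: the "zk read" is submitted to the same pool, but the
                // only worker is busy right here, so this get() can only time out.
                Future<String> zkRead = pool.submit(() -> "namespace-policy");
                System.out.println(zkRead.get(2, TimeUnit.SECONDS));
            } catch (TimeoutException e) {
                System.out.println("zk read timed out while the lock was held");
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                lock.writeLock().unlock();          // the lock is released only now
            }
        });

        addProducer.get();   // completes only after the inner timeout
        pool.shutdown();
    }
}
```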

From the jstack output, we can easily see:

"ForkJoinPool.commonPool-worker-110" #482 daemon prio=5 os_prio=0 tid=0x00007fd714021000 nid=0x61a3 waiting on condition  [0x00007fd562772000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x00000006284caad0> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:234)
        at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1798)
        at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3146)
        at java.util.concurrent.CompletableFuture.timedGet([email protected]/CompletableFuture.java:1868)
        at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:2021)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
        at org.apache.pulsar.broker.service.AbstractTopic.isProducersExceeded(AbstractTopic.java:156)
        at org.apache.pulsar.broker.service.AbstractTopic.internalAddProducer(AbstractTopic.java:629)
        at org.apache.pulsar.broker.service.AbstractTopic.lambda$addProducer$8(AbstractTopic.java:405)
        at org.apache.pulsar.broker.service.AbstractTopic$$Lambda$1433/1422007940.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
        at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
        at org.apache.pulsar.broker.service.AbstractTopic.addProducer(AbstractTopic.java:392)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.addProducer(PersistentTopic.java:540)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$22(ServerCnx.java:1233)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$1428/932296811.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
        at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
        at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
        at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$6(BookkeeperSchemaStorage.java:217)
        at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$$Lambda$1421/1611023719.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniHandle([email protected]/CompletableFuture.java:930)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire([email protected]/CompletableFuture.java:907)
        at java.util.concurrent.CompletableFuture$Completion.exec([email protected]/CompletableFuture.java:479)
        at java.util.concurrent.ForkJoinTask.doExec([email protected]/ForkJoinTask.java:290)
        at java.util.concurrent.ForkJoinPool.runWorker([email protected]/ForkJoinPool.java:1603)
        at java.util.concurrent.ForkJoinWorkerThread.run([email protected]/ForkJoinWorkerThread.java:177)

   Locked ownable synchronizers:
        - <0x0000000624e2e9a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

To make matters worse, with the default configuration both the zk operation timeout and the client operation timeout are 30 seconds, so every retried request also ends in a timeout, over and over again.

Modifications

This problem is well hidden and very hard to detect; we still have no way to reproduce it in a test environment. In our production environment, however, it is more likely to occur when bundle re-load or broker restart operations are triggered frequently (the effect is amplified in our scenario, where a single topic may have thousands of producers). Once the problem occurs in the cluster, a large number of operation-timeout exceptions follow.

Below we propose a solution that reduces the use of the lock: for the conventional production model, we believe a read lock is sufficient when adding producers in Shared mode, as sketched below.
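
As a minimal sketch of this idea (illustrative names only, not the actual Pulsar classes; the real change is in the diff below): Shared-mode producers take the read lock so they can be added concurrently, while other access modes still take the write lock, with the caveat that whatever runs under the read lock must be thread-safe on its own.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch, not the actual Pulsar classes.
class ProducerRegistrationSketch {
    enum AccessMode { Shared, Exclusive }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    void addProducer(AccessMode mode, Runnable internalAdd) {
        Lock producerLock = (mode == AccessMode.Shared) ? lock.readLock() : lock.writeLock();
        producerLock.lock();
        try {
            internalAdd.run();   // anything mutated here must be thread-safe under a read lock
        } finally {
            producerLock.unlock();
        }
    }
}
```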

@wuzhanpeng
Contributor Author

@merlimat @sijie @eolivelli Could you help to check this?

Comment on lines +409 to +411
Lock producerLock = producer.getAccessMode() == ProducerAccessMode.Shared
        ? lock.readLock() : lock.writeLock();
producerLock.lock();
Contributor

The counter in internalAddProducer() is not thread-safe; it will be a problem if you use the read lock.
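
A generic illustration of the concern (not Pulsar's actual code): a read lock allows many holders at once, so any plain read-modify-write performed under it can lose updates.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Generic illustration, not Pulsar's code: two threads holding the *read*
// lock at the same time can race on a plain counter and lose increments.
class ReadLockCounterRace {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int producerCount = 0;   // not thread-safe by itself

    void addProducerUnderReadLock() {
        lock.readLock().lock();      // many threads may enter concurrently
        try {
            producerCount++;         // unsynchronized read-modify-write
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Mutations like this would need their own synchronization (an AtomicInteger, a concurrent map, or similar) before the write lock could be relaxed to a read lock.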

Contributor Author

Thanks for your reply! If I may ask, which counter in internalAddProducer do you mean?

@codelipenghui
Contributor

codelipenghui commented Sep 27, 2021

@wuzhanpeng Do you have the complete stack information? I'm not sure whether there is a deadlock in the metadata cache. Have you tried restarting the broker to see whether the problem goes away?

@wuzhanpeng
Contributor Author

> @wuzhanpeng Do you have the complete stack information? I'm not sure whether there is a deadlock in the metadata cache. Have you tried restarting the broker to see whether the problem goes away?

@codelipenghui Sometimes restarting the problematic broker solves it, and sometimes the same problem then appears on other brokers (when bundles are transferred to other brokers, the pressure of handling producers moves with them). As I mentioned in the description above, restarting brokers frequently can easily trigger this problem.

Because the complete jstack output from our production environment may contain sensitive information, I'm afraid the full version cannot be uploaded. 😞

@hangc0276
Contributor

hangc0276 commented Sep 28, 2021

I suspect the write lock may not be the root cause of this issue.

When checking isProducersExceeded, the broker loads policy data from zk if the topic policy is not configured. Once the first thread calls isProducersExceeded, the policy is cached, and the following checks won't be blocked.

So in my opinion, you'd better check the zk read latency or whether something is wrong in ZooKeeperDataCache. You could also share the stack trace related to ZooKeeperDataCache.

@wuzhanpeng
Contributor Author

broker.jstack.txt

@codelipenghui FYI. The file has been sanitized.

@wuzhanpeng
Contributor Author

wuzhanpeng commented Sep 29, 2021

> I suspect the write lock may not be the root cause of this issue.
>
> When checking isProducersExceeded, the broker loads policy data from zk if the topic policy is not configured. Once the first thread calls isProducersExceeded, the policy is cached, and the following checks won't be blocked.
>
> So in my opinion, you'd better check the zk read latency or whether something is wrong in ZooKeeperDataCache. You could also share the stack trace related to ZooKeeperDataCache.

Thank you for your reminder~

We also investigated why the caching of the ns policy did not take effect. What actually happens is that once the broker runs into the loop-waiting problem, every time ZooKeeperDataCache#get times out it also invalidates the corresponding z-path. As a result, the next read of the ns policy has to fetch the data from zk again. Once the problem occurs, it is therefore difficult for the cache to ever be populated and for the broker to get out of the predicament.
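
The pattern described above can be sketched roughly like this (a hypothetical cache, not the real ZooKeeperDataCache API): invalidating the entry on every timeout means the next caller has to go back to zk again, so the cache never warms up while the pool is stuck.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical cache sketch, not the real ZooKeeperDataCache API.
class TimeoutInvalidatingCache<T> {
    private final ConcurrentHashMap<String, CompletableFuture<T>> cache = new ConcurrentHashMap<>();
    private final long timeoutSeconds;
    private final Function<String, CompletableFuture<T>> zkLoader;

    TimeoutInvalidatingCache(long timeoutSeconds, Function<String, CompletableFuture<T>> zkLoader) {
        this.timeoutSeconds = timeoutSeconds;
        this.zkLoader = zkLoader;
    }

    T get(String path) throws Exception {
        CompletableFuture<T> pending = cache.computeIfAbsent(path, zkLoader);
        try {
            return pending.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            cache.remove(path, pending);   // invalidated on timeout -> next get() must hit zk again
            throw e;
        }
    }
}
```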

There are many ways to break the deadlock condition in this scenario. However, IMHO reducing the use of the lock may be the more thorough solution: if Shared-mode producers account for the majority on a topic, the lock itself already reduces overall performance. In addition, the cache layer is involved in a lot of logic, so avoiding modifications to the current cache design may be the safer approach.

@hangc0276
Contributor

> every time ZooKeeperDataCache#get times out

@wuzhanpeng Could you explain why ZooKeeperDataCache#get times out every time?

@wuzhanpeng
Contributor Author

> every time ZooKeeperDataCache#get times out
>
> @wuzhanpeng Could you explain why ZooKeeperDataCache#get times out every time?

As I described in the motivation, when the core pool is fully occupied by threads waiting for the lock (identified as thread#1 above), the thread that loads the ns policy from zk (thread#2) can only wait in the submission queue until it times out.

@Anonymitaet
Member

Thanks for your contribution. For this PR, do we need to update docs?

(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@eolivelli eolivelli modified the milestones: 2.9.0, 2.10.0 Oct 6, 2021
@wuzhanpeng
Contributor Author

> Thanks for your contribution. For this PR, do we need to update docs?
>
> (The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@Anonymitaet Thanks for your reminder. No need to update the documentation.

@Anonymitaet Anonymitaet added the doc-not-needed Your PR changes do not impact docs label Oct 8, 2021
@codelipenghui
Contributor

@wuzhanpeng After taking a look at the complete stack, it looks like the issue is related to checking topic policies:

"ForkJoinPool.commonPool-worker-110" #482 daemon prio=5 os_prio=0 tid=0x00007fd714021000 nid=0x61a3 waiting on condition  [0x00007fd562772000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
	- parking to wait for  <0x00000006284caad0> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:234)
	at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1798)
	at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3146)
	at java.util.concurrent.CompletableFuture.timedGet([email protected]/CompletableFuture.java:1868)
	at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:2021)
	at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
	at org.apache.pulsar.broker.service.AbstractTopic.isProducersExceeded(AbstractTopic.java:156)
	at org.apache.pulsar.broker.service.AbstractTopic.internalAddProducer(AbstractTopic.java:629)
	at org.apache.pulsar.broker.service.AbstractTopic.lambda$addProducer$8(AbstractTopic.java:405)
	at org.apache.pulsar.broker.service.AbstractTopic$$Lambda$1433/1422007940.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
	at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
	at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
	at org.apache.pulsar.broker.service.AbstractTopic.addProducer(AbstractTopic.java:392)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.addProducer(PersistentTopic.java:540)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$null$22(ServerCnx.java:1233)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$1428/932296811.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
	at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
	at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$6(BookkeeperSchemaStorage.java:217)
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$$Lambda$1421/1611023719.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniHandle([email protected]/CompletableFuture.java:930)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire([email protected]/CompletableFuture.java:907)
	at java.util.concurrent.CompletableFuture$Completion.exec([email protected]/CompletableFuture.java:479)
	at java.util.concurrent.ForkJoinTask.doExec([email protected]/ForkJoinTask.java:290)
	at java.util.concurrent.ForkJoinPool.runWorker([email protected]/ForkJoinPool.java:1603)
	at java.util.concurrent.ForkJoinWorkerThread.run([email protected]/ForkJoinWorkerThread.java:177)

   Locked ownable synchronizers:
	- <0x0000000624e2e9a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

After #13082, I think the issue has been fixed, but that only fixes the master branch and 2.10. For versions < 2.10, I think we should change topicPolicies.getMaxProducersPerTopic() to only check the topic policies already in the cache, similar to getIfPresent. We should fix branch-2.8 and branch-2.9.
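
A generic sketch of the "only check the cache" idea, using Guava's Cache purely for illustration (Pulsar's topic-policies cache has its own types, and this is not the actual fix): the lookup returns immediately with whatever is cached instead of blocking on a metadata read while a lock is held.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Optional;

// Illustration only: a non-blocking, cache-only lookup in the spirit of getIfPresent.
class CachedPolicyLookup {
    private final Cache<String, Integer> maxProducersCache =
            CacheBuilder.newBuilder().maximumSize(10_000).build();

    // Returns empty when the policy is not cached yet, rather than waiting on zk.
    Optional<Integer> maxProducersIfCached(String topic) {
        return Optional.ofNullable(maxProducersCache.getIfPresent(topic));
    }
}
```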

@michaeljmarshall
Member

Removing the release/2.8.3 label since this will miss the release.

@github-actions

This PR has had no activity for 30 days; marking it with the Stale label.

@github-actions

github-actions bot commented Dec 9, 2022

@wuzhanpeng Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@tisonkun
Member

Closed as stale and conflicting. Please rebase and resubmit the patch if it is still relevant.

@tisonkun tisonkun closed this Dec 10, 2022