-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] [client] call redeliver 1 msg but did 2 msgs #23943
Conversation
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #23943 +/- ##
============================================
+ Coverage 73.57% 74.25% +0.68%
+ Complexity 32624 2375 -30249
============================================
Files 1877 1853 -24
Lines 139502 143735 +4233
Branches 15299 16334 +1035
============================================
+ Hits 102638 106730 +4092
+ Misses 28908 28596 -312
- Partials 7956 8409 +453
Flags with carried forward coverage won't be shown. Click here to find out more.
|
(cherry picked from commit 7a79c78)
(cherry picked from commit 7a79c78)
(cherry picked from commit 7a79c78)
(cherry picked from commit 7a79c78)
@poorbarcode Could you fix the build error on the branch-3.0?
This import is not used. |
@DataProvider | ||
public Object[][] enabledBatchSend() { | ||
return new Object[][] { | ||
{false}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When it's false, even without this change, testBatchMessageNAck
could still pass
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it only affects bacthed messages
|
||
// Negative ack and verify result/ | ||
Message<byte[]> receive1 = consumer.receive(); | ||
consumer.pause(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's meaningless here. pause()
should only affect the message listener
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the Java client, pause()
will pause sending new permits, so it's not specific to message listener.
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Lines 1867 to 1880 in 1220951
protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) { | |
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta); | |
while (available >= getCurrentReceiverQueueSize() / 2 && !paused) { | |
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) { | |
if (log.isDebugEnabled()) { | |
log.debug("[{}] Sending permit-cmd to broker with available permits = {}", topic, available); | |
} | |
sendFlowPermitsToBroker(currentCnx, available); | |
break; | |
} else { | |
available = AVAILABLE_PERMITS_UPDATER.get(this); | |
} | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pause
is for stop receiving msgs from broker
Motivation
Issue: call redeliver
1
message, but it will redeliver2
messages, you can reproduce the issue by the new testtestBatchMessageNAck
Modifications
fix the bug
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x