-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-307 Expose inflight state waiting time and service channel monitor interval configs. Handle AddEntry failure during topic transfer #21668
Conversation
… time configs. Handle AddEntry failure during topic transfer
@heesung-sn Please add the following content to your PR description and select a checkbox:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, left some comments.
this.stateTombstoneDelayTimeInSeconds = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds() | ||
* 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this shouldn't be multiplied by 1000 anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh thanks for this catch. The name should stateTombstoneDelayTimeInMillis
...ava/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
Outdated
Show resolved
Hide resolved
if (isClosingOrDeleting | ||
&& ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(getBrokerService().pulsar())) { | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Failed to persist msg in store: {} while closing or deleting.", | ||
topic, exception.getMessage(), exception); | ||
} | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks correct, but is there anything to be done about the PublishContext
callback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the callback is used to signal clients to additionally handle this case on the clients as well. In this case, we don't need to send any to the clients.
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { | ||
assertTrue(producer.isConnected()); | ||
verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we simplify as suggested below? We aren't expecting any new calls to lookup.getBroker(topicName)
, we could go further and replace the condition with never()
.
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { | |
assertTrue(producer.isConnected()); | |
verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName); | |
}); | |
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(producer::isConnected); | |
verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
@@ -654,12 +654,12 @@ public void splitAndRetryTest() throws Exception { | |||
FieldUtils.writeDeclaredField(channel1, | |||
"inFlightStateWaitingTimeInMillis", 30 * 1000, true); | |||
FieldUtils.writeDeclaredField(channel1, | |||
"semiTerminalStateWaitingTimeInMillis", 300 * 1000, true); | |||
"stateTombstoneDelayTimeInSeconds", 300 * 1000, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and below: are these values still correct, since we switched to a different time unit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
37abda8
to
66a50f6
Compare
66a50f6
to
6e168bd
Compare
// if the topic is transferring, we don't send error code to the clients. | ||
if (producer.getTopic().isTransferring()) { | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Received producer exception: {} while transferring.", | ||
producer.getTopic().getName(), exception.getMessage(), exception); | ||
} | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would we still need to execute the code inside the lambda below, except the error sending? There's other cleanup operations being performed that can avoid resource leaks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Moving this logic into the lambda and continue to call the cleanups. Thank you.
@@ -1473,6 +1486,9 @@ public CompletableFuture<Void> close( | |||
|
|||
lock.writeLock().lock(); | |||
try { | |||
if (!disconnectClients) { | |||
transferring = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just confirming that this flag is not meant to ever go back to false
. Is that the intent? As it is right now, publishing a message to a topic that was transferred would lead to a TopicClosedException
, whereas in the current proposal the exception would be silenced forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, transferring
will not go back to false.
In the worst case, if transferring is stuck, the leader monitor will send a msg to the source broker to fix the stuck state with handleOwnEvent
at the } else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) {
. In this case, the topics will be forcefully closed with disconnectClients=true
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, the topics will be forcefully closed as you mentioned, but further messages produced will still be ignored instead of responded to with TopicClosedException
.
Instead, we will rely on the service unit state channel's bundle(topic) transfer protocol. | ||
At the end of the transfer protocol, at Owned state, the source broker should close the topic properly. | ||
*/ | ||
if (transferring) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this change relevant to the flaky test? If not, a separate PR might be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this is required. Otherwise,testTransferClientReconnectionWithoutLookup
will be flaky.
+ "by reassigning the ownerships if stuck too long, longer than this period." | ||
+ "(only used in load balancer extension logics)" | ||
) | ||
private long loadBalancerInFlightServiceUnitStateWaitingTimeInMillis = 30 * 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a PIP to add configuration? /cc @codelipenghui
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need a PIP for this config, as this is a minor addition (and these need to be tuned rarely). If required, I think we could update the config list in the PIP-192.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, update PIP-192 makes sense. It’s better to share in the mailing list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the PIP-192 to list the ExtensibleLoadBalancer's Service Configurations.
I shared this info in the mailing list. https://lists.apache.org/thread/zz612q2bhh6rccl04w3jz2mvt0z5kch8
@@ -157,6 +157,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP | |||
protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder(); | |||
protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); | |||
protected volatile Pair<String, List<EntryFilter>> entryFilters; | |||
protected volatile boolean transferring = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse isFenced
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I think we should have a different state here. isFenced=true
can happen in other cases(e.g. when bk write fails), so we need to define a new state separately.
Looks good to me, thanks! |
@@ -338,7 +338,7 @@ default boolean isSystemTopic() { | |||
|
|||
boolean isPersistent(); | |||
|
|||
boolean isFenced(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we keep this interface here? It might break some protocol handler compatibility if removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. This only introduced for pip-307. Since we dont use it any more, we are deleting it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks for explaining.
Fixes: #21654
PIP: #307
Motivation
We want to expose the in-flight state waiting time and service channel monitor interval configs to further control the Extensible Load Balancer behavior. We don't expose these vars in the broker.conf in this PR, as their tuning is rare.
Improve the flakiness of the ExtensibleLoadBalancerImplTest by removing static mock.
Add AddEntry failure handler logic for the PIP-307 to cover the edge-case.
Modifications
Expose the in-flight state waiting time and service channel monitor interval configs in ServiceConfiguration.
Removed static mock in the ExtensibleLoadBalancerImplTest
Added retries in ExtensibleLoadBalancerImplTest test cases.
Add AddEntry failure handler logic for the PIP-307 to cover the edgecase. (added transferring state in AbstracTopic)
Verifying this change
Make sure that the change passes the CI checks.
Add AddEntry failure handler logic for the PIP-307 to cover the edgecase.
logic is covered bytestTransferClientReconnectionWithoutLookup
(this test was flaky because of the AddEntry failure while the ledger is fenced by the unloading)Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: heesung-sn#54