Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-307 Expose inflight state waiting time and service channel monitor interval configs. Handle AddEntry failure during topic transfer #21668

Merged
merged 4 commits into from
Dec 6, 2023

Conversation

heesung-sn
Copy link
Contributor

@heesung-sn heesung-sn commented Dec 4, 2023

Fixes: #21654

PIP: #307

Motivation

  • We want to expose the in-flight state waiting time and service channel monitor interval configs to further control the Extensible Load Balancer behavior. We don't expose these vars in the broker.conf in this PR, as their tuning is rare.

  • Improve the flakiness of the ExtensibleLoadBalancerImplTest by removing static mock.

  • Add AddEntry failure handler logic for the PIP-307 to cover the edge-case.

Modifications

  • Expose the in-flight state waiting time and service channel monitor interval configs in ServiceConfiguration.

  • Removed static mock in the ExtensibleLoadBalancerImplTest

  • Added retries in ExtensibleLoadBalancerImplTest test cases.

  • Add AddEntry failure handler logic for the PIP-307 to cover the edgecase. (added transferring state in AbstracTopic)

Verifying this change

  • Make sure that the change passes the CI checks.

  • Add AddEntry failure handler logic for the PIP-307 to cover the edgecase. logic is covered by testTransferClientReconnectionWithoutLookup (this test was flaky because of the AddEntry failure while the ledger is fenced by the unloading)

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: heesung-sn#54

… time configs. Handle AddEntry failure during topic transfer
Copy link

github-actions bot commented Dec 4, 2023

@heesung-sn Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@heesung-sn heesung-sn changed the title [improve][broker] PIP-307 Expose tombstone and inflight state waiting… [improve][broker] PIP-307 Expose tombstone and inflight state waiting time configs. Handle AddEntry failure during topic transfer Dec 4, 2023
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Dec 4, 2023
Copy link
Contributor

@dragosvictor dragosvictor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, left some comments.

Comment on lines 209 to 210
this.stateTombstoneDelayTimeInSeconds = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
* 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this shouldn't be multiplied by 1000 anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh thanks for this catch. The name should stateTombstoneDelayTimeInMillis

Comment on lines 661 to 668
if (isClosingOrDeleting
&& ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(getBrokerService().pulsar())) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to persist msg in store: {} while closing or deleting.",
topic, exception.getMessage(), exception);
}
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks correct, but is there anything to be done about the PublishContext callback?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the callback is used to signal clients to additionally handle this case on the clients as well. In this case, we don't need to send any to the clients.

Comment on lines 463 to 466
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(producer.isConnected());
verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simplify as suggested below? We aren't expecting any new calls to lookup.getBroker(topicName), we could go further and replace the condition with never().

Suggested change
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(producer.isConnected());
verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName);
});
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(producer::isConnected);
verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

@@ -654,12 +654,12 @@ public void splitAndRetryTest() throws Exception {
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
FieldUtils.writeDeclaredField(channel1,
"semiTerminalStateWaitingTimeInMillis", 300 * 1000, true);
"stateTombstoneDelayTimeInSeconds", 300 * 1000, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and below: are these values still correct, since we switched to a different time unit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

@heesung-sn heesung-sn self-assigned this Dec 4, 2023
Comment on lines 488 to 495
// if the topic is transferring, we don't send error code to the clients.
if (producer.getTopic().isTransferring()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received producer exception: {} while transferring.",
producer.getTopic().getName(), exception.getMessage(), exception);
}
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we still need to execute the code inside the lambda below, except the error sending? There's other cleanup operations being performed that can avoid resource leaks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Moving this logic into the lambda and continue to call the cleanups. Thank you.

@@ -1473,6 +1486,9 @@ public CompletableFuture<Void> close(

lock.writeLock().lock();
try {
if (!disconnectClients) {
transferring = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just confirming that this flag is not meant to ever go back to false. Is that the intent? As it is right now, publishing a message to a topic that was transferred would lead to a TopicClosedException, whereas in the current proposal the exception would be silenced forever.

Copy link
Contributor Author

@heesung-sn heesung-sn Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, transferring will not go back to false.

In the worst case, if transferring is stuck, the leader monitor will send a msg to the source broker to fix the stuck state with handleOwnEvent at the } else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) {. In this case, the topics will be forcefully closed with disconnectClients=true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, the topics will be forcefully closed as you mentioned, but further messages produced will still be ignored instead of responded to with TopicClosedException.

Instead, we will rely on the service unit state channel's bundle(topic) transfer protocol.
At the end of the transfer protocol, at Owned state, the source broker should close the topic properly.
*/
if (transferring) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change relevant to the flaky test? If not, a separate PR might be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is required. Otherwise,testTransferClientReconnectionWithoutLookup will be flaky.

+ "by reassigning the ownerships if stuck too long, longer than this period."
+ "(only used in load balancer extension logics)"
)
private long loadBalancerInFlightServiceUnitStateWaitingTimeInMillis = 30 * 1000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a PIP to add configuration? /cc @codelipenghui

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a PIP for this config, as this is a minor addition (and these need to be tuned rarely). If required, I think we could update the config list in the PIP-192.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, update PIP-192 makes sense. It’s better to share in the mailing list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the PIP-192 to list the ExtensibleLoadBalancer's Service Configurations.
I shared this info in the mailing list. https://lists.apache.org/thread/zz612q2bhh6rccl04w3jz2mvt0z5kch8

@@ -157,6 +157,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
protected volatile Pair<String, List<EntryFilter>> entryFilters;
protected volatile boolean transferring = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse isFenced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I think we should have a different state here. isFenced=true can happen in other cases(e.g. when bk write fails), so we need to define a new state separately.

@dragosvictor
Copy link
Contributor

Looks good to me, thanks!

@@ -338,7 +338,7 @@ default boolean isSystemTopic() {

boolean isPersistent();

boolean isFenced();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep this interface here? It might break some protocol handler compatibility if removed

Copy link
Contributor Author

@heesung-sn heesung-sn Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This only introduced for pip-307. Since we dont use it any more, we are deleting it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for explaining.

@heesung-sn heesung-sn added this to the 3.2.0 milestone Dec 5, 2023
@heesung-sn heesung-sn closed this Dec 5, 2023
@heesung-sn heesung-sn reopened this Dec 5, 2023
@heesung-sn heesung-sn changed the title [improve][broker] PIP-307 Expose tombstone and inflight state waiting time configs. Handle AddEntry failure during topic transfer [improve][broker] PIP-307 Expose inflight state waiting time and service channel monitor interval configs. Handle AddEntry failure during topic transfer Dec 5, 2023
@codelipenghui codelipenghui merged commit 93df344 into apache:master Dec 6, 2023
73 of 76 checks passed
@heesung-sn heesung-sn deleted the pip-192-config branch April 2, 2024 17:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs ready-to-test type/flaky-tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flaky-test: ClassCastException in ExtensibleLoadManagerImpl.checkOwnershipAsync
4 participants