Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Allow setting listener name for topic lookup and optimize the lookup #646

Conversation

BewareMyPower
Copy link
Collaborator

@BewareMyPower BewareMyPower commented Aug 11, 2021

Fixes #644

Motivation

Using PulsarClient for topic lookup might encounter TooManyRequestException easily because all client connections share the same PulsarClient for topic lookup. It's also a waste of resource.

Modifications

Add a common method KafkaProtocolHandler#getTopic for all places that need topic lookup in KoP. In this method, first use NamespaceService.getBrokerServiceUrlAsync to get broker's Internet address.

However, when the namespace bundle is not owned by any broker, which usually happens during a bundle unload, the getBrokerServiceUrlAsync might return a LookupResult instance whose redirect field is true. Pulsar client can handle this field and resend the lookup request to another broker, but Kafka client cannot do it. In this case, this PR use PulsarClient for topic lookup as a fallback.

In addition, this PR adds a kafkaListenerName config and getPulsarClientImpl() method to create a builtin PulsarClient with listener name configured.

Since the DistributedClusterTest might fail if its three tests were run in a whole, this PR removes the lookup logic in OffsetAndTopicListener#onLoad.

@BewareMyPower BewareMyPower changed the title Use NamespaceService for topic lookup [WIP] Use NamespaceService for topic lookup Aug 12, 2021
@BewareMyPower
Copy link
Collaborator Author

I'll add a test for kafkaListenerName soon.

@BewareMyPower BewareMyPower force-pushed the bewaremypower/broker-service-lookup branch from dc2483c to e5672a6 Compare August 13, 2021 05:59
@BewareMyPower BewareMyPower changed the title [WIP] Use NamespaceService for topic lookup Allow setting listener name for topic lookup and optimize the lookup Aug 13, 2021
@BewareMyPower BewareMyPower changed the title Allow setting listener name for topic lookup and optimize the lookup [WIP] Allow setting listener name for topic lookup and optimize the lookup Aug 13, 2021
@BewareMyPower
Copy link
Collaborator Author

Except the known tests, there're still two tests that failed in CI but succeeded in my local env. I'll take a look.

Error:    DistributedClusterTest.testMutiBrokerUnloadReload:424->kafkaPublishMessage:197 » Execution
Error:    KafkaListenerNameTest.testListenerName » ThreadTimeout Method io.streamnative....

@BewareMyPower BewareMyPower changed the title [WIP] Allow setting listener name for topic lookup and optimize the lookup Allow setting listener name for topic lookup and optimize the lookup Aug 15, 2021
@BewareMyPower
Copy link
Collaborator Author

Now all tests passed. But the DistributedClusterTest is still unstable. Here're the test results in my local env. Each test case was run for 10 times, O represents success, X represents failure.

  1. Run the whole tests
mvn test -pl tests -Dtest='DistributedClusterTest'

OOXXO XXOOO

All failure reasons are the same:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kopMutiBrokerUnloadReload10-0: 10004 ms has passed since batch creation plus linger time
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at io.streamnative.pulsar.handlers.kop.DistributedClusterTest.kafkaPublishMessage(DistributedClusterTest.java:197)

The broker logs are like

23:58:14.251 [bookkeeper-ml-scheduler-OrderedScheduler-4-0:org.apache.pulsar.broker.PulsarService@1011] INFO  org.apache.pulsar.broker.PulsarService - Loaded 4 topics on public/default/0x00000000_0x80000000 -- time taken: 0.003 seconds
23:58:17.145 [pulsar-load-manager-96-1:org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl@437] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Writing local data to metadata store because maximum change 41.955193877220154% exceeded threshold 10%; time since last report written is 5.01 seconds
23:58:18.467 [pulsar-load-manager-137-1:org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl@437] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Writing local data to metadata store because maximum change 38.13229501247406% exceeded threshold 10%; time since last report written is 5.01 seconds
23:58:24.243 [kafka-producer-network-thread | DemoKafkaOnPulsarProducer:org.apache.kafka.clients.NetworkClient@698] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=DemoKafkaOnPulsarProducer] Disconnecting from node 36016841 due to request timeout.
23:58:24.243 [pulsar-io-140-3:io.streamnative.pulsar.handlers.kop.KafkaRequestHandler@298] INFO  io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - channel inactive [id: 0x695fd4c4, L:/127.0.0.1:15011 ! R:/127.0.0.1:51123]
23:58:24.244 [pulsar-io-140-3:io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder@88] INFO  io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - close channel [id: 0x695fd4c4, L:/127.0.0.1:15011 ! R:/127.0.0.1:51123] with 1 pending responses

There's 1 pending response (METADATA) that has been pending for over 10 seconds, then Kafka client's send failed and the connection was closed.

  1. Only run the unload test
mvn test -pl tests -Dtest='DistributedClusterTest#testMutiBrokerUnloadReload'

OOOOO OOOOO

Never failed.

@@ -2044,7 +2044,7 @@ protected boolean isTransactionTopic(String topic) {
CompletableFuture<PartitionMetadata> returnFuture = new CompletableFuture<>();

topicManager.getTopicBroker(topic.toString())
.thenCompose(pair -> getProtocolDataToAdvertise(pair, topic))
.thenCompose(address -> getProtocolDataToAdvertise(address, topic))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a judgement in getProtocolDataToAdvertise? if kafkaListenerName is used, do not convert broker url to kafka advertised address in getProtocolDataToAdvertise, just use the address here combine with SecurityProtocol.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why not use getProtocolDataToAdvertise if kafkaListenerName is used? For example, in KafkaListenerNameTest, the address here is localhost:15002 and getProtocolDataToAdvertise(address, topic) will be completed with PLAINTEXT://localhost:15003,SSL://localhost:15004.

If we don't use getProtocolDataToAdvertise, which SecurityProtocol should we use? Could you give an example implementation and tell the reason?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the configuration in KafkaListenerNameTest is not correct. We should not set InternalListenerName same with KafkaListenerName. Broker will use the address in InternalListenerName as advertised address and regist to zk path /loadbalance/brokers.
If InternalListenerName is not set, default the first listener internal will be regist to zk path and getProtocolDataToAdvertise will failed with "No node for broker under zk://loadbalance"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give an example config in test environment? Since there's no proxy, broker can only listen on localhost:<brokerPort>, other listeners are unused.

Copy link
Collaborator Author

@BewareMyPower BewareMyPower Aug 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean given following config

internalListenerName=internal
advertisedListeners=internal:pulsar://localhost:6650,external:PLAINTEXT://localhost:9092
kafkaListenerName=external

The returned address (stringOptional in following code) should be localhost:9092?

        topicManager.getTopicBroker(topic.toString())
            .thenCompose(address -> getProtocolDataToAdvertise(address, topic))
            .whenComplete((stringOptional, throwable) -> {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangjialing218 To correct my previous comment, the advertisedListeners cannot accept protocols other than pulsar or pulsar+ssl. Therefore, the internal:pulsar://localhost:6650,external:PLAINTEXT://localhost:9092 is invalid.

If I didn't understand wrong, maybe you want to configure

advertisedListeners=pulsar:pulsar://localhost:6650,kop:pulsar://localhost:9092

and return the PLAINTEXT://localhost:9092 as the listeners that will be passed to EndPoint.getSslEndPoint or EndPoint.getPlainTextEndPoint?

If so, I think you can open a new PR that is based on this PR. Because it introduced a new mapping rule from Pulsar's TopicDomain to Kafka's SecurityProtocol:

  • pulsar -> PLAINTEXT
  • pulsar+ssl -> SSL
  • SASL_PLAINTEXT and SASL_SSL are not supported in KoP because we use saslAllowedMechanisms config to differ whether the SASL is enabled.

And it should be documented as well. I think this PR's original purpose is to provide a dependent method for topic lookup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to configure like this:
advertisedListeners=pulsar:pulsar://192.168.0.1:6650,kop:pulsar://10.254.1.1:9092
192.168.0.1 is private network ip and 10.254.1.1 is public network ip.
In order to deploy pulsar cluster that can be accessed through both public network private network.
https://github.com/apache/pulsar/wiki/PIP-61:-Advertised-multiple-addresses

Current If I configure like this, getProtocolDataToAdvertise will failed.
I will open a new PR after this PR merged.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangjialing218 It's merged now, you can start the work.

@wangjialing218
Copy link
Contributor

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kopMutiBrokerUnloadReload10-0: 10004 ms has passed since batch creation plus linger time
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at io.streamnative.pulsar.handlers.kop.DistributedClusterTest.kafkaPublishMessage(DistributedClusterTest.java:197)

I also got this error sometimes in my local env.
According to debug log of kop, there is some lookup operation blocked, FindBroker for topic kopMutiBrokerUnloadReload10, partitions found/all: only up to 9/10, and the bloked lookup operation is namespaceService.getBrokerServiceUrlAsync
I'm not sure if this problem is related with apache/pulsar#11310, do we base on the broker version with this pr?

@BewareMyPower BewareMyPower added the type/feature Indicates new functionality label Aug 16, 2021
@BewareMyPower BewareMyPower self-assigned this Aug 16, 2021
@BewareMyPower
Copy link
Collaborator Author

I'm not sure if this problem is related with apache/pulsar#11310, do we base on the broker version with this pr?

I'm afraid not. I upgraded the pulsar dependency to 2.8.0.12 that contains this PR and the test is still flaky. I suspected there's something wrong with the clean up phase. If we run unload test first (rename to test1MutiBrokerUnloadReload) it would never fail.

@BewareMyPower BewareMyPower force-pushed the bewaremypower/broker-service-lookup branch from 412262b to dc23ddb Compare August 16, 2021 08:39
@BewareMyPower BewareMyPower force-pushed the bewaremypower/broker-service-lookup branch from dc23ddb to 5d7d0e1 Compare August 17, 2021 01:35
@BewareMyPower BewareMyPower merged commit 3095d55 into streamnative:master Aug 18, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/broker-service-lookup branch August 18, 2021 02:21
BewareMyPower added a commit that referenced this pull request Aug 19, 2021
…646)

Fixes #644 

### Motivation

Using `PulsarClient` for topic lookup might encounter `TooManyRequestException` easily because all client connections share the same `PulsarClient` for topic lookup. It's also a waste of resource.

### Modifications

Add a common method `KafkaProtocolHandler#getTopic` for all places that need topic lookup in KoP. In this method, first use `NamespaceService.getBrokerServiceUrlAsync` to get broker's Internet address.

However, when the namespace bundle is not owned by any broker, which usually happens during a bundle unload, the `getBrokerServiceUrlAsync` might return a `LookupResult` instance whose `redirect` field is true. Pulsar client can handle this field and resend the lookup request to another broker, but Kafka client cannot do it. In this case, this PR use `PulsarClient` for topic lookup as a fallback.

In addition, this PR adds a `kafkaListenerName` config and `getPulsarClientImpl()` method to create a builtin `PulsarClient` with listener name configured.

Since the `DistributedClusterTest` might fail if its three tests were run in a whole, this PR removes the lookup logic in `OffsetAndTopicListener#onLoad`.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
type/feature Indicates new functionality
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Enhancement] Use BrokerService instead of PulsarClient for topic lookup
3 participants