-
Notifications
You must be signed in to change notification settings - Fork 138
Allow setting listener name for topic lookup and optimize the lookup #646
Allow setting listener name for topic lookup and optimize the lookup #646
Conversation
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
Outdated
Show resolved
Hide resolved
I'll add a test for |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java
Outdated
Show resolved
Hide resolved
dc2483c
to
e5672a6
Compare
Except the known tests, there're still two tests that failed in CI but succeeded in my local env. I'll take a look.
|
Now all tests passed. But the
mvn test -pl tests -Dtest='DistributedClusterTest' OOXXO XXOOO All failure reasons are the same:
The broker logs are like
There's 1 pending response (
mvn test -pl tests -Dtest='DistributedClusterTest#testMutiBrokerUnloadReload' OOOOO OOOOO Never failed. |
@@ -2044,7 +2044,7 @@ protected boolean isTransactionTopic(String topic) { | |||
CompletableFuture<PartitionMetadata> returnFuture = new CompletableFuture<>(); | |||
|
|||
topicManager.getTopicBroker(topic.toString()) | |||
.thenCompose(pair -> getProtocolDataToAdvertise(pair, topic)) | |||
.thenCompose(address -> getProtocolDataToAdvertise(address, topic)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a judgement in getProtocolDataToAdvertise
? if kafkaListenerName
is used, do not convert broker url to kafka advertised address in getProtocolDataToAdvertise
, just use the address
here combine with SecurityProtocol
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain why not use getProtocolDataToAdvertise
if kafkaListenerName
is used? For example, in KafkaListenerNameTest
, the address
here is localhost:15002
and getProtocolDataToAdvertise(address, topic)
will be completed with PLAINTEXT://localhost:15003,SSL://localhost:15004
.
If we don't use getProtocolDataToAdvertise
, which SecurityProtocol
should we use? Could you give an example implementation and tell the reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the configuration in KafkaListenerNameTest
is not correct. We should not set InternalListenerName
same with KafkaListenerName
. Broker will use the address in InternalListenerName
as advertised address and regist to zk path /loadbalance/brokers
.
If InternalListenerName
is not set, default the first listener internal
will be regist to zk path and getProtocolDataToAdvertise
will failed with "No node for broker under zk://loadbalance"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you give an example config in test environment? Since there's no proxy, broker can only listen on localhost:<brokerPort>
, other listeners are unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean given following config
internalListenerName=internal
advertisedListeners=internal:pulsar://localhost:6650,external:PLAINTEXT://localhost:9092
kafkaListenerName=external
The returned address (stringOptional
in following code) should be localhost:9092
?
topicManager.getTopicBroker(topic.toString())
.thenCompose(address -> getProtocolDataToAdvertise(address, topic))
.whenComplete((stringOptional, throwable) -> {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wangjialing218 To correct my previous comment, the advertisedListeners
cannot accept protocols other than pulsar
or pulsar+ssl
. Therefore, the internal:pulsar://localhost:6650,external:PLAINTEXT://localhost:9092
is invalid.
If I didn't understand wrong, maybe you want to configure
advertisedListeners=pulsar:pulsar://localhost:6650,kop:pulsar://localhost:9092
and return the PLAINTEXT://localhost:9092
as the listeners that will be passed to EndPoint.getSslEndPoint
or EndPoint.getPlainTextEndPoint
?
If so, I think you can open a new PR that is based on this PR. Because it introduced a new mapping rule from Pulsar's TopicDomain
to Kafka's SecurityProtocol
:
pulsar
->PLAINTEXT
pulsar+ssl
->SSL
SASL_PLAINTEXT
andSASL_SSL
are not supported in KoP because we usesaslAllowedMechanisms
config to differ whether the SASL is enabled.
And it should be documented as well. I think this PR's original purpose is to provide a dependent method for topic lookup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to configure like this:
advertisedListeners=pulsar:pulsar://192.168.0.1:6650,kop:pulsar://10.254.1.1:9092
192.168.0.1 is private network ip and 10.254.1.1 is public network ip.
In order to deploy pulsar cluster that can be accessed through both public network private network.
https://github.com/apache/pulsar/wiki/PIP-61:-Advertised-multiple-addresses
Current If I configure like this, getProtocolDataToAdvertise
will failed.
I will open a new PR after this PR merged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wangjialing218 It's merged now, you can start the work.
I also got this error sometimes in my local env. |
I'm afraid not. I upgraded the pulsar dependency to 2.8.0.12 that contains this PR and the test is still flaky. I suspected there's something wrong with the clean up phase. If we run unload test first (rename to |
412262b
to
dc23ddb
Compare
dc23ddb
to
5d7d0e1
Compare
…646) Fixes #644 ### Motivation Using `PulsarClient` for topic lookup might encounter `TooManyRequestException` easily because all client connections share the same `PulsarClient` for topic lookup. It's also a waste of resource. ### Modifications Add a common method `KafkaProtocolHandler#getTopic` for all places that need topic lookup in KoP. In this method, first use `NamespaceService.getBrokerServiceUrlAsync` to get broker's Internet address. However, when the namespace bundle is not owned by any broker, which usually happens during a bundle unload, the `getBrokerServiceUrlAsync` might return a `LookupResult` instance whose `redirect` field is true. Pulsar client can handle this field and resend the lookup request to another broker, but Kafka client cannot do it. In this case, this PR use `PulsarClient` for topic lookup as a fallback. In addition, this PR adds a `kafkaListenerName` config and `getPulsarClientImpl()` method to create a builtin `PulsarClient` with listener name configured. Since the `DistributedClusterTest` might fail if its three tests were run in a whole, this PR removes the lookup logic in `OffsetAndTopicListener#onLoad`.
Fixes #644
Motivation
Using
PulsarClient
for topic lookup might encounterTooManyRequestException
easily because all client connections share the samePulsarClient
for topic lookup. It's also a waste of resource.Modifications
Add a common method
KafkaProtocolHandler#getTopic
for all places that need topic lookup in KoP. In this method, first useNamespaceService.getBrokerServiceUrlAsync
to get broker's Internet address.However, when the namespace bundle is not owned by any broker, which usually happens during a bundle unload, the
getBrokerServiceUrlAsync
might return aLookupResult
instance whoseredirect
field is true. Pulsar client can handle this field and resend the lookup request to another broker, but Kafka client cannot do it. In this case, this PR usePulsarClient
for topic lookup as a fallback.In addition, this PR adds a
kafkaListenerName
config andgetPulsarClientImpl()
method to create a builtinPulsarClient
with listener name configured.Since the
DistributedClusterTest
might fail if its three tests were run in a whole, this PR removes the lookup logic inOffsetAndTopicListener#onLoad
.