This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] Lookup can not find broker address in k8s #633

Open
wangjialing218 opened this issue Aug 5, 2021 · 7 comments

@wangjialing218
Contributor

Describe the bug
Deploy brokers with KoP in k8s and expose their ports with NodePort.
When advertisedAddress differs from the first address in advertisedListeners, the KoP lookup cannot find the broker address.

To Reproduce
I use nodeip:nodeport to expose the broker addresses in k8s.
Here is the service status of the broker.
(screenshot: kop)

Configuration in broker.conf:
advertisedAddress: {worknode_ip}:30873
advertisedListeners: internal:{headless service domain}:6650,external:pulsar://{worknode_ip}:31964
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://{worknode_ip}:31553

A Pulsar client outside k8s can connect to Pulsar at pulsar://{worknode_ip}:31964 by setting listenerName to external.
But when a Kafka client outside k8s connects to Pulsar at {worknode_ip}:31553, it fails to get the topic metadata because the broker lookup fails.

Additional context
Current lookup logic of KoP:

  1. Use pulsarService.getClient().getLookup().getBroker() to find the broker that owns the topic. The result is an InetSocketAddress.
  2. List all broker addresses in ZooKeeper under /loadbalance/brokers/.
  3. From the brokers in step 2, select those whose host name is the same as that of the InetSocketAddress from step 1.
  4. Among them, find the broker whose pulsarServiceUrl or webServiceUrl has the same port as the InetSocketAddress.
  5. Return the kafkaAdvertisedListeners of the broker found in step 4.
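
Steps 3 to 5 can be modelled with a small, self-contained sketch. This is only an illustration of the matching described in the list, not KoP's actual code: the class and method names, the BrokerEntry shape, the IP 10.0.0.5, and the host broker-0.broker.pulsar.svc are all hypothetical, and the sketch assumes the registered pulsarServiceUrl port is 31964.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;

// Illustrative model of lookup steps 3 to 5; every name here is hypothetical, not a KoP class.
final class CurrentLookupSketch {

    /** Simplified stand-in for what a broker registers under /loadbalance/brokers/. */
    static final class BrokerEntry {
        final String host;                      // host of the registered broker address
        final int pulsarPort;                   // port of pulsarServiceUrl (assumed value)
        final int webPort;                      // port of webServiceUrl
        final String kafkaAdvertisedListeners;  // value returned in step 5

        BrokerEntry(String host, int pulsarPort, int webPort, String kafkaAdvertisedListeners) {
            this.host = host;
            this.pulsarPort = pulsarPort;
            this.webPort = webPort;
            this.kafkaAdvertisedListeners = kafkaAdvertisedListeners;
        }
    }

    /** Steps 3 to 5: match on host name, then on port, then return the Kafka listeners. */
    static Optional<String> findKafkaListeners(InetSocketAddress lookupResult, List<BrokerEntry> brokers) {
        return brokers.stream()
                .filter(b -> b.host.equals(lookupResult.getHostString()))  // step 3
                .filter(b -> b.pulsarPort == lookupResult.getPort()
                        || b.webPort == lookupResult.getPort())            // step 4
                .map(b -> b.kafkaAdvertisedListeners)                      // step 5
                .findFirst();
    }

    /** One broker registered with its NodePort address (placeholder values). */
    static List<BrokerEntry> demoBrokers() {
        return List.of(new BrokerEntry("10.0.0.5", 31964, 30873, "PLAINTEXT://10.0.0.5:31553"));
    }

    public static void main(String[] args) {
        // Lookup result when the first (internal) advertised listener is returned.
        InetSocketAddress internal = InetSocketAddress.createUnresolved("broker-0.broker.pulsar.svc", 6650);
        // Lookup result when the external listener is returned.
        InetSocketAddress external = InetSocketAddress.createUnresolved("10.0.0.5", 31964);
        System.out.println(findKafkaListeners(internal, demoBrokers()));
        System.out.println(findKafkaListeners(external, demoBrokers()));
    }
}
```

With these placeholder values the internal address yields an empty result, because its host name never matches the registered NodePort host, while the external address resolves to the Kafka listener.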

In my case, the lookup result in step 1 is the first address in advertisedListeners, which is {headless service domain}:6650, while the broker address under /loadbalance/brokers/ in step 2 is {worknode_ip}:30873. The broker address registered there is used for admin request redirection, so I need to set it to the NodePort address in order to handle admin requests from outside.
Because {headless service domain} is not the same as {worknode_ip}, step 3 fails with no matching broker.
(screenshot: kop2)

Current workaround:
In step 1, I create a separate Pulsar client for topic lookup with listenerName external instead of using pulsarService.getClient(), so the result is {worknode_ip}:31964.

Suggestion:
I'm not sure why KoP uses this lookup logic. It breaks when advertisedAddress is not the same as the first address in advertisedListeners, which is the default lookup result of pulsarService.getClient().getLookup().getBroker().
Could we use Pulsar's advertisedListeners lookup logic to handle the topic lookup in KoP?

  1. Decide on a listenerName for KoP, for example kafka-listener.
  2. Add kafka-listener to advertisedListeners: pulsar-internal:{headless service domain}:6650,pulsar-external:pulsar://{worknode_ip}:31964,kafka-listener:pulsar://{worknode_ip}:31553
  3. Create a Pulsar client for topic lookup with listenerName kafka-listener. We would then get the expected lookup result {worknode_ip}:31553 in one step.
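
The selection by listener name proposed above can be sketched as follows. This is a minimal illustration under stated assumptions, not Pulsar's or KoP's implementation: the class and method names are hypothetical, the IP and host name are placeholders, and every entry is assumed to have the form name:pulsar://host:port.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that picks an advertised address by listener name.
final class ListenerLookupSketch {

    /** Parses "name:scheme://host:port,..." into an ordered map of listener name to URI. */
    static Map<String, URI> parseAdvertisedListeners(String value) {
        Map<String, URI> listeners = new LinkedHashMap<>();
        for (String rawEntry : value.split(",")) {
            String entry = rawEntry.trim();
            int sep = entry.indexOf(':');  // the first colon separates the name from the URI
            if (sep <= 0) {
                throw new IllegalArgumentException("Malformed listener entry: " + entry);
            }
            String name = entry.substring(0, sep);
            URI uri = URI.create(entry.substring(sep + 1));
            if (listeners.put(name, uri) != null) {
                throw new IllegalArgumentException("Duplicate listener name: " + name);
            }
        }
        return listeners;
    }

    /** Returns "host:port" of the listener with the given name. */
    static String resolve(String advertisedListeners, String listenerName) {
        URI uri = parseAdvertisedListeners(advertisedListeners).get(listenerName);
        if (uri == null) {
            throw new IllegalArgumentException("Unknown listener: " + listenerName);
        }
        return uri.getHost() + ":" + uri.getPort();
    }

    public static void main(String[] args) {
        // Placeholder values standing in for {headless service domain} and {worknode_ip}.
        String listeners = "pulsar-internal:pulsar://broker-0.broker.pulsar.svc:6650,"
                + "pulsar-external:pulsar://10.0.0.5:31964,"
                + "kafka-listener:pulsar://10.0.0.5:31553";
        System.out.println(resolve(listeners, "kafka-listener"));
    }
}
```

The point of the design is that the address a Kafka client needs comes directly from the entry named kafka-listener, with no host-name matching against the registered broker list.
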
@BewareMyPower
Collaborator

It's a legacy design that was meant to implement the basic functions quickly. You're right that the current lookup design can be optimized to reuse the broker's logic. We also recently found other problems caused by using the built-in PulsarClient for topic lookup. For example, if many topics or partitions need to be looked up, the same PulsarClient issues many lookup requests and a TooManyRequestsException might be thrown.

Would you like to contribute a PR to fix it?

@wangjialing218
Contributor Author

Currently advertisedListeners is validated so that each address must start with pulsar://, so we cannot register a kafka-listener that starts with PLAINTEXT://.

Could we consider a complete solution together with #574? We could register multiple Kafka listeners in advertisedListeners with different listener names and ports, but we may need to change the broker side to remove the pulsar:// validation.

@BewareMyPower
Collaborator

Current advertisedListeners has a validation that address must start with pulsar://

Which part of the code do you mean? Do you mean that, on the Pulsar side, what getProtocolDataToAdvertise() returns cannot be registered?

The work for #574 hasn't started yet, so it may take some time to look into this issue.

@wangjialing218
Contributor Author

Current advertisedListeners has a validation that address must start with pulsar://

Which part of code do you mean? Do you mean in Pulsar side, what the getProtocolDataToAdvertise() returns cannot be registered?

On the Pulsar side there is a class MultipleListenerValidator. Currently we can only register advertisedListeners that start with pulsar://, otherwise the validator throws an exception.
If we want to reuse the broker's advertisedListeners for KoP topic lookup, we need to register kafkaAdvertisedListeners there.
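
The kind of check described above can be illustrated with a small sketch. This is not the code of Pulsar's MultipleListenerValidator: the class and method names are hypothetical, and accepting pulsar+ssl in addition to pulsar is an assumption made for the example.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

// Hypothetical scheme check; NOT the actual MultipleListenerValidator implementation.
final class SchemeCheckSketch {

    // Assumption for this sketch: only Pulsar binary protocol schemes are accepted.
    private static final Set<String> ACCEPTED_SCHEMES = Set.of("pulsar", "pulsar+ssl");

    /** Returns true if the listener address uses one of the accepted schemes. */
    static boolean isAccepted(String address) {
        String scheme = URI.create(address).getScheme();
        return scheme != null && ACCEPTED_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    /** Throws if the address would be rejected, mirroring the behaviour described in the thread. */
    static void validate(String listenerName, String address) {
        if (!isAccepted(address)) {
            throw new IllegalArgumentException(
                    "Listener " + listenerName + " has an unsupported scheme: " + address);
        }
    }

    public static void main(String[] args) {
        // 10.0.0.5 is a placeholder for {worknode_ip}.
        System.out.println(isAccepted("pulsar://10.0.0.5:31964"));
        System.out.println(isAccepted("PLAINTEXT://10.0.0.5:31553"));
    }
}
```

Under these assumptions a PLAINTEXT:// address is rejected, which is why registering kafkaAdvertisedListeners in advertisedListeners would need a change on the broker side.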

@BewareMyPower
Collaborator

I'm afraid it's not easy to reuse the advertisedListeners of Pulsar.

Regarding

But when use kafka client outside k8s connect to pulsar with {worknode_ip}:31553, it will fail to get topic metadata due to broker lookup failed.

Could you provide some help, since I remember you have tried NodePort mode before? @gaoran10

@gaoran10
Contributor

gaoran10 commented Aug 10, 2021

The configurations advertisedAddress and advertisedListeners can't be used together.

Maybe we could use the configuration below? Is using advertisedAddress alone enough?

advertisedAddress: {headless service domain}:6650
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://{headless service domain}:9092

The lookup result is {headless service domain}:9092. The client should then add a hosts entry for the headless service domain that maps it to the external IP.

@wangjialing218
Contributor Author

A client outside k8s cannot connect to KoP through the lookup result {headless service domain}:9092.
I'm working on a way to let clients connect to brokers in k8s without any proxy module or hosts configuration. That could be achieved as follows:

  1. The lookup result for a Pulsar client outside k8s is {work node external ip}:{nodeport of 6650}.
  2. The redirect URL for a Pulsar HTTP request from outside k8s is {work node external ip}:{nodeport of 8080}.
  3. The lookup result for a Kafka client outside k8s is {work node external ip}:{nodeport of 9092}.

I have achieved 1 and 2 in my k8s cluster, and for 3 I currently have a workaround that makes it possible.
I think #644 is a good solution to this problem.
