TopicIds
Several filters need to do things based on the topic name, but Kafka is in the process of changing many RPCs to use topic ids. Topic names are unique in "space" (the topic namespace). Topic ids are unique in both space and time, making it possible to distinguish two topics with the same name, in the same cluster, that exist(ed) at different times.
- KAFKA-10551 will add support for topic ids in `Produce` requests.
- Version 13 of the `Fetch` request added support for topic ids.
- See KAFKA-8872 for other RPCs.
The filter instances need to know the mapping of id to name. But in a deployment with multiple proxy instances (e.g. for HA), a client might have made its `Metadata` request via some other proxy instance, so the proxy instance handling e.g. a `Fetch` request might not previously have observed a `Metadata` response which included that topic id.
If filters can make their own `Metadata` requests, then they can handle any request which uses an unknown topic id by making a `Metadata` request before continuing the processing of the original request. In this way they could maintain a `topic id → topic name` cache.
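
As a rough illustration, here is a minimal sketch of such a cache in Java, using kafka-clients' `org.apache.kafka.common.Uuid` for topic ids. The surrounding filter API (how `Metadata` responses would actually be observed) is hypothetical, not the real proxy API:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.Uuid;

/**
 * Illustrative topic id -> name cache, populated from whatever
 * Metadata responses this proxy instance happens to observe.
 */
public class TopicIdCache {
    private final Map<Uuid, String> idToName = new ConcurrentHashMap<>();

    /** Called for every observed Metadata response (hypothetical hook). */
    public void onMetadataResponse(Map<Uuid, String> topicsInResponse) {
        // Topic ids never change and topics can't (yet) be renamed,
        // so blind puts are safe: an entry can never become stale.
        idToName.putAll(topicsInResponse);
    }

    /** Empty if this proxy instance has never seen this topic id. */
    public Optional<String> nameFor(Uuid topicId) {
        return Optional.ofNullable(idToName.get(topicId));
    }
}
```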
This works OK because topic ids never change, and because Kafka doesn't (yet) support renaming topics. Together these things mean the cached `topic id → topic name` mapping cannot become stale.
But there might also need to be a periodic process to remove topic ids which no longer exist (a topic can disappear without this instance noticing, because another proxy instance handled the delete).
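
Such a sweep might look like the following sketch, building on the `TopicIdCache` above; `fetchAllTopicIds()` is a hypothetical helper that issues a full `Metadata` request and returns the ids of every topic that currently exists:

```java
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.Uuid;

/**
 * Periodically drop cached ids for topics that no longer exist,
 * e.g. because another proxy instance handled the DeleteTopics request.
 */
void startEvictionSweep(ScheduledExecutorService scheduler) {
    scheduler.scheduleAtFixedRate(() -> {
        Set<Uuid> liveIds = fetchAllTopicIds(); // hypothetical full Metadata round trip
        idToName.keySet().retainAll(liveIds);   // evict everything else
    }, 5, 5, TimeUnit.MINUTES);
}
```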
What if Kafka added support for topic renaming? In this case the cache could become stale. For example, suppose the mapping in proxy process 1 contains `1234 → foo`; then, via another proxy in process 2, a client renames `foo → bar` and creates a new topic `foo`, which gets allocated id 5678. The mapping in process 1 is now incorrect. To handle this properly the proxy would need to invalidate the cache based on metadata changes in the cluster (see the sketch after this list), e.g.:
- For a reverse proxy, by also intercepting inter-broker RPCs, thus observing topic name changes.
- For a reverse proxy, by assuming that all metadata changes are mediated via the same proxy cluster and passing state between proxy instances.
- Via some other mechanism for observing cluster metadata changes (a KRaft observer, or some as-yet undefined Kafka API).
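
Whatever the observation mechanism, the cache-side reconciliation itself would be simple. A sketch, continuing the hypothetical `TopicIdCache` above (how the change event arrives is exactly the open question in the list):

```java
import org.apache.kafka.common.Uuid;

/**
 * Reconcile the cache with an observed metadata change for one topic.
 */
void onObservedTopicMetadata(Uuid topicId, String currentName) {
    String cached = idToName.get(topicId);
    if (cached != null && !cached.equals(currentName)) {
        // e.g. cached "1234 -> foo" while the topic is now named "bar":
        // overwrite rather than merely evict, so the next lookup for
        // this id doesn't need a Metadata round trip.
        idToName.put(topicId, currentName);
    }
}
```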
We need to avoid blocking. Consider:
- some filter has logic that requires the topic name as an input, and a request arrives carrying only a topic id, so the filter needs to look up the name
- the name is not in the cache, so a `Metadata` request is required.
We don't want to block the pipeline while we send a `Metadata` request and await the response. That would be terrible for performance, and in the worst case could prevent the proxy from processing any requests while awaiting a bunch of `Metadata` responses.
So the filter needs to send the `Metadata` request, suspend processing of the original message, and then resume processing the original message once the `Metadata` response has been received.
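
A sketch of that flow, assuming a hypothetical filter API in which a handler may return a `CompletionStage` so the pipeline thread is released while the lookup is in flight; `Request`, `Response`, `handle()` and `sendMetadataRequest()` are all placeholder names, not the real API:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.apache.kafka.common.Uuid;

CompletionStage<Response> onRequestWithTopicId(Request request, Uuid topicId) {
    Optional<String> cached = cache.nameFor(topicId);
    if (cached.isPresent()) {
        // Cache hit: handle the request immediately on the pipeline thread.
        return CompletableFuture.completedFuture(handle(request, cached.get()));
    }
    // Cache miss: send a Metadata request out of band, park the original
    // request, and resume only once the topic name is known. The pipeline
    // thread is never blocked waiting on the broker.
    return sendMetadataRequest(topicId)           // CompletionStage<String>
            .thenApply(name -> handle(request, name));
}
```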