-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support to read from multiple kafka topics in same supervisor #14424
Add support to read from multiple kafka topics in same supervisor #14424
Conversation
...dexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
Fixed
Show fixed
Hide fixed
...indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java
Fixed
Show fixed
Hide fixed
...ka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java
Fixed
Show fixed
Hide fixed
...ka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java
Fixed
Show fixed
Hide fixed
661ec7c
to
2f98377
Compare
@abhishekagarwal87 , these are very interesting changes! Since the PR is already open for design feedback, could you please add some more details in the description:
|
I added some details. There is no need to declare separate column mapping for each topic. Just like there is no need to declare separate column mapping for each partition in a topic. Topics can have any number of partitions. Ingestion system doesn't care. Ingestion system tells Kakfa
Kafka gives
From ingestion perspective, tp1:0 is just another partition like tp3:2. |
That's pretty neat, @abhishekagarwal87 , thanks for the clarification! |
@abhishekagarwal87 In what order record from different topics are pulled ? Is it round robin ? What happens if one of the topics does not any record but other has, are we handling this case? |
@pjain1 - The answer any question of sort "How is data consumed across multiple topics if XYZ is happening with topic A and topic B" is same as the answer to the question of "How is data consumed across multiple partitions if XYZ is happening with partition A and partition B" Q - What happens if one of topics does not have any record The topic really is an irrelevant entity. For Kafka servers, consuming data from tp0:0 and tp1:0 is the same as consuming from tp0:0 and tp0:1. We ask for partitions for a list of topics and from there on, the work unit for code is partition. |
SeekableStreamSupervisor#emitLag tries to emit lag with the Stream as one of its dimensions. It may be helpful to understand topic-wise patterns by emitting lag metrics with separate streams in the dimensions instead of having the complete list. |
@AmatyaAvadhanula - Good point. In that case, we will be emitting "0:tp" as the partition dimension instead of "0". The stream dimension will still be set as comma separated list of topics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is neat and would help with operational simplicity. Few thoughts and questions:
-
Will multi-stream processing work if the
ioConfig
is different for the topics? For example, the input topics span distinct brokers because different teams own them, the shape of data on the streams is different, or they've security settings configured differently.
To that effect, have we considered an array ofioConfig
instead of a comma-separated list of topics that share a singleioConfig
? I don't know how backwards compatibility would work in this case, though.EDIT: It seems like @kfaraz asked something similar. Is it reasonable to say that the scope of this change is to support multi-stream processing only if the topics share the same properties -- i.e.,
ioConfig
,dataSchema
, etc are the same? For all other use cases, I wonder what effort it would take and if it the design is extensible in the future. -
What are the implications on kafka ingestion metrics and supervisor operations like reset offsets in the multi-stream supervisor mode?
-
Fwiw, Kinesis also supports multi-streaming processing by a single consumer starting
2.3.1
. Once we bump up the current kinesis client SDK version from1.14.4
to the latest stable, we can add similar support for kinesis indexing as well :)
...ka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java
Outdated
Show resolved
Hide resolved
@Nullable | ||
private final String topic; | ||
|
||
// This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: incompatibilty -> incompatibility
Also, I think this could straight-up be a javadoc for the property instead of a multi-line comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for making it a javadoc. Also the content needs updating given the current code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Done.
...ka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java
Outdated
Show resolved
Hide resolved
A few questions on this:
|
The topics have to belong to the same cluster. The use-cases that I know of so far do not require reading from topics in different clusters. The shape of the data can be different, just like how it can be different for multiple partitions within a topic. This patch doesn't support a topic-specific tuning config. We can't have that anyway. A task doesn't really care what topic it is reading from. It just reads from a set of partitions. Those partitions can belong to same topic or different topics.
There is no reason for dataSchema to be uniform. Imagine that the ingestion system is emitting some metrics to one topic and query system is emitting some metrics to another topic. These topics will likely have different columns. If you have decided to put these data in same datasource, you have already assumed that you are ok with the merged schema. It's just like how you will ingest data from multiple files. A topic is an equivalent of a file.
partition-id dimension value will change but otherwise, they are expected to work as it is.
Nice. I will look at that. |
return new KafkaTopicPartition( | ||
true, | ||
str.substring(0, index), | ||
Integer.parseInt(str.substring(index + 1)) |
Check notice
Code scanning / CodeQL
Missing catch of NumberFormatException
{ | ||
return new KafkaRecordSupplier( | ||
spec.getIoConfig().getConsumerProperties(), | ||
sortingMapper, | ||
spec.getIoConfig().getConfigOverrides() | ||
spec.getIoConfig().getConfigOverrides(), |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation
{ | ||
return new KafkaRecordSupplier( | ||
spec.getIoConfig().getConsumerProperties(), | ||
sortingMapper, | ||
spec.getIoConfig().getConfigOverrides() | ||
spec.getIoConfig().getConfigOverrides(), | ||
spec.getIoConfig().isMultiTopic() |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation
I added a |
Would it make senes to make the API such that
and
That would make the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay on this, @abhishekagarwal87 .
Overall, the changes look good. I have left some comments. The code formatting seems off in a couple of places, would be nice to fix those up.
I still need to go through some classes and try out the changes in a cluster.
@Nullable | ||
private final String topic; | ||
|
||
// This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for making it a javadoc. Also the content needs updating given the current code.
SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers, | ||
SeekableStreamEndSequenceNumbers<Integer, Long> endSequenceNumbers, | ||
SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> startSequenceNumbers, | ||
SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> endSequenceNumbers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra space
...a-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
Show resolved
Hide resolved
...ndexing-service/src/test/java/org/apache/druid/data/input/kafka/KafkaTopicPartitionTest.java
Outdated
Show resolved
Hide resolved
@Override | ||
public int hashCode() | ||
{ | ||
if (multiTopicPartition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a little confused about excluding the topic from equals and hashCode if multiTopic
is false.
In the impl here, new KafkaTopicPartition(false, "topicA", 1)
would be considered equal to new KafkaTopicPartition(false, "topicB", 1)
.
The argument against this possibility could be that we would never have a scenario where we are required to compare two such partitions. In single-topic mode, the supervisor (or a task) would be dealing with either topicA
or topicB
, never both.
But I guess a similar argument would apply when comparing new KafkaTopicPartition(false, "topicA", 1)
and new KafkaTopicPartition(false, null, 1)
. In other words, the supervisor would never have to compare two such objects (because either all tasks would be running new Druid version or all tasks would be running old Druid version from pov of supervisor due to order of upgrades), and a task would never have to compare two such objects, because in the lifecycle of the task, it would ever see only one format of partitions.
In short, the hashCode and equals methods can be simplified to just check all the 3 fields.
But maybe I am missing some cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. Good point. Its just a defensive check that if someone is passing a topic for a single-topic mode, we don't compare. I was seeing some test failures because of that. But I realize now that it's entirely in my control, how those objects get created. I will simplify these equals/hashcode methods. But in the constructor, I will set the topic to null if multi-topic mode is not enabled. what do you think?
...ka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java
Show resolved
Hide resolved
} | ||
List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim()); | ||
if (partitions == null) { | ||
throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throw some kind of DruidException
instead?
} | ||
List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim()); | ||
if (partitions == null) { | ||
throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: rephrase
throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim()); | |
throw new ISE("Could not find topic[%s] using Kafka consumer", topic.trim()); |
List<PartitionInfo> allPartitions = new ArrayList<>(); | ||
for (String topic : stream.split(",")) { | ||
if (!multiTopic && !allPartitions.isEmpty()) { | ||
throw InvalidInput.exception("Comma separated list of topics [%s] is not supported unless you enabled " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this validation should be done before the loop using !isMultiTopic and stream.contains(",")
.
{ | ||
return partitionId % spec.getIoConfig().getTaskCount(); | ||
return partitionId.partition() % spec.getIoConfig().getTaskCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this seems a little tricky.
This logic would always lead to a skew even with single topic ingestion, but the difference between most assigned taskGroupId and least assigned taskGroupId would have been only 1. With multiple topics and no limit on the number of topics, this difference can be anything.
Say there are 5 topics A, B, C, D, E with 6 partitions each and task count is 4.
Then taskGroupId 0 and 1 would get 10 partitions each
whereas taskGroupId 2 and 3 would get 5 partitions each.
The skew can be reduced in two ways:
- Get total partition count of each topic and then assign them new integer ids. e.g. A:1 -> 1, A:2 -> 2... A:5 -> 5, then B:1 -> 6, B:2 -> 7 and so on. Then do a module of these new overall IDs instead of the
partitionId.partition()
. This is very cumbersome though. - A simpler approach could be to offset the assignment for every topic. So A starts assignment at taskGroupId 0, but B starts assignment at taskGroupId 1 and so on.
- While constructing this class, we just need to get the list of topics from the stream.
- Then the code here could become
topics = List of topics
return (partitionId.partition() + topics.getIndexOf(partitionId.topic())) % taskCount;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow. Nice catch. I am wary of that logic as I don't want the assignment to change if someone adds one more topic to the stream. I am thinking something like below
(31*topic.hashCode() + partition) % taskCount.
but that may not lead to an optimal assignment.
@vogievetsky , yeah, it might be nice to have but I don't think it would be very straightforward. For now, we should ensure that the metrics such as |
@vogievetsky - Two reasons why I don't want that to be an array like
Edit - I will make that change now itself so stream becomes a regex. |
...ka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 🚀 , the remaining items and improvements can be addressed in follow-up PRs.
throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", stream); | ||
List<PartitionInfo> allPartitions; | ||
if (multiTopic) { | ||
Pattern pattern = Pattern.compile(stream); |
Check failure
Code scanning / CodeQL
Regular expression injection
.build("Topic [%s] is not found." | ||
+ "Check that the topic exists in Kafka cluster", stream); |
Check notice
Code scanning / CodeQL
Missing space in string literal
{ | ||
return partitionId % spec.getIoConfig().getTaskCount(); | ||
Integer taskCount = spec.getIoConfig().getTaskCount(); |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation
Going to merge this since failure was unrelated. |
This is an exciting new feature! Just to check — this feature does not allow you to ingest from multiple sets of brokers into a single data source, right? (For example, when migrating from one Kafka cluster to another, you can't set up a single data source that temporarily ingests from both clusters, right?) |
That is correct. You can not ingest from multiple Kafka clusters using this feature. |
@abhishekagarwal87 |
@maytasm - Supporting ingestion from different clusters will be a bit more involved. If you want to try it out, the best way to do is by modifying KafkaRecordSupplier class. As far as I can see, all the kafka consumer operations are done through this class. So you could keep a map of in this class assuming you have topic specific bootstrap address. Then for any call to a KafkaRecordSupplier method, rather than calling There is #poll call and for that you would keep calling poll method on each consumer. You will have to assume that the topic names are different across clusters. If they are same, then you need to add cluster id to the partition spec. |
Description
This PR adds support to read from multiple Kafka topics in the same supervisor. A multi-topic ingestion can be useful in scenarios where a cluster admin has no control over input streams. Different teams in an org may create different input topics that they can write the data to. However, the cluster admin wants all this data to be queryable in one data source.
Implementing this change required
0
for topic tp1 can be distinguished from partition0
of topic tp2.The refactoring was straightforward as well however the main challenge is maintaining backward compatibility. As the Integer partition id is part of serialized metadata and just simply changing the partition id type would fail existing supervisors. Admins should also be able to roll back to previous versions without running into any compatibility issues.
In order to achieve that, KafkaTopicPartition has an additional field called
multiTopicPartition
. This field will be set to true if a particular supervisor is running in multi-topic mode. That mode gets triggered if user is using a comma-separated list of topics such astp1,tp2
instead of a single topic.This field must be set explicitly anywhere in the code where
KafkaTopicPartition
object is created. So the code is refactored accordingly so we know in those places whether the supervisor is multi-topic or single-topic. If this field is false, the partition id is serialized in the old format. if the field is false, thentopic
field in this class is completely ignored for all purposes such as comparison, hashCode, etc. If this field is true, then the partition id is serialized in a new format. Below are some examplesKafkaTopicPartition ( multiTopicPartition : false, topic: "tp", partition: 0)
KafkaTopicPartition ( multiTopicPartition : false, topic: null, partition: 0)
KafkaTopicPartition ( multiTopicPartition : true, topic: "tp", partition: 0)
It's also possible that someone changes the
stream
in a running supervisor and tries to switch between multiple streams and single streams. I will probably add some code to disallow that kind of update.We could also enhance the serialization for KafkaTopicPartition so it can write a format version too. Though I don't think that we will really be changing that anytime soon.
With these changes, there should be no impact on currently running supervisors. If an admin tries to roll back a supervisor running in multi-topic mode, that supervisor will start failing (I am yet to explicitly test this scenario)
There is no limit on number of topics that a user can set. There is really no overhead that comes from multi-topic support. We didn't have any limit on the number of partitions that user can ingest data from. and we don't have such limit now. It's just that we store extra bits for a partition id.
There is also no change required on core task execution engine. A task is assigned the partitions the same way that it was assigned partitions before. Just the partition id is changed.
Other todos
docs changesPossibly adding a feature flag to turn on multi-topicLooking for any design feedback before I finish these todos.
Release note
Now you can ingest data from multiple Kafka topics into one datasource using a single supervisor. Before this release, you needed to union multiple streams into one stream before ingesting into druid.
Key changed/added classes in this PR
KafkaDatasourceMetadata
KafkaTopicPartition
This PR has: