Support custom assignment strategy #846
424677b to 992839c
This commit adds a feature that allows users to specify a custom assignment strategy. In some cases, you might want to assign more partitions to some consumers. For example, in applications that insert records into a database, consumers running on hosts near the database can process more messages than consumers running on other hosts.
992839c to 5bf2547
I don’t think clients of the library should have to use Protocol::MemberAssignment directly. How about a simpler protocol, or perhaps some kind of imperative API, e.g.
def assign(members:, topics:)
  ...
  assign_partition(topic: ..., partition: ..., member: ...)
end
Then extract the full assignment table after that? Or maybe something else, something that doesn’t require using stuff in Protocol directly (because that may change with the protocol...)
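The imperative idea above can be sketched roughly as follows. This is only an illustration under assumptions: `AssignmentCollector`, its `to_h` method, and the way the collector is passed into `assign` are hypothetical names invented here, not part of ruby-kafka. The strategy only calls `assign_partition`, and the full assignment table is extracted afterwards, so no `Protocol` class is touched.

```ruby
# Hypothetical collector (not a ruby-kafka class): records assign_partition
# calls so the full assignment table can be extracted afterwards.
class AssignmentCollector
  def initialize
    # member_id => { topic => [partition_id, ...] }
    @table = Hash.new { |h, member| h[member] = Hash.new { |t, topic| t[topic] = [] } }
  end

  def assign_partition(topic:, partition:, member:)
    @table[member][topic] << partition
  end

  # Returns a copy of the table as a nested hash.
  def to_h
    @table.transform_values { |topics| topics.transform_values(&:dup) }
  end
end

# A strategy written against the imperative API: plain round-robin.
# Passing the collector explicitly is an assumption made for this sketch.
def assign(collector, members:, topics:)
  pairs = topics.flat_map { |topic, partition_ids| partition_ids.map { |id| [topic, id] } }
  pairs.each_with_index do |(topic, partition), index|
    member = members[index % members.size]
    collector.assign_partition(topic: topic, partition: partition, member: member)
  end
end

collector = AssignmentCollector.new
assign(collector, members: ["consumer-a", "consumer-b"], topics: { "events" => [0, 1, 2] })
table = collector.to_h
```

The library would then be free to translate `table` into whatever wire format the protocol needs.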
README.md
Outdated
assignment_strategy_builder = ->(cluster) do
  CustomAssignmentStrategy.new(cluster, "some-host-information")
end
consumer = kafka.consumer(group_id: "some-group", assignment_strategy_builder: assignment_strategy_builder)
❤️ the docs in the README!
cbab110 to 34d6928
Thank you for your review!
I agree with you. I've introduced |
Looks good! I'm open to merging pretty much as-is, but added a suggestion about a different API that you might consider – as a potential user you're probably better able to evaluate that. In general, simplifying the strategy object interface as much as possible would be nice.
Can you add a note to the changelog as well? This could be a risky change, and I'll probably do a beta release, so it would be nice to have a good description there.
  Array(partitions).each do |partition|
    group_assignment[member_id].assign(partition.topic, [partition.partition_id])
  end
end
Good call to split the boilerplate from the real assignment strategy, given it's sufficiently flexible to handle real world strategies.
README.md
Outdated
end

# @return [String]
def protocol_name
Would it be better to register these strategies under the protocol names and refer to them by name? For example:
class RandomStrategy
  def initialize(cluster:, user_data:)
    ...
  end
  def assign(...)
    ...
  end
end
Kafka::Consumer.register_assignment_strategy(:random, RandomStrategy)
consumer = kafka.consumer(.., assignment_strategy: :random)
That would allow users to set the strategies up "statically", and avoids protocol name collisions. Might not be worth it.
This would actually allow us to avoid using a class altogether, I think:
Kafka::Consumer.register_assignment_strategy(:random) do |cluster, user_data, members, partitions|
  # strategy goes here
end
We could wrap with an object that exposes e.g. user_data internally.
I tried implementing Kafka::ConsumerGroup::Assignor.register_strategy in 0f7a9e9.
However, I wonder if this implementation is good or not for the following reasons:
- Developers who implement strategies should understand what protocol_name means, but they might consider it as just a strategy name by registering it via the method.
- Developers should write tests of the strategy, but they won't if the strategy is defined in a block.
- Even if developers can't register two strategies with the same name at the same time, they might replace the strategy with the same name, and that might cause some errors.
What do you think?
By the way, I want to pass the strategy directly, not the name, because I want to change the parameters of the strategy on each application. If we can't pass the strategy directly, we have to pass arguments with it like below:
class CustomAssignmentStrategy
  def initialize(params)
    ...
  end
  ...
end
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, CustomAssignmentStrategy)
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: {
  name: :custom,
  params: {
    ...
  },
})
In the preceding example, we can use neither positional arguments nor keyword arguments. It is not cool.
With regards to init params, can't you do this?
strategy = CustomAssignmentStrategy.new(1, 2, 3)
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |...|
  strategy.call(...)
end
Seeing as that's a more complex use case, I don't think the additional boilerplate is a problem.
- Developers who implement strategies should understand what protocol_name means, but they might consider it as just a strategy name by registering it via the method.
I think it makes sense to couple the strategy name and protocol name – what other use would we have for the protocol name?
- Developers should write tests of the strategy, but they won't if the strategy is defined in a block.
I think it's perfectly reasonable to provide an accessible way for developers to get stuff done, and let them do testing on their own terms. For example, by extracting the logic to a method or class and calling it from the block.
- Even if developers can't register two strategies with the same name at the same time, they might replace the strategy with the same name, and that might cause some errors.
I think raising an exception when trying to re-define a strategy makes sense.
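A minimal sketch of that behaviour, assuming a hypothetical `StrategyRegistry` class and `AlreadyRegisteredError` (neither name comes from ruby-kafka): registering a block under a name that is already taken raises instead of silently replacing the earlier strategy.

```ruby
# Hypothetical registry used only for illustration; not the ruby-kafka implementation.
class StrategyRegistry
  class AlreadyRegisteredError < StandardError; end

  def initialize
    @strategies = {}
  end

  # Registers a block under a name; refuses to overwrite an existing entry.
  def register(name, &block)
    raise ArgumentError, "a block is required" unless block

    key = name.to_s
    if @strategies.key?(key)
      raise AlreadyRegisteredError, "strategy #{key.inspect} is already registered"
    end
    @strategies[key] = block
  end

  def fetch(name)
    @strategies.fetch(name.to_s)
  end
end

registry = StrategyRegistry.new
registry.register(:random) { |members, partitions| { members.first => partitions } }

# A second registration under the same name is rejected.
duplicate_error =
  begin
    registry.register(:random) { |_members, _partitions| {} }
    nil
  rescue StrategyRegistry::AlreadyRegisteredError => e
    e
  end
```

Note that this only guards against collisions inside one process; it cannot detect two releases of an application registering different strategies under the same name.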
Thank you for your comment! I reimplemented the feature: f1a8162.
With regards to init params, can't you do this?
If we can't pass the strategy directly and the strategy has the method user_data (that means each consumer generates its user data and the leader consumer assigns partitions using the data), we have to introduce a class that creates a strategy class like below:
factory = StrategyClassFactory.new
strategy_class1 = factory.create_strategy_class(params)
Kafka::Consumer.register_assignment_strategy(:strategy1, strategy_class1)
strategy_class2 = factory.create_strategy_class(another_params)
Kafka::Consumer.register_assignment_strategy(:strategy2, strategy_class2)
consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: :strategy1)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: :strategy2)
or we have to introduce an option for user_data:
strategy1 = CustomAssignmentStrategy.new(params)
strategy2 = CustomAssignmentStrategy.new(another_params)
Kafka::ConsumerGroup::Assignor.register_strategy(:strategy1, user_data: -> { strategy1.user_data }) do |...|
  strategy1.call(...)
end
Kafka::ConsumerGroup::Assignor.register_strategy(:strategy2, user_data: -> { strategy2.user_data }) do |...|
  strategy2.call(...)
end
consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: :strategy1)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: :strategy2)
Note that we have to instantiate the strategy in the same scope as we call the method Kafka::ConsumerGroup::Assignor.register_strategy.
I think passing the strategy directly will make the code simpler:
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, CustomAssignmentStrategy)
strategy1 = CustomAssignmentStrategy.new(params)
consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: strategy1)
strategy2 = CustomAssignmentStrategy.new(another_params)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: strategy2)
That also allows us to provide gems that register some strategies. If a gem provides the strategy, the user only has to write:
strategy1 = CustomAssignmentStrategy.new(params)
consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: strategy1)
strategy2 = CustomAssignmentStrategy.new(another_params)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: strategy2)
If we can't pass the strategy directly, it is difficult for gems to register strategies with custom parameters.
Anyway, any of these options would allow me to remove the monkey patches on https://github.com/abicky/ruby-kafka-ec2/tree/v0.1.3/lib/kafka/ec2/ext, and I guess you prefer supporting only the block call fashion, not both the block call fashion and the strategy name fashion:
# Good
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |...|
  ...
end
# Bad
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, :strategy_class)
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, strategy)
This is why I added the option user_data to the method register_strategy in the commit f1a8162.
what other use would we have for the protocol name?
This is related to "Even if developers can't register two strategies with the same name at the same time, they might replace the strategy with the same name, and that might cause some errors."
For example, suppose someone introduced a strategy in the first release, like below:
class CustomStrategy
  def user_data
    Socket.ip_address_list.find { |i| i.ipv4_private? }.ip_address
  end
  # Assigns partitions considering the network topology
  def assign(cluster:, members:, partitions:)
    member_ids = members.flat_map do |id, metadata|
      # split returns strings, so compare the third octet with "0", not 0
      weight = metadata.user_data.split(".")[2] == "0" ? 2 : 1
      [id] * weight
    end
    partitions_per_member = Hash.new {|h, k| h[k] = [] }
    partitions.each_with_index do |partition, index|
      partitions_per_member[member_ids[index % member_ids.count]] << partition
    end
    partitions_per_member
  end
end
Kafka::Consumer.register_assignment_strategy(:custom_strategy, CustomStrategy)
but on the next release, another person changed the strategy with the same name as below:
class CustomCountStrategy
  def user_data
    Concurrent.processor_count
  end
  # Assigns partitions considering the number of cores
  def assign(cluster:, members:, partitions:)
    member_ids = members.flat_map {|id, metadata| [id] * metadata.user_data.to_i }
    partitions_per_member = Hash.new {|h, k| h[k] = [] }
    partitions.each_with_index do |partition, index|
      partitions_per_member[member_ids[index % member_ids.count]] << partition
    end
    partitions_per_member
  end
end
Kafka::Consumer.register_assignment_strategy(:custom_strategy, CustomCountStrategy)
In this case, the strategy name "custom_strategy" was used only once on each release, but different strategies that require different user data were registered. That might cause a problem.
For example, by extracting the logic to a method or class and calling it from the block.
That makes sense.
OK, you've convinced me! How about this: allow the assignment_strategy: parameter, but drop the registration and naming for the time being. Strategies must implement #call and may implement #user_data.
Let's see how that API gets used before complicating things further.
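As a rough illustration of the interface proposed here, the sketch below defines a strategy object with a required `#call` and an optional `#user_data`. The keyword arguments `cluster:`, `members:`, `partitions:` and the `{ member_id => [partition, ...] }` return shape are assumptions carried over from the `assign` examples earlier in this thread. `Partition`, `MemberMetadata`, and `WeightedStrategy` are hypothetical stand-ins so the example runs without a Kafka cluster.

```ruby
# Stand-ins for the objects a consumer group would pass in (hypothetical shapes).
Partition = Struct.new(:topic, :partition_id)
MemberMetadata = Struct.new(:user_data)

class WeightedStrategy
  def initialize(weight)
    @weight = weight
  end

  # Optional: data this consumer advertises to the group leader.
  def user_data
    @weight.to_s
  end

  # Required: returns { member_id => [partition, ...] }.
  def call(cluster:, members:, partitions:)
    # Repeat each member id according to its advertised weight (at least once).
    slots = members.flat_map { |id, metadata| [id] * [metadata.user_data.to_i, 1].max }
    assignment = Hash.new { |h, k| h[k] = [] }
    partitions.each_with_index do |partition, index|
      assignment[slots[index % slots.size]] << partition
    end
    assignment
  end
end

strategy = WeightedStrategy.new(2)
members = { "fast" => MemberMetadata.new("2"), "slow" => MemberMetadata.new("1") }
partitions = (0...6).map { |i| Partition.new("events", i) }
assignment = strategy.call(cluster: nil, members: members, partitions: partitions)
```

Because the strategy is an ordinary object, each consumer can receive its own instance with its own parameters, e.g. `kafka.consumer(group_id: "group1", assignment_strategy: strategy)` as written earlier in the thread.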
Thank you for your suggestion! I tried implementing that: 1b10aed.
First, could you read the README to make sure that the changes are as expected?
https://github.com/zendesk/ruby-kafka/pull/846/files#diff-04c6e90faac2675aa89e2176d2eec7d8
If they are as expected, I would appreciate it if you could review the whole PR.
Looks good!
class CustomAssignmentStrategy
  def initialize(user_data)
- def initialize(user_data)
+ def initialize(cluster, user_data)
I forgot to delete @cluster 💦
I deleted the variable and force-pushed (https://github.com/zendesk/ruby-kafka/compare/34d69288a22d5593826c29ec8b787221efe237de..8e92d34b545b819b89595fbeafcb756b384bfea6).
The assignor is responsible for getting partition information and building an assignment from the result of its strategy. That simplifies assignment strategies.
34d6928 to 8e92d34
46583cd to 475964b
This method prevents the same protocol name from being registered twice and makes it easy to register custom strategies.
475964b to 0f7a9e9
I added a note in the commit fbd121d.
As I mentioned in #846 (comment), I tried implementing your suggestion. Could you take a look?
253cc4c to f212602
f212602 to f1a8162
See zendesk#846 (comment) for the reason.
a6c1ef4 to 1b10aed
Awesome work @abicky, thank you for the patience!
ruby-kafka 1.3.0 includes the changes on zendesk/ruby-kafka#846, and they make ruby-kafka-ec2 much simpler.
This PR adds a feature that allows users to specify a custom assignment strategy.
In some cases, you might want to assign more partitions to some consumers. For example, in applications that insert records into a database, consumers running on hosts near the database can process more messages than consumers running on other hosts.
For a concrete example, I'm actually using a custom assignment strategy described in https://github.com/abicky/ruby-kafka-ec2/blob/v0.1.1/README.md#kafkaec2mixedinstanceassignmentstrategy. To use it, I wrote some monkey patches, which you can see at https://github.com/abicky/ruby-kafka-ec2/tree/v0.1.1/lib/kafka/ec2/ext. I can remove the patches if ruby-kafka supports this feature.