Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom assignment strategy #846

Merged
merged 7 commits into from
Aug 17, 2020

Conversation

abicky
Copy link
Contributor

@abicky abicky commented Jun 27, 2020

This PR adds the feature that allows users to specify custom assignment strategy.
In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.

For a concrete example, I'm actually using a custom assignment strategy described in https://github.com/abicky/ruby-kafka-ec2/blob/v0.1.1/README.md#kafkaec2mixedinstanceassignmentstrategy. To use it, I wrote some monkey patches as you can see on https://github.com/abicky/ruby-kafka-ec2/tree/v0.1.1/lib/kafka/ec2/ext. I can remove the patches if ruby-kafka supports this feature.

@abicky abicky force-pushed the support-custom-assignment-strategy branch from 424677b to 992839c Compare June 27, 2020 11:21
This commit adds the feature that allows users to specify custom
assignment strategy. In some cases, you might want to assign more
partitions to some consumers. For example, in applications
inserting some records to a database, the consumers running on
hosts nearby the database can process more messages than the
consumers running on other hosts.
@abicky abicky force-pushed the support-custom-assignment-strategy branch from 992839c to 5bf2547 Compare June 27, 2020 11:33
Copy link
Contributor

@dasch dasch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think clients of the library should have to use Protocol::MemberAssignment directly. How about a simpler protocol, or perhaps some kind of imperative API, e.g.

def assign(members:, topics:)
  ...

  assign_partition(topic: ..., partition: ..., member: ...)
end

Then extract the full assignment table after that? Or maybe something else, something that doesn’t require using stuff in Protocol directly (because that may change with the protocol...)

README.md Outdated
assignment_strategy_builder = ->(cluster) do
CustomAssignmentStrategy.new(cluster, "some-host-information")
end
consumer = kafka.consumer(group_id: "some-group", assignment_strategy_builder: assignment_strategy_builder)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ the docs in the README!

README.md Outdated Show resolved Hide resolved
@abicky abicky force-pushed the support-custom-assignment-strategy branch 2 times, most recently from cbab110 to 34d6928 Compare June 28, 2020 10:11
@abicky
Copy link
Contributor Author

abicky commented Jun 28, 2020

Thank you for your review!

I don’t think clients of the library should have to use Protocol::MemberAssignment directly.

I agree with you. I've introduced Kafka::ConsumerGroup::Assignor in the commit 34d6928 to adopt the Builder pattern. Now, the strategy object has a simpler interface, and we can change the strategy by changing the strategy object.
I used the namespace Kafka::ConsumerGroup to avoid confusion with Kafka::Partitioner. I think it is better to rename Kafka::RoundRobinAssignmentStrategy to Kafka::ConsumerGroup::RoundRobinAssignmentStrategy, but I didn't because some developers might dislike renaming to preserver the result of git blame.

Copy link
Contributor

@dasch dasch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! I'm open to merging pretty much as-is, but added a suggestion about a different API that you might consider – as a potential user you're probably better able to evaluate that. In general, simplifying the strategy object interface as much as possible would be nice.

Can you add a note to the changelog as well? This could be a risky change, and I'll probably do a beta release, so it's nice with a good description there.

Array(partitions).each do |partition|
group_assignment[member_id].assign(partition.topic, [partition.partition_id])
end
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call to split the boilerplate from the real assignment strategy, given it's sufficiently flexible to handle real world strategies.

README.md Outdated
end

# @return [String]
def protocol_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to register these strategies under the protocol names and refer to them by name? For example:

class RandomStrategy
  def initialize(cluster:, user_data:)
    ...
  end

  def assign(...)
    ...
  end
end

Kafka::Consumer.register_assignment_strategy(:random, RandomStrategy)

consumer = kafka.consumer(.., assignment_strategy: :random)

That would allow users to set the strategies up "statically", and avoids protocol name collisions. Might not be worth it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would actually allow us to avoid using a class altogether, I think:

Kafka::Consumer.register_assignment_strategy(:random) do |cluster, user_data, members, partitions|
  # strategy goes here
end

We could wrap with an object that exposes e.g. user_data internally.

Copy link
Contributor Author

@abicky abicky Jul 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried implementing Kafka::ConsumerGroup::Assignor.register_strategy in 0f7a9e9.
However, I wonder if this implementation is good or not for the following reasons:

  • Developers who implement strategies should understand what protocol_name means, but they might consider it as just a strategy name by registering it via the method.
  • Developers should write tests of the strategy, but they won't if the strategy is defined in a block.
  • Even if developers can't register two strategies with the same at the same time, they might replace the strategy with the same name, and that might cause some errors.

What do you think?

By the way, I want to pass the strategy directly, not the name, because I want to change the parameters of the strategy on each application. If we can't pass the strategy directly, we have to pass arguments with it like below:

class CustomAssignmentStrategy
  def initialize(params)
    ...
  end

  ...
end
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, CustomAssignmentStrategy)

consumer = kafka.consumer(group_id: "some-group", assignment_strategy: {
  name: :custom,
  params: {
    ...
  },
})

In the preceding example, we can use neither positional arguments nor keyword arguments. It is not cool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With regards to init params, can't you do this?

strategy = CustomAssignmentStrategy.new(1, 2, 3)

Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |...|
  strategy.call(...)
end

Seeing as that's a more complex use case, I don't think the additional boilerplate is a problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Developers who implement strategies should understand what protocol_name means, but they might consider it as just a strategy name by registering it via the method.

I think it makes sense to couple the strategy name and protocol name – what other use would we have for the protocol name?

  • Developers should write tests of the strategy, but they won't if the strategy is defined in a block.

I think it's perfectly reasonable to provide an accessible way for developers to get stuff done, and let them do testing on their own terms. For example, by extracting the logic to a method or class and call it from the block.

  • Even if developers can't register two strategies with the same at the same time, they might replace the strategy with the same name, and that might cause some errors.

I think raising an exception when trying to re-define a strategy makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comment! I reimplemented the feature: f1a8162.

With regards to init params, can't you do this?

If we can't pass the strategy directly and the strategy has the method user_data (that means each consumer generates its user data and the leader consumer assigns partitions using the data), we have to introduce a class that creates a strategy class like below:

factory = StrategyClassFactory.new
strategy_class1 = factroy.create_strategy_class(params)
Kafka::Consumer.register_assignment_strategy(:strategy1, strategy_class1)
strategy_class2 = factroy.create_class(another_params)
Kafka::Consumer.register_assignment_strategy(:strategy2, strategy_class2)

consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: :strategy1)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: :strategy2)

or we have to introduce an option for user_data:

strategy1 = CustomAssignmentStrategy.new(params)
strategy2 = CustomAssignmentStrategy.new(another_params)

Kafka::ConsumerGroup::Assignor.register_strategy(:strategy1, user_data: -> { strategy1.user_data }) do |...|
  strategy1.call(...)
end
Kafka::ConsumerGroup::Assignor.register_strategy(:strategy2, user_data: -> { strategy2.user_data }) do |...|
  strategy2.call(...)
end

consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: :strategy1)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: :strategy2)

Note that we have to instantiate the strategy in the same scope as we call the method Kafka::ConsumerGroup::Assignor.register_strategy.

I think passing the strategy directly will make the code simpler:

Kafka::ConsumerGroup::Assignor.register_strategy(:custom, CustomAssignmentStrategy)

strategy1 = CustomAssignmentStrategy.new(params)
consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: strategy1)

strategy2 = CustomAssignmentStrategy.new(another_params)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: strategy2)

That also allows us to provide gems that registers some strategies. If a gem provides the strategy, the user only have to write:

strategy1 = CustomAssignmentStrategy.new(params)
consumer1 = kafka.consumer(group_id: "group1", assignment_strategy: strategy1)

strategy2 = CustomAssignmentStrategy.new(another_params)
consumer2 = kafka.consumer(group_id: "group2", assignment_strategy: strategy2)

If we can't pass the strategy directly, it is difficult for gems to register strategies with custom parameters.

Anyway, any options allow me to remove the monkey patches on https://github.com/abicky/ruby-kafka-ec2/tree/v0.1.3/lib/kafka/ec2/ext, and I guess you prefer supporting only the block call fasion, not both the block call fasion and the strategy name fasion:

# Good
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |...|
  ...
end

# Bad
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, :strategy_class)
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, strategy)

This is why I added the option user_data to the method register_strategy in the commit f1a8162.

what other use would we have for the protocol name?

This is related to "Even if developers can't register two strategies with the same at the same time, they might replace the strategy with the same name, and that might cause some errors."
For example, supposing someone introduced a strategy on the first release like below:

class CustomStrategy
  def user_data
    Socket.ip_address_list.find { |i| i.ipv4_private? }.ip_address
  end

  # Assigns partitions considering the network topology
  def assign(cluster:, members:, partitions:)
    member_ids = members.flat_map do |id, metadata|
      weight = metadata.user_data.split(".")[2] == 0 ? 2 : 1
      [id] * weight
    end
    partitions_per_member = Hash.new {|h, k| h[k] = [] }
    partitions.each_with_index do |partition, index|
      partitions_per_member[member_ids[index % member_ids.count]] << partition
    end

    partitions_per_member
  end
end
Kafka::Consumer.register_assignment_strategy(:custom_strategy, CustomStrategy)

but on the next release, another person changed the strategy with the same name as below:

class CustomCountStrategy
  def user_data
    Concurrent.processor_count
  end

  # Assigns partitions considering the number of cores
  def assign(cluster:, members:, partitions:)
    member_ids = members.flat_map {|id, metadata| [id] * metadata.user_data.to_i }
    partitions_per_member = Hash.new {|h, k| h[k] = [] }
    partitions.each_with_index do |partition, index|
      partitions_per_member[member_ids[index % member_ids.count]] << partition
    end

    partitions_per_member
  end
end
Kafka::Consumer.register_assignment_strategy(:custom_strategy, CustomStrategy)

In this case, the strategy name "custom_strategy" was used only once on each release, but different strategies that require different user data were registered. That might cause a problem.

For example, by extracting the logic to a method or class and call it from the block.

That makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, you've convinced me! How about this: allow the assignment_strategy: parameter, but drop the registration and naming for the time being. Strategies must implement #call and may implement #user_data.

Let's see how that API gets used before complicating things further.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your suggestion! I tried implementing that: 1b10aed.

First, could you read the README to make sure that the changes are as expected?
https://github.com/zendesk/ruby-kafka/pull/846/files#diff-04c6e90faac2675aa89e2176d2eec7d8

If they are as expected, I appreciate it if you review the whole PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!


```ruby
class CustomAssignmentStrategy
def initialize(user_data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def initialize(user_data)
def initialize(cluster, user_data)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assignor is responsible for getting partition information
and building an assignment from the result of its strategy.
That simplifies assignment strategies.
@abicky abicky force-pushed the support-custom-assignment-strategy branch from 34d6928 to 8e92d34 Compare July 5, 2020 15:25
@abicky abicky force-pushed the support-custom-assignment-strategy branch from 46583cd to 475964b Compare July 5, 2020 16:03
This method prevents us from using the same protocol name
and makes us register custom strategies easily.
@abicky abicky force-pushed the support-custom-assignment-strategy branch from 475964b to 0f7a9e9 Compare July 5, 2020 16:13
@abicky
Copy link
Contributor Author

abicky commented Jul 5, 2020

Can you add a note to the changelog as well?

I added a note in the commit fbd121d.

but added a suggestion about a different API that you might consider

As I mentioned in #846 (comment), I tried implementing your suggestion. Could you take a look?

abicky added a commit to abicky/ruby-kafka that referenced this pull request Aug 7, 2020
abicky added a commit to abicky/ruby-kafka that referenced this pull request Aug 7, 2020
@abicky abicky force-pushed the support-custom-assignment-strategy branch from 253cc4c to f212602 Compare August 7, 2020 20:27
@abicky abicky force-pushed the support-custom-assignment-strategy branch from f212602 to f1a8162 Compare August 7, 2020 21:11
abicky added a commit to abicky/ruby-kafka that referenced this pull request Aug 15, 2020
@abicky abicky force-pushed the support-custom-assignment-strategy branch from a6c1ef4 to 1b10aed Compare August 15, 2020 16:24
@dasch dasch merged commit c3e90bc into zendesk:master Aug 17, 2020
@dasch
Copy link
Contributor

dasch commented Aug 17, 2020

Awesome work @abicky, thank you for the patience!

@abicky abicky deleted the support-custom-assignment-strategy branch October 12, 2020 08:38
abicky added a commit to abicky/ruby-kafka-ec2 that referenced this pull request Mar 29, 2022
ruby-kafka 1.3.0 includes the changes on
zendesk/ruby-kafka#846, and they make
ruby-kafka-ec2 much simpler.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants