From 34d69288a22d5593826c29ec8b787221efe237de Mon Sep 17 00:00:00 2001 From: abicky Date: Sun, 28 Jun 2020 18:00:33 +0900 Subject: [PATCH] Introduce Kafka::ConsumerGroup::Assignor The assignor is responsible for getting partition information and building an assignment from the result of its strategy. That simplifies assignment strategies. --- README.md | 21 ++++--- lib/kafka/client.rb | 14 ++--- lib/kafka/consumer_group.rb | 12 ++-- lib/kafka/consumer_group/assignor.rb | 63 ++++++++++++++++++++ lib/kafka/round_robin_assignment_strategy.rb | 51 ++++------------ spec/consumer_group/assignor_spec.rb | 48 +++++++++++++++ spec/functional/consumer_group_spec.rb | 29 +++------ spec/round_robin_assignment_strategy_spec.rb | 63 +++++++++----------- 8 files changed, 182 insertions(+), 119 deletions(-) create mode 100644 lib/kafka/consumer_group/assignor.rb create mode 100644 spec/consumer_group/assignor_spec.rb diff --git a/README.md b/README.md index 689910723..bb8386d83 100644 --- a/README.md +++ b/README.md @@ -727,11 +727,11 @@ end #### Customizing Partition Assignment Strategy In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts. -You can implement a custom assignment strategy and use it by passing a proc object that create the strategy as the argument `assignment_strategy_builder` like below: +You can implement a custom assignment strategy and use it by passing an object that implements `#protocol_name`, `#user_data`, and `#assign` as the argument `assignment_strategy` like below: ```ruby class CustomAssignmentStrategy - def initialize(cluster, user_data) + def initialize(user_data) @cluster = cluster @user_data = user_data end @@ -748,20 +748,19 @@ class CustomAssignmentStrategy # Assign the topic partitions to the group members. 
# - # @param members [Hash] a hash + # @param members [Hash] a hash # mapping member ids to metadata - # @param topics [Array] topics - # @return [Hash] a hash mapping member - # ids to assignments. - def assign(members:, topics:) + # @param partitions [Array] a list of + # partitions the consumer group processes + # @return [Hash] a hash + # mapping member ids to partitions. + def assign(members:, partitions:) ... end end -assignment_strategy_builder = ->(cluster) do - CustomAssignmentStrategy.new(cluster, "some-host-information") -end -consumer = kafka.consumer(group_id: "some-group", assignment_strategy_builder: assignment_strategy_builder) +strategy = CustomAssignmentStrategy.new("some-host-information") +consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy) ``` ### Thread Safety diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 219eb8d4f..317ab9d4a 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -350,8 +350,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: # If it is n (n > 0), the topic list will be refreshed every n seconds # @param interceptors [Array] a list of consumer interceptors that implement # `call(Kafka::FetchedBatch)`. - # @param assignment_strategy_builder [Proc] a procedure that implements - # `call(Kafka::Cluster)` and creates a assignment strategy. + # @param assignment_strategy [Object] a partition assignment strategy that + # implements `protocol_name()`, `user_data()`, and `assign(members:, partitions:)` # @return [Consumer] def consumer( group_id:, @@ -364,7 +364,7 @@ def consumer( fetcher_max_queue_size: 100, refresh_topic_interval: 0, interceptors: [], - assignment_strategy_builder: nil + assignment_strategy: nil ) cluster = initialize_cluster @@ -375,12 +375,6 @@ def consumer( # The Kafka protocol expects the retention time to be in ms.
retention_time = (offset_retention_time && offset_retention_time * 1_000) || -1 - if assignment_strategy_builder - assignment_strategy = assignment_strategy_builder.call(cluster) - else - assignment_strategy = RoundRobinAssignmentStrategy.new(cluster: cluster) - end - group = ConsumerGroup.new( cluster: cluster, logger: @logger, @@ -389,7 +383,7 @@ def consumer( rebalance_timeout: rebalance_timeout, retention_time: retention_time, instrumenter: instrumenter, - assignment_strategy: assignment_strategy, + assignment_strategy: assignment_strategy ) fetcher = Fetcher.new( diff --git a/lib/kafka/consumer_group.rb b/lib/kafka/consumer_group.rb index 567426d4f..fe04affa1 100644 --- a/lib/kafka/consumer_group.rb +++ b/lib/kafka/consumer_group.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "set" +require "kafka/consumer_group/assignor" require "kafka/round_robin_assignment_strategy" module Kafka @@ -19,7 +20,10 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout @members = {} @topics = Set.new @assigned_partitions = {} - @assignment_strategy = assignment_strategy + @assignor = Assignor.new( + cluster: cluster, + strategy: assignment_strategy || RoundRobinAssignmentStrategy.new + ) @retention_time = retention_time end @@ -144,8 +148,8 @@ def join_group rebalance_timeout: @rebalance_timeout, member_id: @member_id, topics: @topics, - protocol_name: @assignment_strategy.protocol_name, - user_data: @assignment_strategy.user_data, + protocol_name: @assignor.protocol_name, + user_data: @assignor.user_data, ) Protocol.handle_error(response.error_code) @@ -182,7 +186,7 @@ def synchronize if group_leader? 
@logger.info "Chosen as leader of group `#{@group_id}`" - group_assignment = @assignment_strategy.assign( + group_assignment = @assignor.assign( members: @members, topics: @topics, ) diff --git a/lib/kafka/consumer_group/assignor.rb b/lib/kafka/consumer_group/assignor.rb new file mode 100644 index 000000000..559cbaa80 --- /dev/null +++ b/lib/kafka/consumer_group/assignor.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require "kafka/protocol/member_assignment" + +module Kafka + class ConsumerGroup + + # A consumer group partition assignor + class Assignor + Partition = Struct.new(:topic, :partition_id) + + # @param cluster [Kafka::Cluster] + # @param strategy [Object] an object that implements #protocol_name, + # #user_data, and #assign. + def initialize(cluster:, strategy:) + @cluster = cluster + @strategy = strategy + end + + def protocol_name + @strategy.protocol_name + end + + def user_data + @strategy.user_data + end + + # Assign the topic partitions to the group members. + # + # @param members [Hash] a hash + # mapping member ids to metadata. + # @param topics [Array] topics + # @return [Hash] a hash mapping member + # ids to assignments.
+ def assign(members:, topics:) + topic_partitions = topics.flat_map do |topic| + begin + partition_ids = @cluster.partitions_for(topic).map(&:partition_id) + rescue UnknownTopicOrPartition + raise UnknownTopicOrPartition, "unknown topic #{topic}" + end + partition_ids.map {|partition_id| Partition.new(topic, partition_id) } + end + + group_assignment = {} + + members.each_key do |member_id| + group_assignment[member_id] = Protocol::MemberAssignment.new + end + @strategy.assign(members: members, partitions: topic_partitions).each do |member_id, partitions| + Array(partitions).each do |partition| + group_assignment[member_id].assign(partition.topic, [partition.partition_id]) + end + end + + group_assignment + rescue Kafka::LeaderNotAvailable + sleep 1 + retry + end + end + end +end diff --git a/lib/kafka/round_robin_assignment_strategy.rb b/lib/kafka/round_robin_assignment_strategy.rb index 62129dec4..c4cc815c6 100644 --- a/lib/kafka/round_robin_assignment_strategy.rb +++ b/lib/kafka/round_robin_assignment_strategy.rb @@ -1,16 +1,10 @@ # frozen_string_literal: true -require "kafka/protocol/member_assignment" - module Kafka # A consumer group partition assignment strategy that assigns partitions to # consumers in a round-robin fashion. class RoundRobinAssignmentStrategy - def initialize(cluster:) - @cluster = cluster - end - def protocol_name "roundrobin" end @@ -21,43 +15,20 @@ def user_data # Assign the topic partitions to the group members. # - # @param members [Hash] a hash + # @param members [Hash] a hash # mapping member ids to metadata - # @param topics [Array] topics - # @return [Hash] a hash mapping member - # ids to assignments. 
- def assign(members:, topics:) - group_assignment = {} - - members.each_key do |member_id| - group_assignment[member_id] = Protocol::MemberAssignment.new - end - - topic_partitions = topics.flat_map do |topic| - begin - partitions = @cluster.partitions_for(topic).map(&:partition_id) - rescue UnknownTopicOrPartition - raise UnknownTopicOrPartition, "unknown topic #{topic}" - end - Array.new(partitions.count) { topic }.zip(partitions) - end - - partitions_per_member = topic_partitions.group_by.with_index do |_, index| - index % members.count - end.values - - members.keys.zip(partitions_per_member).each do |member_id, member_partitions| - unless member_partitions.nil? - member_partitions.each do |topic, partition| - group_assignment[member_id].assign(topic, [partition]) - end - end + # @param partitions [Array] a list of + # partitions the consumer group processes + # @return [Hash] a hash + # mapping member ids to partitions. + def assign(members:, partitions:) + member_ids = members.keys + partitions_per_member = Hash.new {|h, k| h[k] = [] } + partitions.each_with_index do |partition, index| + partitions_per_member[member_ids[index % member_ids.count]] << partition end - group_assignment - rescue Kafka::LeaderNotAvailable - sleep 1 - retry + partitions_per_member end end end diff --git a/spec/consumer_group/assignor_spec.rb b/spec/consumer_group/assignor_spec.rb new file mode 100644 index 000000000..d6d253697 --- /dev/null +++ b/spec/consumer_group/assignor_spec.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +describe Kafka::ConsumerGroup::Assignor do + let(:cluster) { double(:cluster) } + let(:assignor) { described_class.new(cluster: cluster, strategy: strategy) } + let(:strategy) do + klass = Class.new do + def protocol_name + "test" + end + + def user_data + nil + end + + def assign(members:, partitions:) + assignment = {} + partition_count_per_member = (partitions.count.to_f / members.count).ceil + 
partitions.each_slice(partition_count_per_member).with_index do |chunk, index| + assignment[members.keys[index]] = chunk + end + + assignment + end + end + klass.new + end + + it "assigns all partitions" do + members = Hash[(0...10).map {|i| ["member#{i}", nil] }] + topics = ["greetings"] + partitions = (0...30).map {|i| double(:"partition#{i}", partition_id: i) } + + allow(cluster).to receive(:partitions_for) { partitions } + + assignments = assignor.assign(members: members, topics: topics) + + partitions.each do |partition| + member = assignments.values.find {|assignment| + assignment.topics.find {|topic, partitions| + partitions.include?(partition.partition_id) + } + } + + expect(member).to_not be_nil + end + end +end diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb index 045e4788e..6c056a736 100644 --- a/spec/functional/consumer_group_spec.rb +++ b/spec/functional/consumer_group_spec.rb @@ -456,8 +456,7 @@ received_messages = [] assignment_strategy_class = Class.new do - def initialize(cluster, weight) - @cluster = cluster + def initialize(weight) @weight = weight end @@ -469,32 +468,22 @@ def user_data @weight.to_s end - def assign(members:, topics:) - group_assignment = members.each_key.with_object({}) do |member_id, assignment| - assignment[member_id] = Kafka::Protocol::MemberAssignment.new + def assign(members:, partitions:) + member_ids = members.flat_map {|id, metadata| [id] * metadata.user_data.to_i } + partitions_per_member = Hash.new {|h, k| h[k] = [] } + partitions.each_with_index do |partition, index| + partitions_per_member[member_ids[index % member_ids.count]] << partition end - member_ids = members.flat_map { |id, metadata| [id] * metadata.user_data.to_i } - partition_index = 0 - topics.each_with_index do |topic, i| - @cluster.partitions_for(topic).each_with_index do |partition, j| - member_id = member_ids[partition_index % member_ids.size] - group_assignment[member_id].assign(topic, 
[partition.partition_id]) - partition_index += 1 - end - end - - group_assignment + partitions_per_member end end consumers = 2.times.map do |i| - assignment_strategy_builder = ->(cluster) { - assignment_strategy_class.new(cluster, i + 1) - } + assignment_strategy = assignment_strategy_class.new(i + 1) kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger) - consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy_builder: assignment_strategy_builder) + consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy) consumer.subscribe(topic) consumer end diff --git a/spec/round_robin_assignment_strategy_spec.rb b/spec/round_robin_assignment_strategy_spec.rb index c5b7ff53a..312639c5c 100644 --- a/spec/round_robin_assignment_strategy_spec.rb +++ b/spec/round_robin_assignment_strategy_spec.rb @@ -1,22 +1,18 @@ # frozen_string_literal: true describe Kafka::RoundRobinAssignmentStrategy do - let(:cluster) { double(:cluster) } - let(:strategy) { described_class.new(cluster: cluster) } + let(:strategy) { described_class.new } it "assigns all partitions" do members = Hash[(0...10).map {|i| ["member#{i}", nil] }] - topics = ["greetings"] - partitions = (0...30).map {|i| double(:"partition#{i}", partition_id: i) } + partitions = (0...30).map {|i| double(:"partition#{i}", topic: "greetings", partition_id: i) } - allow(cluster).to receive(:partitions_for) { partitions } - - assignments = strategy.assign(members: members, topics: topics) + assignments = strategy.assign(members: members, partitions: partitions) partitions.each do |partition| - member = assignments.values.find {|assignment| - assignment.topics.find {|topic, partitions| - partitions.include?(partition.partition_id) + member = assignments.values.find {|assigned_partitions| + assigned_partitions.find {|assigned_partition| + assigned_partition == partition } } @@ -25,29 +21,26 @@ 
end it "spreads all partitions between members" do - cluster = double(:cluster) - strategy = described_class.new(cluster: cluster) - members = Hash[(0...10).map {|i| ["member#{i}", nil] }] topics = ["topic1", "topic2"] - partitions = (0...5).map {|i| double(:"partition#{i}", partition_id: i) } - - allow(cluster).to receive(:partitions_for) { partitions } + partitions = topics.product((0...5).to_a).map {|topic, i| + double(:"partition#{i}", topic: topic, partition_id: i) + } - assignments = strategy.assign(members: members, topics: topics) + assignments = strategy.assign(members: members, partitions: partitions) partitions.each do |partition| - member = assignments.values.find {|assignment| - assignment.topics.find {|topic, partitions| - partitions.include?(partition.partition_id) + member = assignments.values.find {|assigned_partitions| + assigned_partitions.find {|assigned_partition| + assigned_partition == partition } } expect(member).to_not be_nil end - num_partitions_assigned = assignments.values.map do |assignment| - assignment.topics.values.map(&:count).inject(:+) + num_partitions_assigned = assignments.values.map do |assigned_partitions| + assigned_partitions.count end expect(num_partitions_assigned).to all eq(1) @@ -86,13 +79,13 @@ } ].each do |name:, topics:, members:| it name do - allow(cluster).to receive(:partitions_for) do |topic| - topics.fetch(topic).map do |partition_id| - double(:"partition#{partition_id}", partition_id: partition_id) - end - end + partitions = topics.flat_map {|topic, partition_ids| + partition_ids.map {|i| + double(:"partition#{i}", topic: topic, partition_id: i) + } + } - assignments = strategy.assign(members: members, topics: topics.keys) + assignments = strategy.assign(members: members, partitions: partitions) expect_all_partitions_assigned(topics, assignments) expect_even_assignments(topics, assignments) @@ -100,10 +93,12 @@ end def expect_all_partitions_assigned(topics, assignments) - topics.each do |topic, partitions| - 
partitions.each do |partition| - assigned = assignments.values.find do |assignment| - assignment.topics.fetch(topic, []).include?(partition) + topics.each do |topic, partition_ids| + partition_ids.each do |partition_id| + assigned = assignments.values.find do |assigned_partitions| + assigned_partitions.find {|assigned_partition| + assigned_partition.topic == topic && assigned_partition.partition_id == partition_id + } end expect(assigned).to_not be_nil end @@ -112,8 +107,8 @@ def expect_all_partitions_assigned(topics, assignments) def expect_even_assignments(topics, assignments) num_partitions = topics.values.flatten.count - assignments.values.each do |assignment| - num_assigned = assignment.topics.values.flatten.count + assignments.values.each do |assigned_partition| + num_assigned = assigned_partition.count expect(num_assigned).to be_within(1).of(num_partitions.to_f / assignments.count) end end