Skip to content

Commit

Permalink
Introduce Kafka::ConsumerGroup::Assignor
Browse files Browse the repository at this point in the history
The assignor is responsible for getting partition information
and building an assignment from the result of its strategy.
That simplifies assignment strategies.
  • Loading branch information
abicky committed Jun 28, 2020
1 parent 5bf2547 commit 34d6928
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 119 deletions.
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -727,11 +727,11 @@ end
#### Customizing Partition Assignment Strategy

In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.
You can implement a custom assignment strategy and use it by passing a proc object that create the strategy as the argument `assignment_strategy_builder` like below:
You can implement a custom assignment strategy and use it by passing an object that implements `#protocol_name`, `#user_data`, and `#assign` as the argument `assignment_strategy` like below:

```ruby
class CustomAssignmentStrategy
def initialize(cluster, user_data)
def initialize(user_data)
@cluster = cluster
@user_data = user_data
end
Expand All @@ -748,20 +748,19 @@ class CustomAssignmentStrategy

# Assign the topic partitions to the group members.
#
# @param members [Hash<String, Protocol::JoinGroupResponse::Metadata>] a hash
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
# mapping member ids to metadata
# @param topics [Array<String>] topics
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
# ids to assignments.
def assign(members:, topics:)
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
# partitions the consumer group processes
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
# mapping member ids to partitions.
def assign(members:, partitions:)
...
end
end

assignment_strategy_builder = ->(cluster) do
CustomAssignmentStrategy.new(cluster, "some-host-information")
end
consumer = kafka.consumer(group_id: "some-group", assignment_strategy_builder: assignment_strategy_builder)
strategy = CustomAssignmentStrategy.new("some-host-information")
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy)
```

### Thread Safety
Expand Down
14 changes: 4 additions & 10 deletions lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
# If it is n (n > 0), the topic list will be refreshed every n seconds
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
# `call(Kafka::FetchedBatch)`.
# @param assignment_strategy_builder [Proc] a procedure that implements
# `call(Kafka::Cluster)` and creates a assignment strategy.
# @param assignment_strategy [Object] a partition assignment strategy that
# implements `protocol_name()`, `user_data()`, and `assign(members:, partitions:)`
# @return [Consumer]
def consumer(
group_id:,
Expand All @@ -364,7 +364,7 @@ def consumer(
fetcher_max_queue_size: 100,
refresh_topic_interval: 0,
interceptors: [],
assignment_strategy_builder: nil
assignment_strategy: nil
)
cluster = initialize_cluster

Expand All @@ -375,12 +375,6 @@ def consumer(
# The Kafka protocol expects the retention time to be in ms.
retention_time = (offset_retention_time && offset_retention_time * 1_000) || -1

if assignment_strategy_builder
assignment_strategy = assignment_strategy_builder.call(cluster)
else
assignment_strategy = RoundRobinAssignmentStrategy.new(cluster: cluster)
end

group = ConsumerGroup.new(
cluster: cluster,
logger: @logger,
Expand All @@ -389,7 +383,7 @@ def consumer(
rebalance_timeout: rebalance_timeout,
retention_time: retention_time,
instrumenter: instrumenter,
assignment_strategy: assignment_strategy,
assignment_strategy: assignment_strategy
)

fetcher = Fetcher.new(
Expand Down
12 changes: 8 additions & 4 deletions lib/kafka/consumer_group.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "set"
require "kafka/consumer_group/assignor"
require "kafka/round_robin_assignment_strategy"

module Kafka
Expand All @@ -19,7 +20,10 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout
@members = {}
@topics = Set.new
@assigned_partitions = {}
@assignment_strategy = assignment_strategy
@assignor = Assignor.new(
cluster: cluster,
strategy: assignment_strategy || RoundRobinAssignmentStrategy.new
)
@retention_time = retention_time
end

Expand Down Expand Up @@ -144,8 +148,8 @@ def join_group
rebalance_timeout: @rebalance_timeout,
member_id: @member_id,
topics: @topics,
protocol_name: @assignment_strategy.protocol_name,
user_data: @assignment_strategy.user_data,
protocol_name: @assignor.protocol_name,
user_data: @assignor.user_data,
)

Protocol.handle_error(response.error_code)
Expand Down Expand Up @@ -182,7 +186,7 @@ def synchronize
if group_leader?
@logger.info "Chosen as leader of group `#{@group_id}`"

group_assignment = @assignment_strategy.assign(
group_assignment = @assignor.assign(
members: @members,
topics: @topics,
)
Expand Down
63 changes: 63 additions & 0 deletions lib/kafka/consumer_group/assignor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

require "kafka/protocol/member_assignment"

module Kafka
  class ConsumerGroup

    # A consumer group partition assignor.
    #
    # Wraps an assignment strategy: it looks up the partitions of the given
    # topics through the cluster, passes them to the strategy, and converts the
    # strategy's result into one `Protocol::MemberAssignment` per group member.
    class Assignor
      # A single topic partition, as handed to the strategy.
      Partition = Struct.new(:topic, :partition_id)

      # @param cluster [Kafka::Cluster]
      # @param strategy [Object] an object that implements #protocol_name,
      #   #user_data, and #assign.
      def initialize(cluster:, strategy:)
        @cluster = cluster
        @strategy = strategy
      end

      # @return the protocol name reported by the strategy.
      def protocol_name
        @strategy.protocol_name
      end

      # @return the user data reported by the strategy.
      def user_data
        @strategy.user_data
      end

      # Assign the topic partitions to the group members.
      #
      # @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
      #   mapping member ids to metadata.
      # @param topics [Array<String>] topics
      # @raise [UnknownTopicOrPartition] if the partitions of a topic cannot be
      #   looked up because the topic is unknown.
      # @raise [ArgumentError] if the strategy assigns partitions to a member id
      #   that is not a key of `members`.
      # @return [Hash<String, Kafka::Protocol::MemberAssignment>] a hash mapping member
      #   ids to assignments.
      def assign(members:, topics:)
        topic_partitions = topics.flat_map { |topic| topic_partitions_for(topic) }

        # Every member gets an assignment object, even when the strategy gives
        # it no partitions.
        group_assignment = members.each_key.with_object({}) do |member_id, assignments|
          assignments[member_id] = Protocol::MemberAssignment.new
        end

        @strategy.assign(members: members, partitions: topic_partitions).each do |member_id, partitions|
          Array(partitions).each do |partition|
            # Fail with a descriptive error instead of a NoMethodError on nil
            # when the strategy hands partitions to a member outside the group.
            member_assignment = group_assignment.fetch(member_id) do
              raise ArgumentError, "assignment strategy returned unknown member id #{member_id.inspect}"
            end

            member_assignment.assign(partition.topic, [partition.partition_id])
          end
        end

        group_assignment
      rescue Kafka::LeaderNotAvailable
        # Wait a second and start the whole assignment over.
        # NOTE(review): this retry is unbounded; consider capping the attempts.
        sleep 1
        retry
      end

      private

      # Builds the list of Partition structs for a single topic.
      def topic_partitions_for(topic)
        partition_ids =
          begin
            @cluster.partitions_for(topic).map(&:partition_id)
          rescue UnknownTopicOrPartition
            # Re-raise with a message that names the offending topic.
            raise UnknownTopicOrPartition, "unknown topic #{topic}"
          end

        partition_ids.map { |partition_id| Partition.new(topic, partition_id) }
      end
    end
  end
end
51 changes: 11 additions & 40 deletions lib/kafka/round_robin_assignment_strategy.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
# frozen_string_literal: true

require "kafka/protocol/member_assignment"

module Kafka

# A consumer group partition assignment strategy that assigns partitions to
# consumers in a round-robin fashion.
class RoundRobinAssignmentStrategy
def initialize(cluster:)
@cluster = cluster
end

def protocol_name
"roundrobin"
end
Expand All @@ -21,43 +15,20 @@ def user_data

# Assign the topic partitions to the group members.
#
# @param members [Hash<String, Protocol::JoinGroupResponse::Metadata>] a hash
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
# mapping member ids to metadata
# @param topics [Array<String>] topics
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
# ids to assignments.
def assign(members:, topics:)
group_assignment = {}

members.each_key do |member_id|
group_assignment[member_id] = Protocol::MemberAssignment.new
end

topic_partitions = topics.flat_map do |topic|
begin
partitions = @cluster.partitions_for(topic).map(&:partition_id)
rescue UnknownTopicOrPartition
raise UnknownTopicOrPartition, "unknown topic #{topic}"
end
Array.new(partitions.count) { topic }.zip(partitions)
end

partitions_per_member = topic_partitions.group_by.with_index do |_, index|
index % members.count
end.values

members.keys.zip(partitions_per_member).each do |member_id, member_partitions|
unless member_partitions.nil?
member_partitions.each do |topic, partition|
group_assignment[member_id].assign(topic, [partition])
end
end
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
# partitions the consumer group processes
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
# mapping member ids to partitions.
def assign(members:, partitions:)
member_ids = members.keys
partitions_per_member = Hash.new {|h, k| h[k] = [] }
partitions.each_with_index do |partition, index|
partitions_per_member[member_ids[index % member_ids.count]] << partition
end

group_assignment
rescue Kafka::LeaderNotAvailable
sleep 1
retry
partitions_per_member
end
end
end
48 changes: 48 additions & 0 deletions spec/consumer_group/assignor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

describe Kafka::ConsumerGroup::Assignor do
  let(:cluster) { double(:cluster) }
  let(:assignor) { described_class.new(cluster: cluster, strategy: strategy) }

  # A minimal strategy that hands consecutive, equally sized chunks of the
  # partition list to the members in order.
  let(:strategy) do
    klass = Class.new do
      def protocol_name
        "test"
      end

      def user_data
        nil
      end

      def assign(members:, partitions:)
        assignment = {}
        member_ids = members.keys
        # Round up so that the partitions fit into at most `members.count` chunks.
        chunk_size = (partitions.count.to_f / members.count).ceil

        partitions.each_slice(chunk_size).with_index do |chunk, index|
          assignment[member_ids[index]] = chunk
        end

        assignment
      end
    end
    klass.new
  end

  it "assigns all partitions" do
    members = (0...10).map { |i| ["member#{i}", nil] }.to_h
    topics = ["greetings"]
    partitions = (0...30).map { |i| double(:"partition#{i}", partition_id: i) }

    # The cluster reports the same 30 partitions for whatever topic is asked for.
    allow(cluster).to receive(:partitions_for) { partitions }

    assignments = assignor.assign(members: members, topics: topics)

    # Every partition id must show up in the assignment of some member.
    partitions.each do |partition|
      member = assignments.values.find do |assignment|
        assignment.topics.any? do |_topic, partition_ids|
          partition_ids.include?(partition.partition_id)
        end
      end

      expect(member).to_not be_nil
    end
  end
end
29 changes: 9 additions & 20 deletions spec/functional/consumer_group_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,7 @@
received_messages = []

assignment_strategy_class = Class.new do
def initialize(cluster, weight)
@cluster = cluster
def initialize(weight)
@weight = weight
end

Expand All @@ -469,32 +468,22 @@ def user_data
@weight.to_s
end

def assign(members:, topics:)
group_assignment = members.each_key.with_object({}) do |member_id, assignment|
assignment[member_id] = Kafka::Protocol::MemberAssignment.new
def assign(members:, partitions:)
member_ids = members.flat_map {|id, metadata| [id] * metadata.user_data.to_i }
partitions_per_member = Hash.new {|h, k| h[k] = [] }
partitions.each_with_index do |partition, index|
partitions_per_member[member_ids[index % member_ids.count]] << partition
end

member_ids = members.flat_map { |id, metadata| [id] * metadata.user_data.to_i }
partition_index = 0
topics.each_with_index do |topic, i|
@cluster.partitions_for(topic).each_with_index do |partition, j|
member_id = member_ids[partition_index % member_ids.size]
group_assignment[member_id].assign(topic, [partition.partition_id])
partition_index += 1
end
end

group_assignment
partitions_per_member
end
end

consumers = 2.times.map do |i|
assignment_strategy_builder = ->(cluster) {
assignment_strategy_class.new(cluster, i + 1)
}
assignment_strategy = assignment_strategy_class.new(i + 1)

kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy_builder: assignment_strategy_builder)
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
consumer.subscribe(topic)
consumer
end
Expand Down
Loading

0 comments on commit 34d6928

Please sign in to comment.