Crc32 Partitioning Scheme Incompatible with librdkafka #2430

Closed
csm8118 opened this issue Feb 3, 2023 · 6 comments
csm8118 commented Feb 3, 2023

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama Kafka Go
v1.29.0 1.15

Problem Description

There is a difference between how sarama partitions when using the Crc32 hash and what is done in librdkafka (and ruby-kafka). It looks like this issue was reported some time ago but nothing came of it (#1213). The main issue is that librdkafka/ruby-kafka treat the hashed value from Crc32 as an unsigned value, whereas sarama casts it to a signed value. However, when using other hashing algorithms, the signed cast is necessary.

  • librdkafka
  • ruby-kafka
  • sarama

Here are some sample inputs that show the difference:

# Ruby: key of "SheetJS", num partitions: 100 -> partition 26
Zlib.crc32('SheetJS') % 100
# => 26
// Go: key of "SheetJS", num partitions: 100 -> partition 70
sc := sarama.NewConfig()
sc.Producer.Partitioner = sarama.NewCustomHashPartitioner(crc32.NewIEEE)
partitioner := sc.Producer.Partitioner("test")

msg := &sarama.ProducerMessage{
	Key: sarama.StringEncoder("SheetJS"),
}
p, _ := partitioner.Partition(msg, 100)
fmt.Println(p) 
// prints 70

// Using the "reference abs" partitioner
sc := sarama.NewConfig()
sc.Producer.Partitioner = sarama.NewCustomPartitioner(
	sarama.WithAbsFirst(),
	sarama.WithCustomHashFunction(crc32.NewIEEE),
)
partitioner := sc.Producer.Partitioner("test")

msg := &sarama.ProducerMessage{
	Key: sarama.StringEncoder("SheetJS"),
}
p, _ := partitioner.Partition(msg, 100)
fmt.Println(p) 
// prints 78

I created a custom partitioner (and tests) that replicates the ruby/librdkafka behavior. The relevant snippet of that code is below.

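import (
	"hash"
	"hash/crc32"

	"github.com/Shopify/sarama" // module path as of v1.29.0
)

// The struct and RequiresConsistency are not in the original snippet;
// they are assumed here so that the code compiles.
type rubyKafkaCompatiblePartitioner struct {
	random sarama.Partitioner
	hasher hash.Hash32
}

func NewRubyKafkaCompatiblePartitioner(topic string) sarama.Partitioner {
	p := new(rubyKafkaCompatiblePartitioner)
	p.random = sarama.NewRandomPartitioner(topic)
	p.hasher = crc32.NewIEEE()
	return p
}

func (p *rubyKafkaCompatiblePartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	// Messages without a key are spread randomly.
	if message.Key == nil {
		return p.random.Partition(message, numPartitions)
	}
	bytes, err := message.Key.Encode()
	if err != nil {
		return -1, err
	}
	p.hasher.Reset()
	_, err = p.hasher.Write(bytes)
	if err != nil {
		return -1, err
	}

	// Keep the hash unsigned for the modulus, as librdkafka/ruby-kafka do.
	partition := p.hasher.Sum32() % uint32(numPartitions)

	return int32(partition), nil
}

// Assumed: keyed messages must always map to the same partition.
func (p *rubyKafkaCompatiblePartitioner) RequiresConsistency() bool {
	return true
}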

Sample testing output:

sc := sarama.NewConfig()
sc.Producer.Partitioner = NewRubyKafkaCompatiblePartitioner
partitioner := sc.Producer.Partitioner("test")

msg := &sarama.ProducerMessage{
	Key: sarama.StringEncoder("SheetJS"),
}
p, _ := partitioner.Partition(msg, 100)
fmt.Println(p) // prints 26

I would like to contribute this code to the sarama codebase, but I wanted to open this issue and have a discussion about it prior to creating a pull request.

github-actions bot added the stale label Jul 17, 2023
dnwe added the bug label and removed the stale label Jul 17, 2023


dnwe commented Jul 22, 2023

@csm8118 thanks for raising this issue. Is it something you’re still interested in contributing? Presumably we’d have to retain the old (wrong) method, with a config option to switch to the compatible version.


csm8118 commented Jul 24, 2023

@dnwe I'd be happy to contribute this. My initial thinking was to introduce something like NewRubyKafkaCompatiblePartitioner as I described in the comment. If that sounds good to you, I'll get a PR going with those changes.


dnwe commented Jul 24, 2023

I quite like the names that librdkafka assigns to its available partitioners:

Partitioner:

  • random - random distribution
  • consistent - CRC32 hash of key (Empty and NULL keys are mapped to single partition)
  • consistent_random - CRC32 hash of key (Empty and NULL keys are randomly partitioned)
  • murmur2 - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition)
  • murmur2_random - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.)
  • fnv1a - FNV-1a hash of key (NULL keys are mapped to single partition)
  • fnv1a_random - FNV-1a hash of key (NULL keys are randomly partitioned)

Type: string

Especially interesting as I hadn’t necessarily realised before that we are FNV-1a by default, librdkafka and its brethren are all CRC32 by default and Java is murmur2 by default


csm8118 commented Aug 4, 2023

I created #2560 to implement this change. I tried to pick good names for things. Let me know what you think of the changes!


csm8118 commented Aug 8, 2023

Thanks @dnwe! Closing this issue out.

csm8118 closed this as completed Aug 8, 2023