From ec2cc0699f0fdc24137632b0ec056a0299947314 Mon Sep 17 00:00:00 2001 From: Dimitri Sabadie Date: Mon, 20 Sep 2021 11:31:09 +0200 Subject: [PATCH] Add support for reading/writing commit metadata --- changelog.md | 8 ++++++ src/topic_partition_list.rs | 35 ++++++++++++++++++---- tests/test_high_consumers.rs | 56 ++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 5 deletions(-) diff --git a/changelog.md b/changelog.md index 4ab66b800..058765e09 100644 --- a/changelog.md +++ b/changelog.md @@ -38,12 +38,20 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). Thanks, [@SreeniIO]. +* Support reading and writing commit metadata via + `TopicPartitionListElem::metadata` and `TopicPartitionListElem::set_metadata`, + respectively ([#391]). + + Thanks, [@phaazon]. + [#89]: https://github.com/fede1024/rust-rdkafka/issues/89 [#95]: https://github.com/fede1024/rust-rdkafka/issues/95 [#360]: https://github.com/fede1024/rust-rdkafka/issues/360 [#364]: https://github.com/fede1024/rust-rdkafka/issues/364 [#367]: https://github.com/fede1024/rust-rdkafka/issues/367 +[#391]: https://github.com/fede1024/rust-rdkafka/issues/391 [@djKooks]: https://github.com/djKooks +[@phaazon]: https://github.com/phaazon [@SreeniIO]: https://github.com/SreeniIO diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs index 0e3a15c83..3eda6499a 100644 --- a/src/topic_partition_list.rs +++ b/src/topic_partition_list.rs @@ -6,12 +6,14 @@ use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::fmt; use std::slice; +use std::str; +use libc::c_void; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; use crate::error::{IsError, KafkaError, KafkaResult}; -use crate::util::{KafkaDrop, NativePtr}; +use crate::util::{self, KafkaDrop, NativePtr}; const PARTITION_UNASSIGNED: i32 = -1; @@ -134,6 +136,24 @@ impl<'a> TopicPartitionListElem<'a> { )), } } + + /// Returns the optional metadata associated with the entry. + pub fn metadata(&self) -> &str { + let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) }; + str::from_utf8(bytes).expect("Metadata is not UTF-8") + } + + /// Sets the optional metadata associated with the entry. + pub fn set_metadata(&mut self, metadata: M) + where + M: AsRef, + { + let metadata = metadata.as_ref(); + let buf = unsafe { libc::malloc(metadata.len()) }; + unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) }; + self.ptr.metadata = buf; + self.ptr.metadata_size = metadata.len(); + } } impl<'a> PartialEq for TopicPartitionListElem<'a> { @@ -141,6 +161,7 @@ impl<'a> PartialEq for TopicPartitionListElem<'a> { self.topic() == other.topic() && self.partition() == other.partition() && self.offset() == other.offset() + && self.metadata() == other.metadata() } } @@ -360,14 +381,18 @@ impl Default for TopicPartitionList { impl fmt::Debug for TopicPartitionList { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "TPL {{")?; - for elem in self.elements() { + for (i, elem) in self.elements().iter().enumerate() { + if i > 0 { + write!(f, "; ")?; + } write!( f, - "({}, {}): {:?}, ", + "{}/{}: offset={:?} metadata={:?}", elem.topic(), elem.partition(), - elem.offset() - )? + elem.offset(), + elem.metadata(), + )?; } write!(f, "}}") } diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index a276ef7ca..6f30edc47 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -1,6 +1,7 @@ //! Test data consumption using high level consumers. use std::collections::HashMap; +use std::error::Error; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; @@ -372,3 +373,58 @@ async fn test_consumer_store_offset_commit() { .unwrap(); assert_eq!(position, consumer.position().unwrap()); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_consumer_commit_metadata() -> Result<(), Box> { + let _ = env_logger::try_init(); + + let topic_name = rand_test_topic(); + let group_name = rand_test_group(); + populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await; + + let create_consumer = || async { + // Disable auto-commit so we can manually drive the commits. + let mut config = HashMap::new(); + config.insert("enable.auto.commit", "false"); + let consumer = create_stream_consumer(&group_name, Some(config)); + + // Subscribe to the topic and wait for at least one message, which + // ensures that the consumer group has been joined and such. + consumer.subscribe(&[topic_name.as_str()])?; + let _ = consumer.stream().next().await; + + Ok::<_, Box>(consumer) + }; + + // Create a topic partition list where each element has some associated + // metadata. + let tpl = { + let mut tpl = TopicPartitionList::new(); + let mut tpl1 = tpl.add_partition(&topic_name, 0); + tpl1.set_offset(Offset::Offset(1))?; + tpl1.set_metadata("one"); + let mut tpl2 = tpl.add_partition(&topic_name, 1); + tpl2.set_offset(Offset::Offset(1))?; + tpl2.set_metadata("two"); + let mut tpl3 = tpl.add_partition(&topic_name, 2); + tpl3.set_offset(Offset::Offset(1))?; + tpl3.set_metadata("three"); + tpl + }; + + // Ensure that the commit state immediately includes the metadata. + { + let consumer = create_consumer().await?; + consumer.commit(&tpl, CommitMode::Sync)?; + assert_eq!(consumer.committed(None)?, tpl); + } + + // Ensure that the commit state on a new consumer in the same group + // can see the same metadata. + { + let consumer = create_consumer().await?; + assert_eq!(consumer.committed(None)?, tpl); + } + + Ok(()) +}