Skip to content

Commit

Permalink
Add support for reading/writing commit metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
dsabadie-datadog authored and benesch committed Oct 18, 2021
1 parent 3aa7f92 commit ec2cc06
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 5 deletions.
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

Thanks, [@SreeniIO].

* Support reading and writing commit metadata via
`TopicPartitionListElem::metadata` and `TopicPartitionListElem::set_metadata`,
respectively ([#391]).

Thanks, [@phaazon].

[#89]: https://github.com/fede1024/rust-rdkafka/issues/89
[#95]: https://github.com/fede1024/rust-rdkafka/issues/95
[#360]: https://github.com/fede1024/rust-rdkafka/issues/360
[#364]: https://github.com/fede1024/rust-rdkafka/issues/364
[#367]: https://github.com/fede1024/rust-rdkafka/issues/367
[#391]: https://github.com/fede1024/rust-rdkafka/issues/391
[@djKooks]: https://github.com/djKooks
[@phaazon]: https://github.com/phaazon
[@SreeniIO]: https://github.com/SreeniIO

<a name="0.26.0"></a>
Expand Down
35 changes: 30 additions & 5 deletions src/topic_partition_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::slice;
use std::str;

use libc::c_void;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{KafkaDrop, NativePtr};
use crate::util::{self, KafkaDrop, NativePtr};

const PARTITION_UNASSIGNED: i32 = -1;

Expand Down Expand Up @@ -134,13 +136,32 @@ impl<'a> TopicPartitionListElem<'a> {
)),
}
}

/// Returns the optional metadata associated with the entry.
pub fn metadata(&self) -> &str {
let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
str::from_utf8(bytes).expect("Metadata is not UTF-8")
}

/// Sets the optional metadata associated with the entry.
pub fn set_metadata<M>(&mut self, metadata: M)
where
M: AsRef<str>,
{
let metadata = metadata.as_ref();
let buf = unsafe { libc::malloc(metadata.len()) };
unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) };
self.ptr.metadata = buf;
self.ptr.metadata_size = metadata.len();
}
}

impl<'a> PartialEq for TopicPartitionListElem<'a> {
fn eq(&self, other: &TopicPartitionListElem<'a>) -> bool {
self.topic() == other.topic()
&& self.partition() == other.partition()
&& self.offset() == other.offset()
&& self.metadata() == other.metadata()
}
}

Expand Down Expand Up @@ -360,14 +381,18 @@ impl Default for TopicPartitionList {
impl fmt::Debug for TopicPartitionList {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TPL {{")?;
for elem in self.elements() {
for (i, elem) in self.elements().iter().enumerate() {
if i > 0 {
write!(f, "; ")?;
}
write!(
f,
"({}, {}): {:?}, ",
"{}/{}: offset={:?} metadata={:?}",
elem.topic(),
elem.partition(),
elem.offset()
)?
elem.offset(),
elem.metadata(),
)?;
}
write!(f, "}}")
}
Expand Down
56 changes: 56 additions & 0 deletions tests/test_high_consumers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Test data consumption using high level consumers.
use std::collections::HashMap;
use std::error::Error;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -372,3 +373,58 @@ async fn test_consumer_store_offset_commit() {
.unwrap();
assert_eq!(position, consumer.position().unwrap());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let topic_name = rand_test_topic();
let group_name = rand_test_group();
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

let create_consumer = || async {
// Disable auto-commit so we can manually drive the commits.
let mut config = HashMap::new();
config.insert("enable.auto.commit", "false");
let consumer = create_stream_consumer(&group_name, Some(config));

// Subscribe to the topic and wait for at least one message, which
// ensures that the consumer group has been joined and such.
consumer.subscribe(&[topic_name.as_str()])?;
let _ = consumer.stream().next().await;

Ok::<_, Box<dyn Error>>(consumer)
};

// Create a topic partition list where each element has some associated
// metadata.
let tpl = {
let mut tpl = TopicPartitionList::new();
let mut tpl1 = tpl.add_partition(&topic_name, 0);
tpl1.set_offset(Offset::Offset(1))?;
tpl1.set_metadata("one");
let mut tpl2 = tpl.add_partition(&topic_name, 1);
tpl2.set_offset(Offset::Offset(1))?;
tpl2.set_metadata("two");
let mut tpl3 = tpl.add_partition(&topic_name, 2);
tpl3.set_offset(Offset::Offset(1))?;
tpl3.set_metadata("three");
tpl
};

// Ensure that the commit state immediately includes the metadata.
{
let consumer = create_consumer().await?;
consumer.commit(&tpl, CommitMode::Sync)?;
assert_eq!(consumer.committed(None)?, tpl);
}

// Ensure that the commit state on a new consumer in the same group
// can see the same metadata.
{
let consumer = create_consumer().await?;
assert_eq!(consumer.committed(None)?, tpl);
}

Ok(())
}

0 comments on commit ec2cc06

Please sign in to comment.