Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support reading and writing commit metadata. #391

Merged
merged 1 commit into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

Thanks, [@SreeniIO].

* Support reading and writing commit metadata via
`TopicPartitionListElem::metadata` and `TopicPartitionListElem::set_metadata`,
respectively ([#391]).

Thanks, [@phaazon].

[#89]: https://github.com/fede1024/rust-rdkafka/issues/89
[#95]: https://github.com/fede1024/rust-rdkafka/issues/95
[#360]: https://github.com/fede1024/rust-rdkafka/issues/360
[#364]: https://github.com/fede1024/rust-rdkafka/issues/364
[#367]: https://github.com/fede1024/rust-rdkafka/issues/367
[#391]: https://github.com/fede1024/rust-rdkafka/issues/391
[@djKooks]: https://github.com/djKooks
[@phaazon]: https://github.com/phaazon
[@SreeniIO]: https://github.com/SreeniIO

<a name="0.26.0"></a>
Expand Down
35 changes: 30 additions & 5 deletions src/topic_partition_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::slice;
use std::str;

use libc::c_void;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{KafkaDrop, NativePtr};
use crate::util::{self, KafkaDrop, NativePtr};

const PARTITION_UNASSIGNED: i32 = -1;

Expand Down Expand Up @@ -134,13 +136,32 @@ impl<'a> TopicPartitionListElem<'a> {
)),
}
}

/// Returns the optional metadata associated with the entry.
pub fn metadata(&self) -> &str {
hadronized marked this conversation as resolved.
Show resolved Hide resolved
let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
str::from_utf8(bytes).expect("Metadata is not UTF-8")
}

/// Sets the optional metadata associated with the entry.
pub fn set_metadata<M>(&mut self, metadata: M)
where
M: AsRef<str>,
{
let metadata = metadata.as_ref();
let buf = unsafe { libc::malloc(metadata.len()) };
unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) };
self.ptr.metadata = buf;
self.ptr.metadata_size = metadata.len();
}
}

impl<'a> PartialEq for TopicPartitionListElem<'a> {
fn eq(&self, other: &TopicPartitionListElem<'a>) -> bool {
self.topic() == other.topic()
&& self.partition() == other.partition()
&& self.offset() == other.offset()
&& self.metadata() == other.metadata()
}
}

Expand Down Expand Up @@ -360,14 +381,18 @@ impl Default for TopicPartitionList {
impl fmt::Debug for TopicPartitionList {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TPL {{")?;
for elem in self.elements() {
for (i, elem) in self.elements().iter().enumerate() {
if i > 0 {
write!(f, "; ")?;
}
write!(
f,
"({}, {}): {:?}, ",
"{}/{}: offset={:?} metadata={:?}",
elem.topic(),
elem.partition(),
elem.offset()
)?
elem.offset(),
elem.metadata(),
)?;
}
write!(f, "}}")
}
Expand Down
56 changes: 56 additions & 0 deletions tests/test_high_consumers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Test data consumption using high level consumers.

use std::collections::HashMap;
use std::error::Error;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -372,3 +373,58 @@ async fn test_consumer_store_offset_commit() {
.unwrap();
assert_eq!(position, consumer.position().unwrap());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let topic_name = rand_test_topic();
let group_name = rand_test_group();
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

let create_consumer = || async {
// Disable auto-commit so we can manually drive the commits.
let mut config = HashMap::new();
config.insert("enable.auto.commit", "false");
let consumer = create_stream_consumer(&group_name, Some(config));

// Subscribe to the topic and wait for at least one message, which
// ensures that the consumer group has been joined and such.
consumer.subscribe(&[topic_name.as_str()])?;
let _ = consumer.stream().next().await;

Ok::<_, Box<dyn Error>>(consumer)
};

// Create a topic partition list where each element has some associated
// metadata.
let tpl = {
let mut tpl = TopicPartitionList::new();
let mut tpl1 = tpl.add_partition(&topic_name, 0);
tpl1.set_offset(Offset::Offset(1))?;
tpl1.set_metadata("one");
let mut tpl2 = tpl.add_partition(&topic_name, 1);
tpl2.set_offset(Offset::Offset(1))?;
tpl2.set_metadata("two");
let mut tpl3 = tpl.add_partition(&topic_name, 2);
tpl3.set_offset(Offset::Offset(1))?;
tpl3.set_metadata("three");
tpl
};

// Ensure that the commit state immediately includes the metadata.
{
let consumer = create_consumer().await?;
consumer.commit(&tpl, CommitMode::Sync)?;
assert_eq!(consumer.committed(None)?, tpl);
}

// Ensure that the commit state on a new consumer in the same group
// can see the same metadata.
{
let consumer = create_consumer().await?;
assert_eq!(consumer.committed(None)?, tpl);
}

Ok(())
}