
Subscribing after unsubscribing no longer working in v0.24.0 #294

Closed
athre0z opened this issue Oct 3, 2020 · 6 comments

Comments

@athre0z
Contributor

athre0z commented Oct 3, 2020

use futures_util::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{Offset, TopicPartitionList};

static TOPIC: &str = "mytopic";

#[tokio::main]
async fn main() {
    let consumer: StreamConsumer<_> = ClientConfig::new()
        .set("group.id", "blahblahgroup")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .unwrap();

    let mut stream = consumer.start();

    let mut part_list = TopicPartitionList::new();
    part_list.add_partition_offset(TOPIC, 0, Offset::Beginning);
    consumer.assign(&part_list).unwrap();
    dbg!(stream.next().await);

    consumer.unsubscribe();

    let mut part_list = TopicPartitionList::new();
    part_list.add_partition_offset(TOPIC, 0, Offset::Beginning);
    consumer.assign(&part_list).unwrap();
    dbg!(stream.next().await);
}

Assuming mytopic exists, the code above produces:

[src/main.rs:24] stream.next().await = Some(
    Ok(
        Message { ptr: 0x7fb2e4804560 },
    ),
)
[src/main.rs:31] stream.next().await = Some(
    Ok(
        Message { ptr: 0x7fb2e4804ad0 },
    ),
)

when run with rdkafka v0.23 (the latest 0.23 release), but gets stuck after printing

[src/main.rs:24] stream.next().await = Some(
    Ok(
        Message { ptr: 0x7feb6fe040f0 },
    ),
)
^C

when using v0.24.0 or the latest commit from master as of writing.

Sample crate for repro -- just switch the version in Cargo.toml:
rdkafka-debug.tar.gz

@scooter-dangle

I'm debugging a similar issue in which the consumer's Drop implementation hangs. I was able to show that the program proceeds if the consumer is never dropped, by forget-ing it. The nearest related issue I found was #48.

Apologies if this ends up being a separate issue.

@benesch
Collaborator

benesch commented Jan 5, 2021

@athre0z, mixing calls to assign/subscribe like that is odd. What are you trying to do, exactly?

I can reproduce this behavior on the BaseConsumer too, which suggests it's something in librdkafka. I wouldn't be surprised if confluentinc/librdkafka#2455 is part of the problem. There is probably a way to do what you're after that doesn't involve following a call to unsubscribe with a call to assign.

@athre0z
Contributor Author

athre0z commented Jan 5, 2021

I pretty much have my own (ghetto) Flink-esque stream processing lib, and it uses three topics for each stream: an input topic, a state topic to which the stream processor's full state is dumped every 10 million events processed, and an output topic. On startup, the code reads the last message from the state topic, which contains the offset to which the input topic must be rewound. It then obtains the last output message written (which contains the offset of the corresponding input message). Combining this information, it can replay input messages to update its state until it is back in sync with the output stream and can start producing messages again.

In short: I use assign/unsubscribe to get the last message from the output and state topics, and then finally assign again to consume the input topic. It's very much possible that this counts as "doing it wrong" and that I should create separate consumers for each of these.

@benesch
Copy link
Collaborator

benesch commented Jan 5, 2021

Ohh, I see. You're intending to use unsubscribe to undo the effects of the first call to assign, yeah? I think that's probably your issue, and you should just use assign twice.

// Load state.
let mut part_list = TopicPartitionList::new();
part_list.add_partition_offset(state_topic, 0, Offset::Beginning);
consumer.assign(&part_list).unwrap();
let state: SomeStateThing = consumer.recv().await.parse();

// Process input.
let mut part_list = TopicPartitionList::new();
part_list.add_partition_offset(input_topic, 0, Offset::Offset(state.offset));
consumer.assign(&part_list).unwrap();

loop {
    match consumer.recv().await { /* ... */ }
}

Every call to assign overwrites the previous assignment—they're not additive. So if the second call to assign does not include a toppar that was present in the first call, then that toppar will be "unassigned." (Note: not unsubscribed, as that's something slightly different.)

subscribe/unsubscribe are a sort of separate API from assign. When you use subscribe, you ask Kafka to automatically assign you a fair share of the partitions from the subscribed topics (the "consumer group" protocol). When you use assign, you're telling Kafka that you're manually assigning them yourself, and to deactivate the consumer group protocol.

This is all very confusing. It's not well explained in the librdkafka documentation because librdkafka assumes familiarity with the Java API, and it's doubly not well-explained in the rust-rdkafka documentation because we assume both familiarity with librdkafka and the Java API. But, the good news is the Java API docs are pretty good! I highly recommend reading the sections in the Java KafkaConsumer docs that pertain to topic subscriptions and manual partition assignment.

@athre0z
Contributor Author

athre0z commented Jan 7, 2021

Wow, thanks for the excellent explanation -- this whole API makes a lot more sense to me now. I had indeed assumed that assigning a second time would be additive. Closing this since it appears to have been a usage error!

@athre0z athre0z closed this as completed Jan 7, 2021
@benesch
Collaborator

benesch commented Jan 7, 2021

Awesome, glad I could help! 🥂
