Skip to content

Commit

Permalink
Allowing topic metadata to propagate for both unknown and existing to…
Browse files Browse the repository at this point in the history
…pics (#2)
  • Loading branch information
marcin-krystianc authored Jan 3, 2025
1 parent cb8c19c commit 506c1b5
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt,
(rkt->rkt_rk->rk_conf.metadata_propagation_max_ms * 1000)) -
rkt->rkt_ts_metadata;

if (!permanent && rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN &&
if (!permanent && (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN || rkt->rkt_state == RD_KAFKA_TOPIC_S_EXISTS) &&
remains_us > 0) {
/* Still allowing topic metadata to propagate. */
rd_kafka_dbg(
Expand Down Expand Up @@ -1326,7 +1326,10 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ ||
mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID)
rd_kafka_topic_set_notexists(rkt, mdt->err);
{
if (!rd_kafka_topic_set_notexists(rkt, mdt->err))
goto cleanup;
}
else if (mdt->partition_cnt > 0)
rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS);
else if (mdt->err)
Expand Down Expand Up @@ -1437,6 +1440,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
rkt,
mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);

cleanup:
rd_kafka_topic_wrunlock(rkt);

/* Loose broker references */
Expand Down

0 comments on commit 506c1b5

Please sign in to comment.