Skip to content

Commit

Permalink
Remove consumer check from those methods which are guaranteed anyway
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Apr 20, 2023
1 parent ba88908 commit b5aca40
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 20 deletions.
20 changes: 6 additions & 14 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -3050,17 +3050,14 @@ static rd_kafka_op_res_t rd_kafka_consume_callback0(
void *opaque) {
struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
rd_kafka_op_res_t res;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
if (timeout_ms)
rd_kafka_app_poll_blocking(rkq->rkq_rk);

res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
rd_kafka_consume_cb, &ctx);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkq->rkq_rk);
rd_kafka_app_polled(rkq->rkq_rk);

return res;
}
Expand Down Expand Up @@ -3124,10 +3121,8 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
rd_kafka_op_t *rko;
rd_kafka_message_t *rkmessage = NULL;
rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
if (timeout_ms)
rd_kafka_app_poll_blocking(rk);

rd_kafka_yield_thread = 0;
Expand All @@ -3146,8 +3141,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
/* Callback called rd_kafka_yield(), we must
* stop dispatching the queue and return. */
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);
rd_kafka_app_polled(rk);
return NULL;
}

Expand All @@ -3159,8 +3153,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
/* Timeout reached with no op returned. */
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
ETIMEDOUT);
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);
rd_kafka_app_polled(rk);
return NULL;
}

Expand All @@ -3175,8 +3168,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {

rd_kafka_set_last_error(0, 0);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);
rd_kafka_app_polled(rk);

return rkmessage;
}
Expand Down
8 changes: 2 additions & 6 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,6 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_q_t *fwdq;
struct timespec timeout_tspec;
int i;
rd_bool_t can_q_contain_fetched_msgs;

mtx_lock(&rkq->rkq_lock);
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
Expand All @@ -651,11 +650,9 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
return cnt;
}

can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK);
mtx_unlock(&rkq->rkq_lock);

if (timeout_ms && can_q_contain_fetched_msgs)
if (timeout_ms)
rd_kafka_app_poll_blocking(rk);

rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
Expand Down Expand Up @@ -762,8 +759,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_op_destroy(rko);
}

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);
rd_kafka_app_polled(rk);

return cnt;
}
Expand Down

0 comments on commit b5aca40

Please sign in to comment.