diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ae3629d93..c9662399d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,6 +28,8 @@ librdkafka v1.9.0 is a feature release:
 
 ## Enhancements
 
+ * Improved producer queue scheduling. Fixes the performance regression
+   introduced in v1.7.0 for some produce patterns. (#3538, #2912)
  * Windows: Added native Win32 IO/Queue scheduling. This removes the
    internal TCP loopback connections that were previously used for timely
    queue wakeups.
@@ -43,12 +45,17 @@ librdkafka v1.9.0 is a feature release:
    if not already created.
  * Bundled zlib upgraded to version 1.2.12.
  * Bundled OpenSSL upgraded to 1.1.1n.
+ * Added `test.mock.broker.rtt` to simulate RTT/latency for mock brokers.
 
 ## Fixes
 
 ### General fixes
 
+ * Fix various 1 second delays due to internal broker threads blocking on IO
+   even though there are events to handle.
+   These delays could be seen randomly in any of the non produce/consume
+   request APIs, such as `commit_transaction()`, `list_groups()`, etc.
  * Windows: some applications would crash with an error message like
    `no OPENSSL_Applink()` written to the console if `ssl.keystore.location`
    was configured.
@@ -120,7 +127,10 @@ librdkafka v1.9.0 is a feature release:
    broker (added in Apache Kafka 2.8), which could cause the producer to
    seemingly hang.
    This error code is now correctly handled by raising a fatal error.
+ * Improved producer queue wakeup scheduling. This should significantly
+   decrease the number of wakeups and thus syscalls for high message rate
+   producers. (#3538, #2912)
  * The logic for enforcing that `message.timeout.ms` is greater than
    an explicitly configured `linger.ms` was incorrect and instead of
    erroring out early the lingering time was automatically adjusted
    to the message timeout, ignoring the configured `linger.ms`.
diff --git a/STATISTICS.md b/STATISTICS.md
index 0a21ee0842..392e2cf05a 100644
--- a/STATISTICS.md
+++ b/STATISTICS.md
@@ -106,7 +106,7 @@ rxidle | int | | Microseconds since last socket receive (or -1 if no receives ye
 req | object | | Request type counters. Object key is the request name, value is the number of requests sent.
 zbuf_grow | int | | Total number of decompression buffer size increases
 buf_grow | int | | Total number of buffer size increases (deprecated, unused)
-wakeups | int | | Broker thread poll wakeups
+wakeups | int | | Broker thread poll loop wakeups
 connects | int | | Number of connection attempts, including successful and failed, and name resolution failures.
 disconnects | int | | Number of disconnects (triggered by broker, network, load-balancer, etc.).
 int_latency | object | | Internal producer queue latency in microseconds. See *Window stats* below
diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c
index c4ba0274b5..dc31c3e0f8 100644
--- a/examples/rdkafka_performance.c
+++ b/examples/rdkafka_performance.c
@@ -216,17 +216,23 @@ static void msg_delivered(rd_kafka_t *rk,
             !last || msgs_wait_cnt < 5 || !(msgs_wait_cnt % dr_disp_div) ||
             (now - last) >= dispintvl * 1000 || verbosity >= 3) {
                 if (rkmessage->err && verbosity >= 2)
-                        printf("%% Message delivery failed: %s [%" PRId32
+                        printf("%% Message delivery failed (broker %" PRId32
+                               "): "
+                               "%s [%" PRId32
                                "]: "
                                "%s (%li remain)\n",
+                               rd_kafka_message_broker_id(rkmessage),
                                rd_kafka_topic_name(rkmessage->rkt),
                                rkmessage->partition,
                                rd_kafka_err2str(rkmessage->err), msgs_wait_cnt);
                 else if (verbosity > 2)
                         printf("%% Message delivered (offset %" PRId64
+                               ", broker %" PRId32
                                "): "
                                "%li remain\n",
-                               rkmessage->offset, msgs_wait_cnt);
+                               rkmessage->offset,
+                               rd_kafka_message_broker_id(rkmessage),
+                               msgs_wait_cnt);
                 if (verbosity >= 3 && do_seq)
                         printf(" --> \"%.*s\"\n", (int)rkmessage->len,
                                (const char *)rkmessage->payload);
@@ -1485,7 +1491,7 @@ int main(int argc, char **argv) {
                                     (int)RD_MAX(0, (next - rd_clock()) / 1000));
                         } while (next > rd_clock());
-                } else {
+                } else if (cnt.msgs % 1000 == 0) {
                         rd_kafka_poll(rk, 0);
                 }
 
diff --git a/src/rdkafka.c b/src/rdkafka.c
index e4afab8e47..0d206b402c 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2312,6 +2312,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
         /* Create Mock cluster */
         rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
         if (rk->rk_conf.mock.broker_cnt > 0) {
+                const char *mock_bootstraps;
                 rk->rk_mock.cluster =
                     rd_kafka_mock_cluster_new(rk, rk->rk_conf.mock.broker_cnt);
@@ -2323,16 +2324,18 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
                         goto fail;
                 }
 
+                mock_bootstraps =
+                    rd_kafka_mock_cluster_bootstraps(rk->rk_mock.cluster);
                 rd_kafka_log(rk, LOG_NOTICE, "MOCK",
                              "Mock cluster enabled: "
                              "original bootstrap.servers and security.protocol "
-                             "ignored and replaced");
+                             "ignored and replaced with %s",
+                             mock_bootstraps);
 
                 /* Overwrite bootstrap.servers and connection settings */
-                if (rd_kafka_conf_set(
-                        &rk->rk_conf, "bootstrap.servers",
-                        rd_kafka_mock_cluster_bootstraps(rk->rk_mock.cluster),
-                        NULL, 0) != RD_KAFKA_CONF_OK)
+                if (rd_kafka_conf_set(&rk->rk_conf, "bootstrap.servers",
+                                      mock_bootstraps, NULL,
+                                      0) != RD_KAFKA_CONF_OK)
                         rd_assert(!"failed to replace mock bootstrap.servers");
 
                 if (rd_kafka_conf_set(&rk->rk_conf, "security.protocol",
@@ -2340,6 +2343,13 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
                         rd_assert(!"failed to reset mock security.protocol");
 
                 rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;
+
+                /* Apply default RTT to brokers */
+                if (rk->rk_conf.mock.broker_rtt)
+                        rd_kafka_mock_broker_set_rtt(
+                            rk->rk_mock.cluster, -1 /*all brokers*/,
+                            rk->rk_conf.mock.broker_rtt);
         }
 
         if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
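The default-RTT hookup above reuses the public mock API that tests call directly. For reference, a minimal standalone sketch (not part of the patch) of the programmatic route; the helper name is invented here:

    #include <librdkafka/rdkafka.h>
    #include <librdkafka/rdkafka_mock.h>

    /* Sketch: spin up a 3-broker mock cluster and give every broker a
     * simulated 500 ms round-trip time (-1 selects all brokers, as this
     * patch now allows). */
    static rd_kafka_t *new_slow_mock_producer(void) {
            rd_kafka_conf_t *conf = rd_kafka_conf_new();
            rd_kafka_t *rk;
            rd_kafka_mock_cluster_t *mcluster;

            rd_kafka_conf_set(conf, "test.mock.num.brokers", "3", NULL, 0);
            rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);

            mcluster = rd_kafka_handle_mock_cluster(rk);
            rd_kafka_mock_broker_set_rtt(mcluster, -1 /*all brokers*/, 500);
            return rk;
    }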
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index e2d9960c0f..d670b74b3d 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -3417,7 +3417,13 @@ rd_kafka_broker_ops_io_serve(rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) {
          *
          * The return value indicates if ops_serve() below should
          * use a timeout or not.
+         *
+         * If there are ops enqueued, cut the timeout short so
+         * that they're processed as soon as possible.
          */
+        if (abs_timeout > 0 && rd_kafka_q_len(rkb->rkb_ops) > 0)
+                abs_timeout = RD_POLL_NOWAIT;
+
         if (rd_kafka_transport_io_serve(
                 rkb->rkb_transport, rkb->rkb_ops,
                 rd_timeout_remains(abs_timeout)))
@@ -3429,6 +3435,8 @@ rd_kafka_broker_ops_io_serve(rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) {
         wakeup =
             rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout));
 
+        rd_atomic64_add(&rkb->rkb_c.wakeups, 1);
+
         /* An op might have triggered the need for a connection, if so
          * transition to TRY_CONNECT state. */
         if (unlikely(rd_kafka_broker_needs_connection(rkb) &&
@@ -3627,11 +3635,29 @@ rd_kafka_broker_outbufs_space(rd_kafka_broker_t *rkb) {
 }
 
 
+/**
+ * @brief Update \p *next_wakeup_ptr to \p maybe_next_wakeup if it is sooner.
+ *
+ * Both parameters are absolute timestamps.
+ * \p maybe_next_wakeup must not be 0.
+ */
+#define rd_kafka_set_next_wakeup(next_wakeup_ptr, maybe_next_wakeup)          \
+        do {                                                                  \
+                rd_ts_t *__n = (next_wakeup_ptr);                             \
+                rd_ts_t __m  = (maybe_next_wakeup);                           \
+                rd_dassert(__m != 0);                                         \
+                if (__m < *__n)                                               \
+                        *__n = __m;                                           \
+        } while (0)
+
+
 /**
  * @brief Serve a toppar for producing.
  *
  * @param next_wakeup will be updated to when the next wake-up/attempt is
- *        desired, only lower (sooner) values will be set.
+ *        desired. Does not take the current value into
+ *        consideration, even if it is lower.
  * @param do_timeout_scan perform msg timeout scan
  * @param may_send if set to false there is something on the global level
  *        that prohibits sending messages, such as a transactional
@@ -3659,6 +3685,7 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
         int reqcnt;
         int inflight              = 0;
         uint64_t epoch_base_msgid = 0;
+        rd_bool_t batch_ready     = rd_false;
 
         /* By limiting the number of not-yet-sent buffers (rkb_outbufs) we
          * provide a backpressure mechanism to the producer loop
@@ -3685,8 +3712,8 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
                 timeoutcnt =
                     rd_kafka_broker_toppar_msgq_scan(rkb, rktp, now, &next);
 
-                if (next && next < *next_wakeup)
-                        *next_wakeup = next;
+                if (next)
+                        rd_kafka_set_next_wakeup(next_wakeup, next);
 
                 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
                         if (!rd_kafka_pid_valid(pid)) {
@@ -3732,10 +3759,32 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
         } else if (max_requests > 0) {
                 /* Move messages from locked partition produce queue
                  * to broker-local xmit queue. */
-                if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0)
+                if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0) {
                         rd_kafka_msgq_insert_msgq(
                             &rktp->rktp_xmit_msgq, &rktp->rktp_msgq,
                             rktp->rktp_rkt->rkt_conf.msg_order_cmp);
+                }
+
+                /* Calculate maximum wait-time to honour
+                 * queue.buffering.max.ms contract.
+                 * Unless flushing in which case immediate
+                 * wakeups are allowed. */
+                batch_ready = rd_kafka_msgq_allow_wakeup_at(
+                    &rktp->rktp_msgq, &rktp->rktp_xmit_msgq,
+                    /* Only update the broker thread wakeup time
+                     * if connection is up and messages can actually be
+                     * sent, otherwise the wakeup can't do much. */
+                    rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP ? next_wakeup
+                                                               : NULL,
+                    now, flushing ? 1 : rkb->rkb_rk->rk_conf.buffering_max_us,
+                    /* Batch message count threshold */
+                    rkb->rkb_rk->rk_conf.batch_num_messages,
+                    /* Batch size threshold.
+                     * When compression is enabled the
+                     * threshold is increased by x8. */
+                    (rktp->rktp_rkt->rkt_conf.compression_codec ? 1 : 8) *
+                        (int64_t)rkb->rkb_rk->rk_conf.batch_size);
         }
 
         rd_kafka_toppar_unlock(rktp);
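For orientation, the three thresholds handed to rd_kafka_msgq_allow_wakeup_at() above boil down to the following readiness test. This is a simplified standalone model (function and parameter names invented here), not the patch's code; it ignores retry backoff and the flushing fast path, which forces the linger time to ~0:

    #include <stdint.h>

    /* A batch becomes sendable when any one of the three producer
     * batching limits is hit. */
    static int batch_is_ready(int32_t queued_msgs, int64_t queued_bytes,
                              int64_t oldest_enq_us, int64_t now_us,
                              int64_t linger_us, int32_t batch_num_messages,
                              int64_t batch_size) {
            return queued_msgs >= batch_num_messages ||  /* batch.num.messages */
                   queued_bytes >= batch_size ||         /* batch.size */
                   now_us >= oldest_enq_us + linger_us;  /* linger.ms expired */
    }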
@@ -3870,30 +3919,9 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
 
         /* Attempt to fill the batch size, but limit our waiting
          * to queue.buffering.max.ms, batch.num.messages, and batch.size. */
-        if (!flushing && r < rkb->rkb_rk->rk_conf.batch_num_messages &&
-            rktp->rktp_xmit_msgq.rkmq_msg_bytes <
-                (int64_t)rkb->rkb_rk->rk_conf.batch_size) {
-                rd_ts_t wait_max;
-
-                /* Calculate maximum wait-time to honour
-                 * queue.buffering.max.ms contract. */
-                wait_max = rd_kafka_msg_enq_time(rkm) +
-                           rkb->rkb_rk->rk_conf.buffering_max_us;
-
-                if (wait_max > now) {
-                        /* Wait for more messages or queue.buffering.max.ms
-                         * to expire. */
-                        if (wait_max < *next_wakeup)
-                                *next_wakeup = wait_max;
-                        return 0;
-                }
-        }
-
-        /* Honour retry.backoff.ms. */
-        if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
-                if (rkm->rkm_u.producer.ts_backoff < *next_wakeup)
-                        *next_wakeup = rkm->rkm_u.producer.ts_backoff;
-                /* Wait for backoff to expire */
+        if (!batch_ready) {
+                /* Wait for more messages or queue.buffering.max.ms
+                 * to expire. */
                 return 0;
         }
 
@@ -3907,10 +3935,22 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
                 break;
         }
 
-        /* If there are messages still in the queue, make the next
-         * wakeup immediate. */
-        if (rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) > 0)
-                *next_wakeup = now;
+        /* Update the allowed wake-up time based on remaining messages
+         * in the queue. */
+        if (cnt > 0) {
+                rd_kafka_toppar_lock(rktp);
+                batch_ready = rd_kafka_msgq_allow_wakeup_at(
+                    &rktp->rktp_msgq, &rktp->rktp_xmit_msgq, next_wakeup, now,
+                    flushing ? 1 : rkb->rkb_rk->rk_conf.buffering_max_us,
+                    /* Batch message count threshold */
+                    rkb->rkb_rk->rk_conf.batch_num_messages,
+                    /* Batch size threshold.
+                     * When compression is enabled the
+                     * threshold is increased by x8. */
+                    (rktp->rktp_rkt->rkt_conf.compression_codec ? 1 : 8) *
+                        (int64_t)rkb->rkb_rk->rk_conf.batch_size);
+                rd_kafka_toppar_unlock(rktp);
+        }
 
         return cnt;
 }
@@ -3921,7 +3961,7 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
 /**
  * @brief Produce from all toppars assigned to this broker.
  *
  * @param next_wakeup is updated if the next IO/ops timeout should be
- *        less than the input value.
+ *        less than the input value (i.e., sooner).
  *
  * @returns the total number of messages produced.
  */
@@ -3970,8 +4010,7 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
                     rkb, rktp, pid, now, &this_next_wakeup, do_timeout_scan,
                     may_send, flushing);
 
-                if (this_next_wakeup < ret_next_wakeup)
-                        ret_next_wakeup = this_next_wakeup;
+                rd_kafka_set_next_wakeup(&ret_next_wakeup, this_next_wakeup);
 
         } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp,
                                            rktp_activelink)) != rktp_first);
@@ -4008,7 +4047,7 @@ static void rd_kafka_broker_producer_serve(rd_kafka_broker_t *rkb,
                (abs_timeout > (now = rd_clock()))) {
                 rd_bool_t do_timeout_scan;
                 rd_ts_t next_wakeup = abs_timeout;
-                int overshoot;
+                rd_bool_t overshot;
 
                 rd_kafka_broker_unlock(rkb);
 
                 /* Perform timeout scan on first iteration, thus
                  * on each state change, to make sure messages in
                  * partition rktp_xmit_msgq are timed out before
                  * being attempted to re-transmit. */
-                overshoot       = rd_interval(&timeout_scan, 1000 * 1000, now);
-                do_timeout_scan = cnt++ == 0 || overshoot >= 0;
+                overshot = rd_interval(&timeout_scan, 1000 * 1000, now) >= 0;
+                do_timeout_scan = cnt++ == 0 || overshot;
 
                 rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup,
                                                 do_timeout_scan);
@@ -5403,9 +5442,9 @@ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
         rd_kafka_sasl_broker_term(rkb);
 
         if (rkb->rkb_wakeup_fd[0] != -1)
-                rd_close(rkb->rkb_wakeup_fd[0]);
+                rd_socket_close(rkb->rkb_wakeup_fd[0]);
         if (rkb->rkb_wakeup_fd[1] != -1)
-                rd_close(rkb->rkb_wakeup_fd[1]);
+                rd_socket_close(rkb->rkb_wakeup_fd[1]);
 
         if (rkb->rkb_recv_buf)
                 rd_kafka_buf_destroy(rkb->rkb_recv_buf);
diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h
index 05a8af611a..0552d89557 100644
--- a/src/rdkafka_buf.h
+++ b/src/rdkafka_buf.h
@@ -541,29 +541,33 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
 #define rd_kafka_buf_read_i64(rkbuf, dstptr)                                   \
         do {                                                                   \
                 int64_t _v;                                                    \
+                int64_t *_vp = dstptr;                                         \
                 rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));                     \
-                *(dstptr) = be64toh(_v);                                       \
+                *_vp = be64toh(_v);                                            \
         } while (0)
 
 #define rd_kafka_buf_peek_i64(rkbuf, of, dstptr)                               \
         do {                                                                   \
                 int64_t _v;                                                    \
+                int64_t *_vp = dstptr;                                         \
                 rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));                 \
-                *(dstptr) = be64toh(_v);                                       \
+                *_vp = be64toh(_v);                                            \
         } while (0)
 
 #define rd_kafka_buf_read_i32(rkbuf, dstptr)                                   \
         do {                                                                   \
                 int32_t _v;                                                    \
+                int32_t *_vp = dstptr;                                         \
                 rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));                     \
-                *(dstptr) = be32toh(_v);                                       \
+                *_vp = be32toh(_v);                                            \
         } while (0)
 
 #define rd_kafka_buf_peek_i32(rkbuf, of, dstptr)                               \
         do {                                                                   \
                 int32_t _v;                                                    \
+                int32_t *_vp = dstptr;                                         \
                 rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));                 \
-                *(dstptr) = be32toh(_v);                                       \
+                *_vp = be32toh(_v);                                            \
         } while (0)
 
@@ -579,16 +583,17 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
 #define rd_kafka_buf_read_i16(rkbuf, dstptr)                                   \
         do {                                                                   \
                 int16_t _v;                                                    \
+                int16_t *_vp = dstptr;                                         \
                 rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));                     \
-                *(dstptr) = (int16_t)be16toh(_v);                              \
+                *_vp = (int16_t)be16toh(_v);                                   \
         } while (0)
 
-
 #define rd_kafka_buf_peek_i16(rkbuf, of, dstptr)                               \
         do {                                                                   \
                 int16_t _v;                                                    \
+                int16_t *_vp = dstptr;                                         \
                 rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));                 \
-                *(dstptr) = be16toh(_v);                                       \
+                *_vp = be16toh(_v);                                            \
         } while (0)
 
 #define rd_kafka_buf_read_i16a(rkbuf, dst)                                     \
@@ -615,29 +620,31 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
 /**
  * @brief Read varint and store in int64_t \p dst
  */
-#define rd_kafka_buf_read_varint(rkbuf, dst)                                   \
+#define rd_kafka_buf_read_varint(rkbuf, dstptr)                                \
         do {                                                                   \
                 int64_t _v;                                                    \
+                int64_t *_vp = dstptr;                                         \
                 size_t _r = rd_slice_read_varint(&(rkbuf)->rkbuf_reader, &_v); \
                 if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                        \
                         rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,          \
                                                     "varint parsing failed");  \
-                *(dst) = _v;                                                   \
+                *_vp = _v;                                                     \
         } while (0)
 
 /**
  * @brief Read unsigned varint and store in uint64_t \p dst
  */
-#define rd_kafka_buf_read_uvarint(rkbuf, dst)                                  \
+#define rd_kafka_buf_read_uvarint(rkbuf, dstptr)                               \
         do {                                                                   \
                 uint64_t _v;                                                   \
+                uint64_t *_vp = dstptr;                                        \
                 size_t _r =                                                    \
                     rd_slice_read_uvarint(&(rkbuf)->rkbuf_reader, &_v);        \
                 if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                        \
                         rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,          \
                                                     "uvarint parsing failed"); \
-                *(dst) = _v;                                                   \
+                *_vp = _v;                                                     \
         } while (0)
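The new `_vp` temporaries in the macros above exist purely for type safety: assigning `dstptr` to a typed pointer makes the compiler reject a mismatched destination (e.g. an `int16_t *` passed to a 32-bit reader) that the old `*(dstptr) = ...` form would silently accept with a truncating conversion. A reduced illustration of the same pattern (hypothetical macro, not librdkafka's):

    #include <stdint.h>

    /* Without the temporary, *(dstptr) = _v compiles for any integer
     * pointer and may silently truncate; with it, passing a pointer of
     * the wrong type is a constraint violation the compiler reports. */
    #define read_i32_checked(src, dstptr)                                  \
            do {                                                           \
                    int32_t _v   = *(const int32_t *)(src);                \
                    int32_t *_vp = dstptr; /* must be an int32_t * */      \
                    *_vp         = _v;                                     \
            } while (0)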
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 5894462a4c..0899002a97 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -1027,6 +1027,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
      "This will automatically overwrite `bootstrap.servers` with the "
      "mock broker list.",
      0, 10000, 0},
+    {_RK_GLOBAL | _RK_HIDDEN, "test.mock.broker.rtt", _RK_C_INT,
+     _RK(mock.broker_rtt), "Simulated mock broker latency in milliseconds.", 0,
+     60 * 60 * 1000 /*1h*/, 0},
 
     /* Unit test interfaces.
      * These are not part of the public API and may change at any time. */
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 829dd6279d..db87404a56 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -518,6 +518,7 @@ struct rd_kafka_conf_s {
          */
         struct {
                 int broker_cnt; /**< Number of mock brokers */
+                int broker_rtt; /**< Broker RTT */
         } mock;
 
         /*
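Being a regular (if hidden) configuration property, the new RTT knob can also be set as a plain string from test code; a sketch with arbitrary values:

    /* Every mock broker created through this handle will delay its
     * responses by roughly 250 ms. */
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "test.mock.num.brokers", "3", NULL, 0);
    rd_kafka_conf_set(conf, "test.mock.broker.rtt", "250", NULL, 0);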
diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c
index 3265c9c3fb..394c9e487c 100644
--- a/src/rdkafka_mock.c
+++ b/src/rdkafka_mock.c
@@ -281,7 +281,7 @@ rd_kafka_mock_validate_records(rd_kafka_mock_partition_t *mpart,
         else
                 mpidstate->lo = (mpidstate->lo + 1) % mpidstate->window;
         mpidstate->hi = (mpidstate->hi + 1) % mpidstate->window;
-        mpidstate->seq[mpidstate->hi] = BaseSequence + RecordCount;
+        mpidstate->seq[mpidstate->hi] = (int32_t)(BaseSequence + RecordCount);
 
         return RD_KAFKA_RESP_ERR_NO_ERROR;
 
@@ -304,6 +304,7 @@ rd_kafka_mock_partition_log_append(rd_kafka_mock_partition_t *mpart,
         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
         int8_t MagicByte;
         int32_t RecordCount;
+        int16_t Attributes;
         rd_kafka_mock_msgset_t *mset;
         rd_bool_t is_dup = rd_false;
 
@@ -322,10 +323,13 @@ rd_kafka_mock_partition_log_append(rd_kafka_mock_partition_t *mpart,
         rd_kafka_buf_peek_i32(rkbuf, RD_KAFKAP_MSGSET_V2_OF_RecordCount,
                               &RecordCount);
+        rd_kafka_buf_peek_i16(rkbuf, RD_KAFKAP_MSGSET_V2_OF_Attributes,
+                              &Attributes);
 
         if (RecordCount < 1 ||
-            (size_t)RecordCount > RD_KAFKAP_BYTES_LEN(records) /
-                                      RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD) {
+            (!(Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK) &&
+             (size_t)RecordCount > RD_KAFKAP_BYTES_LEN(records) /
+                                       RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD)) {
                 err = RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE;
                 goto err;
         }
@@ -1220,7 +1224,7 @@ rd_kafka_mock_connection_new(rd_kafka_mock_broker_t *mrkb,
         char errstr[128];
 
         if (!mrkb->up) {
-                rd_close(fd);
+                rd_socket_close(fd);
                 return NULL;
         }
 
@@ -1231,7 +1235,7 @@ rd_kafka_mock_connection_new(rd_kafka_mock_broker_t *mrkb,
                              "Failed to create transport for new "
                              "mock connection: %s",
                              errstr);
-                rd_close(fd);
+                rd_socket_close(fd);
                 return NULL;
         }
 
@@ -1264,7 +1268,7 @@ static void rd_kafka_mock_cluster_op_io(rd_kafka_mock_cluster_t *mcluster,
                                         void *opaque) {
         /* Read wake-up fd data and throw away, just used for wake-ups*/
         char buf[1024];
-        while (rd_read(fd, buf, sizeof(buf)) > 0)
+        while (rd_socket_read(fd, buf, sizeof(buf)) > 0)
                 ; /* Read all buffered signalling bytes */
 }
 
@@ -1401,7 +1405,7 @@ static void rd_kafka_mock_broker_destroy(rd_kafka_mock_broker_t *mrkb) {
         rd_kafka_mock_broker_close_all(mrkb, "Destroying broker");
 
         rd_kafka_mock_cluster_io_del(mrkb->cluster, mrkb->listen_s);
-        rd_close(mrkb->listen_s);
+        rd_socket_close(mrkb->listen_s);
 
         while ((errstack = TAILQ_FIRST(&mrkb->errstacks))) {
                 TAILQ_REMOVE(&mrkb->errstacks, errstack, link);
@@ -1442,7 +1446,7 @@ rd_kafka_mock_broker_new(rd_kafka_mock_cluster_t *mcluster, int32_t broker_id) {
                              "Failed to bind mock broker socket to %s: %s",
                              rd_socket_strerror(rd_socket_errno),
                              rd_sockaddr2str(&sin, RD_SOCKADDR2STR_F_PORT));
-                rd_close(listen_s);
+                rd_socket_close(listen_s);
                 return NULL;
         }
 
@@ -1451,7 +1455,7 @@ rd_kafka_mock_broker_new(rd_kafka_mock_cluster_t *mcluster, int32_t broker_id) {
                 rd_kafka_log(mcluster->rk, LOG_CRIT, "MOCK",
                              "Failed to get mock broker socket name: %s",
                              rd_socket_strerror(rd_socket_errno));
-                rd_close(listen_s);
+                rd_socket_close(listen_s);
                 return NULL;
         }
         rd_assert(sin.sin_family == AF_INET);
@@ -1460,7 +1464,7 @@ rd_kafka_mock_broker_new(rd_kafka_mock_cluster_t *mcluster, int32_t broker_id) {
                 rd_kafka_log(mcluster->rk, LOG_CRIT, "MOCK",
                              "Failed to listen on mock broker socket: %s",
                              rd_socket_strerror(rd_socket_errno));
-                rd_close(listen_s);
+                rd_socket_close(listen_s);
                 return NULL;
         }
 
@@ -1980,6 +1984,82 @@ rd_kafka_mock_set_apiversion(rd_kafka_mock_cluster_t *mcluster,
 }
 
 
+/**
+ * @brief Apply command to specific broker.
+ *
+ * @locality mcluster thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_mock_broker_cmd(rd_kafka_mock_cluster_t *mcluster,
+                         rd_kafka_mock_broker_t *mrkb,
+                         rd_kafka_op_t *rko) {
+        switch (rko->rko_u.mock.cmd) {
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:
+                mrkb->up = (rd_bool_t)rko->rko_u.mock.lo;
+
+                if (!mrkb->up)
+                        rd_kafka_mock_broker_close_all(mrkb, "Broker down");
+                break;
+
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_RTT:
+                mrkb->rtt = (rd_ts_t)rko->rko_u.mock.lo * 1000;
+
+                /* Check if there is anything to send now that the RTT
+                 * has changed or if a timer is to be started. */
+                rd_kafka_mock_broker_connections_write_out(mrkb);
+                break;
+
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_RACK:
+                if (mrkb->rack)
+                        rd_free(mrkb->rack);
+
+                if (rko->rko_u.mock.name)
+                        mrkb->rack = rd_strdup(rko->rko_u.mock.name);
+                else
+                        mrkb->rack = NULL;
+                break;
+
+        default:
+                RD_BUG("Unhandled mock cmd %d", rko->rko_u.mock.cmd);
+                break;
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Apply command to one or all brokers, depending on the value of
+ *        broker_id, where -1 means all, and != -1 means a specific broker.
+ *
+ * @locality mcluster thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_mock_brokers_cmd(rd_kafka_mock_cluster_t *mcluster,
+                          rd_kafka_op_t *rko) {
+        rd_kafka_mock_broker_t *mrkb;
+
+        if (rko->rko_u.mock.broker_id != -1) {
+                /* Specific broker */
+                mrkb = rd_kafka_mock_broker_find(mcluster,
+                                                 rko->rko_u.mock.broker_id);
+                if (!mrkb)
+                        return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
+
+                return rd_kafka_mock_broker_cmd(mcluster, mrkb, rko);
+        }
+
+        /* All brokers */
+        TAILQ_FOREACH(mrkb, &mcluster->brokers, link) {
+                rd_kafka_resp_err_t err;
+
+                if ((err = rd_kafka_mock_broker_cmd(mcluster, mrkb, rko)))
+                        return err;
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
 
 /**
  * @brief Handle command op
@@ -2081,45 +2161,11 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
                 }
                 break;
 
+                /* Broker commands */
         case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:
-                mrkb = rd_kafka_mock_broker_find(mcluster,
-                                                 rko->rko_u.mock.broker_id);
-                if (!mrkb)
-                        return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
-
-                mrkb->up = (rd_bool_t)rko->rko_u.mock.lo;
-
-                if (!mrkb->up)
-                        rd_kafka_mock_broker_close_all(mrkb, "Broker down");
-                break;
-
         case RD_KAFKA_MOCK_CMD_BROKER_SET_RTT:
-                mrkb = rd_kafka_mock_broker_find(mcluster,
-                                                 rko->rko_u.mock.broker_id);
-                if (!mrkb)
-                        return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
-
-                mrkb->rtt = (rd_ts_t)rko->rko_u.mock.lo * 1000;
-
-                /* Check if there is anything to send now that the RTT
-                 * has changed or if a timer is to be started. */
-                rd_kafka_mock_broker_connections_write_out(mrkb);
-                break;
-
         case RD_KAFKA_MOCK_CMD_BROKER_SET_RACK:
-                mrkb = rd_kafka_mock_broker_find(mcluster,
-                                                 rko->rko_u.mock.broker_id);
-                if (!mrkb)
-                        return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
-
-                if (mrkb->rack)
-                        rd_free(mrkb->rack);
-
-                if (rko->rko_u.mock.name)
-                        mrkb->rack = rd_strdup(rko->rko_u.mock.name);
-                else
-                        mrkb->rack = NULL;
-                break;
+                return rd_kafka_mock_brokers_cmd(mcluster, rko);
 
         case RD_KAFKA_MOCK_CMD_COORD_SET:
                 if (!rd_kafka_mock_coord_set(mcluster, rko->rko_u.mock.name,
@@ -2235,8 +2281,8 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
 
         rd_free(mcluster->bootstraps);
 
-        rd_close(mcluster->wakeup_fds[0]);
-        rd_close(mcluster->wakeup_fds[1]);
+        rd_socket_close(mcluster->wakeup_fds[0]);
+        rd_socket_close(mcluster->wakeup_fds[1]);
 }
diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h
index 9e1f78488f..363d6bd8ae 100644
--- a/src/rdkafka_mock.h
+++ b/src/rdkafka_mock.h
@@ -264,6 +264,9 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster,
 /**
  * @brief Disconnects the broker and disallows any new connections.
  *        This does NOT trigger leader change.
+ *
+ * @param mcluster Mock cluster instance.
+ * @param broker_id Use -1 for all brokers, or >= 0 for a specific broker.
  */
 RD_EXPORT rd_kafka_resp_err_t
 rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster,
@@ -272,6 +275,9 @@ rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster,
 /**
  * @brief Makes the broker accept connections again.
  *        This does NOT trigger leader change.
+ *
+ * @param mcluster Mock cluster instance.
+ * @param broker_id Use -1 for all brokers, or >= 0 for a specific broker.
  */
 RD_EXPORT rd_kafka_resp_err_t
 rd_kafka_mock_broker_set_up(rd_kafka_mock_cluster_t *mcluster,
@@ -280,6 +286,9 @@ rd_kafka_mock_broker_set_up(rd_kafka_mock_cluster_t *mcluster,
 /**
  * @brief Set broker round-trip-time delay in milliseconds.
+ *
+ * @param mcluster Mock cluster instance.
+ * @param broker_id Use -1 for all brokers, or >= 0 for a specific broker.
  */
 RD_EXPORT rd_kafka_resp_err_t
 rd_kafka_mock_broker_set_rtt(rd_kafka_mock_cluster_t *mcluster,
@@ -288,6 +297,9 @@ rd_kafka_mock_broker_set_rtt(rd_kafka_mock_cluster_t *mcluster,
 /**
  * @brief Sets the broker's rack as reported in Metadata to the client.
+ *
+ * @param mcluster Mock cluster instance.
+ * @param broker_id Use -1 for all brokers, or >= 0 for a specific broker.
  */
 RD_EXPORT rd_kafka_resp_err_t
 rd_kafka_mock_broker_set_rack(rd_kafka_mock_cluster_t *mcluster,
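With the -1 convention documented above, a test can now bounce the whole cluster in two calls instead of looping over broker ids. A short sketch (assuming `mcluster` was obtained via rd_kafka_handle_mock_cluster()):

    /* Take all mock brokers down, provoke retry/timeout behaviour,
     * then recover: */
    rd_kafka_mock_broker_set_down(mcluster, -1);
    /* ... produce/consume and assert on the error handling ... */
    rd_kafka_mock_broker_set_up(mcluster, -1);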
diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c
index a7d2057eb5..6f7f0a6ffc 100644
--- a/src/rdkafka_mock_handlers.c
+++ b/src/rdkafka_mock_handlers.c
@@ -439,7 +439,8 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn,
 
         while (PartitionCnt-- > 0) {
                 int32_t Partition, CurrentLeaderEpoch;
-                int64_t Timestamp, MaxNumOffsets, Offset = -1;
+                int64_t Timestamp, Offset = -1;
+                int32_t MaxNumOffsets;
                 rd_kafka_mock_partition_t *mpart = NULL;
                 rd_kafka_resp_err_t err          = all_err;
 
diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c
index 9bd2b8d31b..ee0e177379 100644
--- a/src/rdkafka_msg.c
+++ b/src/rdkafka_msg.c
@@ -776,7 +776,7 @@ int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt,
                                 continue;
                         }
                 }
-                rd_kafka_toppar_enq_msg(rktp, rkm);
+                rd_kafka_toppar_enq_msg(rktp, rkm, now);
 
                 if (rd_kafka_is_transactional(rkt->rkt_rk)) {
                         /* Add partition to transaction */
@@ -796,7 +796,7 @@ int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt,
 
         } else {
                 /* Single destination partition. */
-                rd_kafka_toppar_enq_msg(rktp, rkm);
+                rd_kafka_toppar_enq_msg(rktp, rkm, now);
         }
 
         rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -1244,7 +1244,7 @@ int rd_kafka_msg_partitioner(rd_kafka_topic_t *rkt,
         rkm->rkm_partition = partition;
 
         /* Partition is available: enqueue msg on partition's queue */
-        rd_kafka_toppar_enq_msg(rktp_new, rkm);
+        rd_kafka_toppar_enq_msg(rktp_new, rkm, rd_clock());
 
         if (do_lock)
                 rd_kafka_topic_rdunlock(rkt);
@@ -1667,6 +1667,155 @@ void rd_kafka_msgbatch_ready_produce(rd_kafka_msgbatch_t *rkmb) {
 }
 
 
+
+/**
+ * @brief Allow queue wakeups after \p abstime, or when the
+ *        given \p batch_msg_cnt or \p batch_msg_bytes have been reached.
+ *
+ * @param rkmq Queue to monitor and set wakeup parameters on.
+ * @param dest_rkmq Destination queue used to meter current queue depths
+ *                  and oldest message. May be the same as \p rkmq but is
+ *                  typically the rktp_xmit_msgq.
+ * @param next_wakeup If non-NULL: update the caller's next scheduler wakeup
+ *                    according to the wakeup time calculated by this function.
+ * @param now The current time.
+ * @param linger_us The configured queue linger / batching time.
+ * @param batch_msg_cnt Queue threshold before signalling.
+ * @param batch_msg_bytes Queue threshold before signalling.
+ *
+ * @returns true if the wakeup conditions are already met and messages are
+ *          ready to be sent, else false.
+ *
+ * @locks_required rd_kafka_toppar_lock()
+ *
+ *
+ * Producer queue and broker thread wake-up behaviour.
+ *
+ * There are contradicting requirements at play here:
+ *  - Latency: queued messages must be batched and sent according to
+ *    batch size and linger.ms configuration.
+ *  - Wakeups: keep the number of thread wake-ups to a minimum to avoid
+ *    high CPU utilization and context switching.
+ *
+ * The message queue (rd_kafka_msgq_t) has functionality for the writer (app)
+ * to wake up the reader (broker thread) when there's a new message added.
+ * This wakeup is done through a combination of cndvar signalling and IO
+ * writes to make sure a thread wakeup is triggered regardless of whether the
+ * broker thread is blocking on cnd_timedwait() or on IO poll.
+ * When the broker thread is woken up it will scan all the partitions it is
+ * the leader for to check if there are messages to be sent - all according
+ * to the configured batch size and linger.ms - and then decide its next
+ * wait time depending on the lowest remaining linger.ms setting of any
+ * partition with messages enqueued.
+ *
+ * This wait time must also be set as a threshold on the message queue,
+ * telling the writer (app) that it must not trigger a wakeup until the wait
+ * time has expired, or the batch sizes have been exceeded.
+ *
+ * The message queue wakeup time is per partition, while the broker thread
+ * wakeup time is the lowest of all its partitions' wakeup times.
+ *
+ * The per-partition wakeup constraints are calculated and set by
+ * rd_kafka_msgq_allow_wakeup_at() which is called from the broker thread's
+ * per-partition handler.
+ * This function is called each time there are changes to the broker-local
+ * partition transmit queue (rktp_xmit_msgq), such as:
+ *  - messages are moved from the partition queue (rktp_msgq) to
+ *    rktp_xmit_msgq
+ *  - messages are moved to a ProduceRequest
+ *  - messages are timed out from the rktp_xmit_msgq
+ *  - the flushing state changed (rd_kafka_flush() is called or returned).
+ *
+ * If none of these things happen, the broker thread will simply read the
+ * last stored wakeup time for each partition and use that for calculating
+ * its minimum wait time.
+ *
+ *
+ * On the writer side, namely the application calling rd_kafka_produce(), the
+ * following checks are performed to see if it may trigger a wakeup when
+ * it adds a new message to the partition queue:
+ *  - the current time has reached the wakeup time (e.g., remaining linger.ms
+ *    has expired), or
+ *  - with the new message(s) being added, either the batch.size or
+ *    batch.num.messages thresholds have been exceeded, or
+ *  - the application is calling rd_kafka_flush(),
+ *  - and no wakeup has been signalled yet. This is critical since it may
+ *    take some time for the broker thread to do its work, so we'll want to
+ *    avoid flooding it with wakeups. A wakeup is therefore only sent once
+ *    per wakeup period.
+ */
+rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq,
+                                        const rd_kafka_msgq_t *dest_rkmq,
+                                        rd_ts_t *next_wakeup,
+                                        rd_ts_t now,
+                                        rd_ts_t linger_us,
+                                        int32_t batch_msg_cnt,
+                                        int64_t batch_msg_bytes) {
+        int32_t msg_cnt   = rd_kafka_msgq_len(dest_rkmq);
+        int64_t msg_bytes = rd_kafka_msgq_size(dest_rkmq);
+
+        if (RD_KAFKA_MSGQ_EMPTY(dest_rkmq)) {
+                rkmq->rkmq_wakeup.on_first = rd_true;
+                rkmq->rkmq_wakeup.abstime  = now + linger_us;
+                /* Leave next_wakeup untouched since the queue is empty */
+                msg_cnt   = 0;
+                msg_bytes = 0;
+        } else {
+                const rd_kafka_msg_t *rkm = rd_kafka_msgq_first(dest_rkmq);
+
+                rkmq->rkmq_wakeup.on_first = rd_false;
+
+                if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
+                        /* Honour retry.backoff.ms:
+                         * wait for backoff to expire */
+                        rkmq->rkmq_wakeup.abstime =
+                            rkm->rkm_u.producer.ts_backoff;
+                } else {
+                        /* Use message's produce() time + linger.ms */
+                        rkmq->rkmq_wakeup.abstime =
+                            rd_kafka_msg_enq_time(rkm) + linger_us;
+                        if (rkmq->rkmq_wakeup.abstime <= now)
+                                rkmq->rkmq_wakeup.abstime = now;
+                }
+
+                /* Update the caller's scheduler wakeup time */
+                if (next_wakeup && rkmq->rkmq_wakeup.abstime < *next_wakeup)
+                        *next_wakeup = rkmq->rkmq_wakeup.abstime;
+
+                msg_cnt   = rd_kafka_msgq_len(dest_rkmq);
+                msg_bytes = rd_kafka_msgq_size(dest_rkmq);
+        }
+
+        /*
+         * If there are more messages or bytes in queue than the batch limits,
+         * or the linger time has been exceeded,
+         * then there is no need for wakeup since the broker thread will
+         * produce those messages as quickly as it can.
+         */
+        if (msg_cnt >= batch_msg_cnt || msg_bytes >= batch_msg_bytes ||
+            (msg_cnt > 0 && now >= rkmq->rkmq_wakeup.abstime)) {
+                /* Prevent further signalling */
+                rkmq->rkmq_wakeup.signalled = rd_true;
+
+                /* Batch is ready */
+                return rd_true;
+        }
+
+        /* If the current msg or byte count is less than the batch limit
+         * then set the rkmq count to the remaining count or size to
+         * reach the batch limits.
+         * This is for the case where the producer is waiting for more
+         * messages to accumulate into a batch. The wakeup should only
+         * occur once a threshold is reached or the abstime has expired.
+         */
+        rkmq->rkmq_wakeup.signalled = rd_false;
+        rkmq->rkmq_wakeup.msg_cnt   = batch_msg_cnt - msg_cnt;
+        rkmq->rkmq_wakeup.msg_bytes = batch_msg_bytes - msg_bytes;
+
+        return rd_false;
+}
+
+
+
 /**
  * @brief Verify order (by msgid) in message queue.
  *        For development use only.
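To make the gate arithmetic concrete, a worked example with invented numbers (linger.ms=5); this is illustration only, not code from the patch:

    #include <stdint.h>

    int main(void) {
            int64_t now_us      = 1000;     /* current time, microseconds */
            int64_t linger_us   = 5 * 1000; /* linger.ms = 5 */
            int64_t enq_time_us = now_us;   /* oldest msg's produce() time */

            /* The gate set above: wakeups allowed from t = 6000us on. */
            int64_t wakeup_abstime = enq_time_us + linger_us;

            /* Until then, produce() calls that stay below
             * batch.num.messages and batch.size do not signal the broker
             * thread at all; the broker thread's poll timeout is clamped
             * to wakeup_abstime, so the worst case is one wakeup per
             * linger window instead of one per message. */
            return wakeup_abstime > now_us ? 0 : 1;
    }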
diff --git a/src/rdkafka_msg.h b/src/rdkafka_msg.h
index 3743dfba25..8546a819e2 100644
--- a/src/rdkafka_msg.h
+++ b/src/rdkafka_msg.h
@@ -194,6 +194,16 @@ typedef struct rd_kafka_msgq_s {
         struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
         int32_t rkmq_msg_cnt;
         int64_t rkmq_msg_bytes;
+        struct {
+                rd_ts_t abstime; /**< Allow wake-ups after this point in
+                                  *   time. */
+                int32_t msg_cnt; /**< Signal wake-up when this message count
+                                  *   is reached. */
+                int64_t msg_bytes; /**< .. or when this byte count is
+                                    *   reached. */
+                rd_bool_t on_first; /**< Wake-up on first message enqueued
+                                     *   regardless of .abstime. */
+                rd_bool_t signalled; /**< Wake-up (already) signalled. */
+        } rkmq_wakeup;
 } rd_kafka_msgq_t;
 
 #define RD_KAFKA_MSGQ_INITIALIZER(rkmq)                                        \
@@ -383,6 +393,43 @@ rd_kafka_msgq_first_msgid(const rd_kafka_msgq_t *rkmq) {
 }
 
 
+
+rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq,
+                                        const rd_kafka_msgq_t *dest_rkmq,
+                                        rd_ts_t *next_wakeup,
+                                        rd_ts_t now,
+                                        rd_ts_t linger_us,
+                                        int32_t batch_msg_cnt,
+                                        int64_t batch_msg_bytes);
+
+/**
+ * @returns true if msgq may be awoken.
+ */
+static RD_INLINE RD_UNUSED rd_bool_t
+rd_kafka_msgq_may_wakeup(const rd_kafka_msgq_t *rkmq, rd_ts_t now) {
+        /* No: Wakeup already signalled */
+        if (rkmq->rkmq_wakeup.signalled)
+                return rd_false;
+
+        /* Yes: Wakeup linger time has expired */
+        if (now >= rkmq->rkmq_wakeup.abstime)
+                return rd_true;
+
+        /* Yes: First message enqueued may trigger wakeup */
+        if (rkmq->rkmq_msg_cnt == 1 && rkmq->rkmq_wakeup.on_first)
+                return rd_true;
+
+        /* Yes: batch.size or batch.num.messages exceeded */
+        if (rkmq->rkmq_msg_cnt >= rkmq->rkmq_wakeup.msg_cnt ||
+            rkmq->rkmq_msg_bytes > rkmq->rkmq_wakeup.msg_bytes)
+                return rd_true;
+
+        /* No */
+        return rd_false;
+}
+
+
 /**
  * @brief Message ordering comparator using the message id
  *        number to order messages in ascending order (FIFO).
diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c
index 28a199744f..02a4c02f85 100644
--- a/src/rdkafka_msgset_reader.c
+++ b/src/rdkafka_msgset_reader.c
@@ -539,7 +539,7 @@ rd_kafka_msgset_reader_msg_v0_1(rd_kafka_msgset_reader_t *msetr) {
         struct {
                 int64_t Offset;      /* MessageSet header */
                 int32_t MessageSize; /* MessageSet header */
-                uint32_t Crc;
+                int32_t Crc;
                 int8_t MagicByte; /* MsgVersion */
                 int8_t Attributes;
                 int64_t Timestamp; /* v1 */
@@ -603,7 +603,7 @@ rd_kafka_msgset_reader_msg_v0_1(rd_kafka_msgset_reader_t *msetr) {
         calc_crc = rd_slice_crc32(&crc_slice);
         rd_dassert(rd_slice_remains(&crc_slice) == 0);
 
-        if (unlikely(hdr.Crc != calc_crc)) {
+        if (unlikely(hdr.Crc != (int32_t)calc_crc)) {
                 /* Propagate CRC error to application and
                  * continue with next message. */
                 rd_kafka_consumer_err(
diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 944d6adb20..a0cb99d046 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -670,8 +670,9 @@ void rd_kafka_toppar_desired_del(rd_kafka_toppar_t *rktp) {
 /**
  * Append message at tail of 'rktp' message queue.
  */
-void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
-        int queue_len;
+void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp,
+                             rd_kafka_msg_t *rkm,
+                             rd_ts_t now) {
         rd_kafka_q_t *wakeup_q = NULL;
 
         rd_kafka_toppar_lock(rktp);
@@ -683,18 +684,22 @@ void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
         if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
             rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
                 /* No need for enq_sorted(), this is the oldest message. */
-                queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
+                rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
         } else {
-                queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
-                                                     &rktp->rktp_msgq, rkm);
+                rd_kafka_msgq_enq_sorted(rktp->rktp_rkt, &rktp->rktp_msgq, rkm);
         }
 
-        if (unlikely(queue_len == 1 && (wakeup_q = rktp->rktp_msgq_wakeup_q)))
+        if (unlikely(rktp->rktp_partition != RD_KAFKA_PARTITION_UA &&
+                     rd_kafka_msgq_may_wakeup(&rktp->rktp_msgq, now) &&
+                     (wakeup_q = rktp->rktp_msgq_wakeup_q))) {
+                /* Wake-up broker thread */
+                rktp->rktp_msgq.rkmq_wakeup.signalled = rd_true;
                 rd_kafka_q_keep(wakeup_q);
+        }
 
         rd_kafka_toppar_unlock(rktp);
 
-        if (wakeup_q) {
+        if (unlikely(wakeup_q != NULL)) {
                 rd_kafka_q_yield(wakeup_q);
                 rd_kafka_q_destroy(wakeup_q);
         }
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h
index 68e8cf296e..c51e666be4 100644
--- a/src/rdkafka_partition.h
+++ b/src/rdkafka_partition.h
@@ -449,7 +449,9 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,
 void rd_kafka_toppar_purge_and_disable_queues(rd_kafka_toppar_t *rktp);
 void rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t *rktp, int fetch_state);
 void rd_kafka_toppar_insert_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
-void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp,
+                             rd_kafka_msg_t *rkm,
+                             rd_ts_t now);
 int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
                         rd_kafka_msgq_t *srcq,
                         int incr_retry,
diff --git a/src/rdkafka_queue.h b/src/rdkafka_queue.h
index 2356ade603..0d50f58703 100644
--- a/src/rdkafka_queue.h
+++ b/src/rdkafka_queue.h
@@ -322,8 +322,8 @@ static RD_INLINE RD_UNUSED void rd_kafka_q_io_event(rd_kafka_q_t *rkq) {
 
         /* Write wake-up event to socket.
          * Ignore errors, not much to do anyway. */
-        if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
-                     (int)rkq->rkq_qio->size) == -1)
+        if (rd_socket_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
+                            (int)rkq->rkq_qio->size) == -1)
                 ;
 }
 
diff --git a/src/rdkafka_transport.c b/src/rdkafka_transport.c
index 732d1d3461..d848ad7410 100644
--- a/src/rdkafka_transport.c
+++ b/src/rdkafka_transport.c
@@ -72,7 +72,7 @@ static void rd_kafka_transport_close0(rd_kafka_t *rk, rd_socket_t s) {
         if (rk->rk_conf.closesocket_cb)
                 rk->rk_conf.closesocket_cb((int)s, rk->rk_conf.opaque);
         else
-                rd_close(s);
+                rd_socket_close(s);
 }
 
 /**
@@ -1240,13 +1240,11 @@ static int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
         if (r <= 0)
                 return r;
 
-        rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);
-
         if (rktrans->rktrans_pfd[1].revents & POLLIN) {
                 /* Read wake-up fd data and throw away, just used for wake-ups*/
                 char buf[1024];
-                while (rd_read((int)rktrans->rktrans_pfd[1].fd, buf,
-                               sizeof(buf)) > 0)
+                while (rd_socket_read((int)rktrans->rktrans_pfd[1].fd, buf,
+                                      sizeof(buf)) > 0)
                         ; /* Read all buffered signalling bytes */
         }
 
diff --git a/src/rdposix.h b/src/rdposix.h
index deb1fe009f..7b2376823f 100644
--- a/src/rdposix.h
+++ b/src/rdposix.h
@@ -238,9 +238,13 @@ static RD_UNUSED int rd_pipe_nonblocking(rd_socket_t *fds) {
 #endif
         return 0;
 }
-#define rd_pipe(fds) pipe(fds)
-#define rd_read(fd, buf, sz) read(fd, buf, sz)
-#define rd_write(fd, buf, sz) write(fd, buf, sz)
-#define rd_close(fd) close(fd)
+#define rd_socket_read(fd, buf, sz) read(fd, buf, sz)
+#define rd_socket_write(fd, buf, sz) write(fd, buf, sz)
+#define rd_socket_close(fd) close(fd)
+
+/* File IO */
+#define rd_write(fd, buf, sz) write(fd, buf, sz)
+#define rd_open(path, flags, mode) open(path, flags, mode)
+#define rd_close(fd) close(fd)
 
 #endif /* _RDPOSIX_H_ */
diff --git a/src/rdwin32.h b/src/rdwin32.h
index 8ca0887f60..73edd41d6a 100644
--- a/src/rdwin32.h
+++ b/src/rdwin32.h
@@ -367,9 +367,15 @@ static RD_UNUSED int rd_pipe_nonblocking(rd_socket_t *fds) {
         return -1;
 }
 
-#define rd_read(fd, buf, sz) recv(fd, buf, sz, 0)
-#define rd_write(fd, buf, sz) send(fd, buf, sz, 0)
-#define rd_close(fd) closesocket(fd)
+/* Socket IO */
+#define rd_socket_read(fd, buf, sz) recv(fd, buf, sz, 0)
+#define rd_socket_write(fd, buf, sz) send(fd, buf, sz, 0)
+#define rd_socket_close(fd) closesocket(fd)
+
+/* File IO */
+#define rd_write(fd, buf, sz) _write(fd, buf, sz)
+#define rd_open(path, flags, mode) _open(path, flags, mode)
+#define rd_close(fd) _close(fd)
 
 #endif /* !__cplusplus*/
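The `rd_socket_*` split matters because these descriptors are the wakeup channel: the writer pokes a byte into a non-blocking pipe/socketpair and the poller drains it — the classic self-pipe pattern, which on Windows must use socket calls rather than CRT file IO. A standalone POSIX sketch of that mechanism (not librdkafka code):

    #include <poll.h>
    #include <unistd.h>

    /* Writer side: one byte is enough; errors are deliberately ignored,
     * as in rd_kafka_q_io_event() above. */
    static void wakeup_poke(int wfd) {
            (void)!write(wfd, "W", 1);
    }

    /* Poller side: block for up to timeout_ms, then drain every queued
     * signal byte. Assumes rfd is non-blocking (librdkafka creates it
     * with rd_pipe_nonblocking()). */
    static void wakeup_wait(int rfd, int timeout_ms) {
            struct pollfd pfd = {.fd = rfd, .events = POLLIN};
            char buf[1024];

            if (poll(&pfd, 1, timeout_ms) > 0 && (pfd.revents & POLLIN))
                    while (read(rfd, buf, sizeof(buf)) > 0)
                            ; /* drain all buffered signalling bytes */
    }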
diff --git a/tests/0055-producer_latency.c b/tests/0055-producer_latency.c
index 2759e098f9..e0244cec95 100644
--- a/tests/0055-producer_latency.c
+++ b/tests/0055-producer_latency.c
@@ -43,11 +43,14 @@ struct latconf {
         char linger_ms_conf[32]; /**< Read back to show actual value */
 
         /* Result vector */
+        rd_bool_t passed;
         float latency[_MSG_COUNT];
         float sum;
         int cnt;
+        int wakeups;
 };
 
+static int tot_wakeups = 0;
 
 static void dr_msg_cb(rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage,
                       void *opaque) {
@@ -76,6 +79,46 @@ dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
 }
 
 
+/**
+ * @brief A stats callback to get the per-broker wakeup counts.
+ *
+ * The JSON "parsing" here is crude.
+ */
+static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
+        const char *t = json;
+        int cnt       = 0;
+        int total     = 0;
+
+        /* Since we're only producing to one partition there will only be
+         * one broker, the leader, whose wakeup counts we're interested in,
+         * but we also want to know that other broker threads aren't spinning
+         * like crazy. So just summarize all the wakeups from all brokers. */
+        while ((t = strstr(t, "\"wakeups\":"))) {
+                int wakeups;
+                const char *next;
+
+                t += strlen("\"wakeups\":");
+                while (isspace((int)*t))
+                        t++;
+                wakeups = strtol(t, (char **)&next, 0);
+
+                TEST_ASSERT(t != next, "No wakeup number found at \"%.*s...\"",
+                            16, t);
+
+                total += wakeups;
+                cnt++;
+
+                t = next;
+        }
+
+        TEST_ASSERT(cnt > 0, "No brokers found in stats");
+
+        tot_wakeups = total;
+
+        return 0;
+}
+
+
 static int verify_latency(struct latconf *latconf) {
         float avg;
         int fails = 0;
@@ -86,8 +129,11 @@ static int verify_latency(struct latconf *latconf) {
 
         avg = latconf->sum / (float)latconf->cnt;
 
-        TEST_SAY("%s: average latency %.3fms, allowed range %d..%d +%.0fms\n",
-                 latconf->name, avg, latconf->min, latconf->max, ext_overhead);
+        TEST_SAY(
+            "%s: average latency %.3fms, allowed range %d..%d +%.0fms, "
+            "%d wakeups\n",
+            latconf->name, avg, latconf->min, latconf->max, ext_overhead,
+            tot_wakeups);
 
         if (avg < (float)latconf->min ||
             avg > (float)latconf->max + ext_overhead) {
@@ -99,6 +145,16 @@ static int verify_latency(struct latconf *latconf) {
                 fails++;
         }
 
+        latconf->wakeups = tot_wakeups;
+        if (latconf->wakeups < 10 || latconf->wakeups > 1000) {
+                TEST_FAIL_LATER(
+                    "%s: broker wakeups out of range: %d, "
+                    "expected 10..1000",
+                    latconf->name, latconf->wakeups);
+                fails++;
+        }
+
+
         return fails;
 }
 
@@ -116,23 +172,32 @@ static void measure_rtt(struct latconf *latconf, rd_kafka_t *rk) {
         rd_kafka_metadata_destroy(md);
 }
 
-static int test_producer_latency(const char *topic, struct latconf *latconf) {
+
+
+static void test_producer_latency(const char *topic, struct latconf *latconf) {
         rd_kafka_t *rk;
         rd_kafka_conf_t *conf;
         rd_kafka_resp_err_t err;
         int i;
         size_t sz;
+        rd_bool_t with_transactions = rd_false;
+
+        SUB_TEST("%s", latconf->name);
 
         test_conf_init(&conf, NULL, 60);
 
         rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
         rd_kafka_conf_set_opaque(conf, latconf);
+        rd_kafka_conf_set_stats_cb(conf, stats_cb);
+        test_conf_set(conf, "statistics.interval.ms", "100");
+        tot_wakeups = 0;
 
-        TEST_SAY(_C_BLU "[%s: begin]\n" _C_CLR, latconf->name);
         for (i = 0; latconf->conf[i]; i += 2) {
                 TEST_SAY("%s: set conf %s = %s\n", latconf->name,
                          latconf->conf[i], latconf->conf[i + 1]);
                 test_conf_set(conf, latconf->conf[i], latconf->conf[i + 1]);
+                if (!strcmp(latconf->conf[i], "transactional.id"))
+                        with_transactions = rd_true;
         }
 
         sz = sizeof(latconf->linger_ms_conf);
@@ -140,6 +205,11 @@ static int test_producer_latency(const char *topic, struct latconf *latconf) {
 
         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
 
+        if (with_transactions) {
+                TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 10 * 1000));
+                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
+        }
+
         TEST_SAY("%s: priming producer\n", latconf->name);
         /* Send a priming message to make sure everything is up
          * and functional before starting measurements */
@@ -151,8 +221,12 @@ static int test_producer_latency(const char *topic, struct latconf *latconf) {
                 TEST_FAIL("%s: priming producev failed: %s", latconf->name,
                           rd_kafka_err2str(err));
 
-        /* Await delivery */
-        rd_kafka_flush(rk, tmout_multip(5000));
+        if (with_transactions) {
+                TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
+        } else {
+                /* Await delivery */
+                rd_kafka_flush(rk, tmout_multip(5000));
+        }
 
         /* Get a network+broker round-trip-time base time. */
         measure_rtt(latconf, rk);
 
@@ -160,6 +234,10 @@ static int test_producer_latency(const char *topic, struct latconf *latconf) {
         TEST_SAY("%s: producing %d messages\n", latconf->name, _MSG_COUNT);
         for (i = 0; i < _MSG_COUNT; i++) {
                 int64_t *ts_send;
+                int pre_cnt = latconf->cnt;
+
+                if (with_transactions)
+                        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
 
                 ts_send  = malloc(sizeof(*ts_send));
                 *ts_send = test_clock();
@@ -174,12 +252,31 @@ static int test_producer_latency(const char *topic, struct latconf *latconf) {
                           i, rd_kafka_err2str(err));
 
                 /* Await delivery */
-                rd_kafka_poll(rk, 5000);
+                while (latconf->cnt == pre_cnt)
+                        rd_kafka_poll(rk, 5000);
+
+                if (with_transactions) {
+                        test_timing_t timing;
+                        TIMING_START(&timing, "commit_transaction");
+                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
+                        TIMING_ASSERT_LATER(&timing, 0,
                                             (int)(latconf->rtt + 50.0));
+                }
         }
 
+        while (tot_wakeups == 0)
+                rd_kafka_poll(rk, 100); /* Get final stats_cb */
+
         rd_kafka_destroy(rk);
 
-        return verify_latency(latconf);
+        if (verify_latency(latconf))
+                return; /* verify_latency() has already
+                         * called TEST_FAIL_LATER() */
+
+
+        latconf->passed = rd_true;
+
+        SUB_TEST_PASS();
 }
 
 
@@ -206,33 +303,37 @@ static float find_max(const struct latconf *latconf) {
 }
 
 int main_0055_producer_latency(int argc, char **argv) {
+        const char *topic = test_mk_topic_name("0055_producer_latency", 1);
+
         struct latconf latconfs[] = {
             {"standard settings", {NULL}, 5, 5}, /* default is now 5ms */
-            {"low queue.buffering.max.ms",
-             {"queue.buffering.max.ms", "0", NULL},
-             0,
-             0},
-            {"microsecond queue.buffering.max.ms",
-             {"queue.buffering.max.ms", "0.001", NULL},
+            {"low linger.ms (0ms)", {"linger.ms", "0", NULL}, 0, 0},
+            {"microsecond linger.ms (0.001ms)",
+             {"linger.ms", "0.001", NULL},
              0,
              1},
-            {"high queue.buffering.max.ms",
-             {"queue.buffering.max.ms", "3000", NULL},
+            {"high linger.ms (3000ms)",
+             {"linger.ms", "3000", NULL},
              3000,
              3100},
-            {"queue.buffering.max.ms < 1000", /* internal block_max_ms */
-             {"queue.buffering.max.ms", "500", NULL},
+            {"linger.ms < 1000 (500ms)", /* internal block_max_ms */
+             {"linger.ms", "500", NULL},
              500,
              600},
-            {"no acks",
-             {"queue.buffering.max.ms", "0", "acks", "0", "enable.idempotence",
-              "false", NULL},
+            {"no acks (0ms)",
+             {"linger.ms", "0", "acks", "0", "enable.idempotence", "false",
+              NULL},
              0,
              0},
+            {"idempotence (10ms)",
+             {"linger.ms", "10", "enable.idempotence", "true", NULL},
+             10,
+             10},
+            {"transactions (35ms)",
+             {"linger.ms", "35", "transactional.id", topic, NULL},
+             35,
+             50 + 35 /* extra time for AddPartitions.. */},
             {NULL}};
         struct latconf *latconf;
-        const char *topic = test_mk_topic_name("0055_producer_latency", 0);
-        int fails         = 0;
 
         if (test_on_ci) {
                 TEST_SKIP("Latency measurements not reliable on CI\n");
@@ -240,24 +341,26 @@ int main_0055_producer_latency(int argc, char **argv) {
         }
 
         /* Create topic without replicas to keep broker-side latency down */
-        test_create_topic(NULL, topic, 4, 1);
+        test_create_topic(NULL, topic, 1, 1);
 
         for (latconf = latconfs; latconf->name; latconf++)
-                fails += test_producer_latency(topic, latconf);
-
-        if (fails)
-                TEST_FAIL("See %d previous failure(s)", fails);
+                test_producer_latency(topic, latconf);
 
         TEST_SAY(_C_YEL "Latency tests summary:\n" _C_CLR);
-        TEST_SAY("%-40s %9s %6s..%-6s %7s %9s %9s %9s\n", "Name",
-                 "linger.ms", "MinExp", "MaxExp", "RTT", "Min", "Average",
-                 "Max");
+        TEST_SAY("%-40s %9s %6s..%-6s %7s %9s %9s %9s %8s\n", "Name",
+                 "linger.ms", "MinExp", "MaxExp", "RTT", "Min", "Average",
+                 "Max", "Wakeups");
 
         for (latconf = latconfs; latconf->name; latconf++)
-                TEST_SAY("%-40s %9s %6d..%-6d %7g %9g %9g %9g\n",
+                TEST_SAY("%-40s %9s %6d..%-6d %7g %9g %9g %9g %8d%s\n",
                          latconf->name, latconf->linger_ms_conf, latconf->min,
                          latconf->max, latconf->rtt, find_min(latconf),
-                         latconf->sum / latconf->cnt, find_max(latconf));
+                         latconf->sum / latconf->cnt, find_max(latconf),
+                         latconf->wakeups,
+                         latconf->passed ? "" : _C_RED " FAILED");
+
+
+        TEST_LATER_CHECK("");
 
         return 0;
 }
diff --git a/tests/0101-fetch-from-follower.cpp b/tests/0101-fetch-from-follower.cpp
index 0168ac55d3..cc68530011 100644
--- a/tests/0101-fetch-from-follower.cpp
+++ b/tests/0101-fetch-from-follower.cpp
@@ -303,6 +303,7 @@ static void do_fff_test(void) {
 
   if (get_broker_rack_count(replica_ids) != 3) {
     Test::Skip("unexpected broker.rack configuration: skipping test.\n");
+    return;
   }
 
   /* arrange for the consumer's client.rack to align with a broker that is not
diff --git a/tests/test.c b/tests/test.c
index 7469a8cc29..38a5440502 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -5186,7 +5186,9 @@ void test_report_add(struct test *test, const char *fmt, ...) {
  * If \p skip is set TEST_SKIP() will be called with a helpful message.
  */
 int test_can_create_topics(int skip) {
+#ifndef _WIN32
         const char *bootstrap;
+#endif
 
         /* Has AdminAPI */
         if (test_broker_version >= TEST_BRKVER(0, 10, 2, 0))
@@ -6689,9 +6691,6 @@ int test_sub_start(const char *func,
         if (!is_quick && test_quick)
                 return 0;
 
-        if (subtests_to_run && !strstr(func, subtests_to_run))
-                return 0;
-
         if (fmt && *fmt) {
                 va_list ap;
                 char buf[256];
@@ -6707,6 +6706,11 @@ int test_sub_start(const char *func,
                          "%s:%d", func, line);
         }
 
+        if (subtests_to_run && !strstr(test_curr->subtest, subtests_to_run)) {
+                *test_curr->subtest = '\0';
+                return 0;
+        }
+
         TIMING_START(&test_curr->subtest_duration, "SUBTEST");
 
         TEST_SAY(_C_MAG "[ %s ]\n", test_curr->subtest);
diff --git a/tests/testshared.h b/tests/testshared.h
index b54af26c1c..efdd5d5550 100644
--- a/tests/testshared.h
+++ b/tests/testshared.h
@@ -364,7 +364,7 @@ int test_sub_start(const char *func,
                    const char *fmt,
                    ...);
 void test_sub_pass(void);
-void test_sub_skip(const char *fmt, ...);
+void test_sub_skip(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
 
 #define SUB_TEST0(IS_QUICK, ...)                                               \
         do {                                                                   \
diff --git a/vcpkg.json b/vcpkg.json
index 4bd2a0eff4..f2953d0dfd 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -8,7 +8,7 @@
     },
     {
       "name": "zlib",
-      "version>=": "1.2.11#13"
+      "version>=": "1.2.12"
     },
     {
       "name": "openssl",
@@ -19,5 +19,5 @@
       "version>=": "7.82.0"
     }
   ],
-  "builtin-baseline": "773516ecf6014d89cc69b11bb54605ad4be56694"
+  "builtin-baseline": "01d6f6ff1e5332b926099f0c23bda996940ad4e8"
 }