diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 4bd6c4d2d2..c13685e401 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -4930,78 +4930,70 @@ static void do_test_UserScramCredentials(const char *what, SUB_TEST_PASS(); } -static void do_test_ListOffsets(rd_kafka_t *rk, rd_kafka_queue_t *queue) { +static void do_test_ListOffsets(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int req_timeout_ms) { char errstr[512]; - char *topicname = "non-existent"; - char *message = "Message"; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + char *message = "Message"; rd_kafka_AdminOptions_t *options; rd_kafka_event_t *event; + rd_kafka_queue_t *q; + rd_kafka_t *p; + size_t i = 0; + rd_kafka_topic_partition_list_t *topic_partitions; int64_t basetimestamp = 10000000; - int64_t t1 = basetimestamp + 100; - int64_t t2 = basetimestamp + 400; - int64_t t3 = basetimestamp + 250; - - rd_kafka_NewTopic_t *topic[1]; - topic[0] = - rd_kafka_NewTopic_new(topicname, 1, 1, errstr, sizeof(errstr)); - rd_kafka_CreateTopics(rk, topic, 1, NULL, queue); - rd_kafka_NewTopic_destroy_array(topic, 1); - /* Wait for results */ - event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); - rd_kafka_event_destroy(event); - SUB_TEST(); - rd_kafka_producev( - /* Producer handle */ - rk, - /* Topic name */ - RD_KAFKA_V_TOPIC(topicname), - /* Make a copy of the payload. */ - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - /* Message value and length */ - RD_KAFKA_V_VALUE(message, strlen(message)), - - RD_KAFKA_V_TIMESTAMP(t1), - /* Per-Message opaque, provided in - * delivery report callback as - * msg_opaque. */ - RD_KAFKA_V_OPAQUE(NULL), - /* End sentinel */ - RD_KAFKA_V_END); - rd_kafka_producev( - /* Producer handle */ - rk, - /* Topic name */ - RD_KAFKA_V_TOPIC(topicname), - /* Make a copy of the payload. */ - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - /* Message value and length */ - RD_KAFKA_V_VALUE(message, strlen(message)), - - RD_KAFKA_V_TIMESTAMP(t2), - /* Per-Message opaque, provided in - * delivery report callback as - * msg_opaque. */ - RD_KAFKA_V_OPAQUE(NULL), - /* End sentinel */ - RD_KAFKA_V_END); - rd_kafka_producev( - /* Producer handle */ - rk, - /* Topic name */ - RD_KAFKA_V_TOPIC(topicname), - /* Make a copy of the payload. */ - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - /* Message value and length */ - RD_KAFKA_V_VALUE(message, strlen(message)), - - RD_KAFKA_V_TIMESTAMP(t3), - /* Per-Message opaque, provided in - * delivery report callback as - * msg_opaque. */ - RD_KAFKA_V_OPAQUE(NULL), - /* End sentinel */ - RD_KAFKA_V_END); - rd_kafka_flush(rk, 20 * 1000); + int64_t timestamps[] = { + basetimestamp + 100, + basetimestamp + 400, + basetimestamp + 250, + }; + struct test_fixture_s { + int64_t query; + int64_t expected; + } test_fixtures[] = { + {.query = RD_KAFKA_OFFSET_SPEC_EARLIEST, .expected = 0}, + {.query = RD_KAFKA_OFFSET_SPEC_LATEST, .expected = 3}, + {.query = RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP, .expected = 1}, + {.query = basetimestamp + 50, .expected = 0}, + {.query = basetimestamp + 300, .expected = 1}, + {.query = basetimestamp + 150, .expected = 1}, + }; + + SUB_TEST_QUICK( + "%s ListOffsets with %s, " + "request_timeout %d", + rd_kafka_name(rk), what, req_timeout_ms); + + q = useq ? useq : rd_kafka_queue_new(rk); + + test_CreateTopics_simple(rk, NULL, (char **)&topic, 1, 1, NULL); + + p = test_create_producer(); + for (i = 0; i < RD_ARRAY_SIZE(timestamps); i++) { + rd_kafka_producev( + /* Producer handle */ + p, + /* Topic name */ + RD_KAFKA_V_TOPIC(topic), + /* Make a copy of the payload. */ + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + /* Message value and length */ + RD_KAFKA_V_VALUE(message, strlen(message)), + + RD_KAFKA_V_TIMESTAMP(timestamps[i]), + /* Per-Message opaque, provided in + * delivery report callback as + * msg_opaque. */ + RD_KAFKA_V_OPAQUE(NULL), + /* End sentinel */ + RD_KAFKA_V_END); + } + + rd_kafka_flush(p, 20 * 1000); + rd_kafka_destroy(p); + /* Set timeout (optional) */ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTOFFSETS); @@ -5011,107 +5003,80 @@ static void do_test_ListOffsets(rd_kafka_t *rk, rd_kafka_queue_t *queue) { TEST_CALL_ERROR__(rd_kafka_AdminOptions_set_isolation_level( options, RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED)); - rd_kafka_topic_partition_list_t *topic_partitions; topic_partitions = rd_kafka_topic_partition_list_new(1); - rd_kafka_topic_partition_t *topic_partition = - rd_kafka_topic_partition_list_add(topic_partitions, topicname, 0); - /* Set OffsetSpec to EARLIEST */ - topic_partition->offset = RD_KAFKA_OFFSET_SPEC_EARLIEST; - - /* Call ListOffsets */ - rd_kafka_ListOffsets( - rk, rd_kafka_topic_partition_list_copy(topic_partitions), options, - queue); - /* Wait for results */ - event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); - if (!event || rd_kafka_event_error(event)) { - TEST_FAIL("Event Failed."); - } else { - const rd_kafka_ListOffsets_result_t *result; - const rd_kafka_ListOffsetsResultInfo_t **result_infos; - size_t cnt; - size_t i; - result = rd_kafka_event_ListOffsets_result(event); - result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); - for (i = 0; i < cnt; i++) { - const rd_kafka_topic_partition_t *topic_partition = - rd_kafka_ListOffsetsResultInfo_topic_partition( - result_infos[i]); - TEST_ASSERT((topic_partition->err == 0) && - (topic_partition->offset == 0), - "Offset should be 0."); - } - } - - rd_kafka_event_destroy(event); - - topic_partition->offset = RD_KAFKA_OFFSET_SPEC_LATEST; - /* Call ListOffsets */ - rd_kafka_ListOffsets( - rk, rd_kafka_topic_partition_list_copy(topic_partitions), options, - queue); - - /* Wait for results */ - event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); - if (!event || rd_kafka_event_error(event)) { - TEST_FAIL("Event Failed."); - } else { - const rd_kafka_ListOffsets_result_t *result; - const rd_kafka_ListOffsetsResultInfo_t **result_infos; - size_t cnt; - size_t i; - result = rd_kafka_event_ListOffsets_result(event); - result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); - for (i = 0; i < cnt; i++) { - const rd_kafka_topic_partition_t *topic_partition = - rd_kafka_ListOffsetsResultInfo_topic_partition( - result_infos[i]); - TEST_ASSERT((topic_partition->err == 0) && - (topic_partition->offset == 3), - "Offset should be 3."); + rd_kafka_topic_partition_list_add(topic_partitions, topic, 0); + + for (i = 0; i < RD_ARRAY_SIZE(test_fixtures); i++) { + rd_bool_t retry = rd_true; + + struct test_fixture_s test_fixture = test_fixtures[i]; + + rd_kafka_topic_partition_list_t *topic_partitions_copy = + rd_kafka_topic_partition_list_copy(topic_partitions); + + /* Set OffsetSpec */ + topic_partitions_copy->elems[0].offset = test_fixture.query; + + while (retry) { + rd_kafka_resp_err_t err; + /* Call ListOffsets */ + rd_kafka_ListOffsets(rk, topic_partitions_copy, options, + q); + /* Wait for results */ + event = rd_kafka_queue_poll(q, -1 /*indefinitely*/); + if (!event) + TEST_FAIL("Event missing"); + + err = rd_kafka_event_error(event); + if (err == RD_KAFKA_RESP_ERR__NOENT) { + /* Still looking for the leader */ + rd_usleep(100000, 0); + } else if (err) { + TEST_FAIL("Failed with error: %s", + rd_kafka_err2name(err)); + } else { + const rd_kafka_ListOffsets_result_t *result; + const rd_kafka_ListOffsetsResultInfo_t * + *result_infos; + size_t cnt; + size_t j; + result = + rd_kafka_event_ListOffsets_result(event); + result_infos = + rd_kafka_ListOffsets_result_infos(result, + &cnt); + for (j = 0; j < cnt; j++) { + const rd_kafka_topic_partition_t + *topic_partition = + rd_kafka_ListOffsetsResultInfo_topic_partition( + result_infos[j]); + TEST_ASSERT( + topic_partition->err == 0, + "Expected error NO_ERROR, got %s", + rd_kafka_err2name( + topic_partition->err)); + TEST_ASSERT(topic_partition->offset == + test_fixture.expected, + "Expected offset %" PRId64 + ", got %" PRId64, + test_fixture.expected, + topic_partition->offset); + } + retry = rd_false; + } + rd_kafka_event_destroy(event); } + rd_kafka_topic_partition_list_destroy(topic_partitions_copy); } - rd_kafka_event_destroy(event); - topic_partition->offset = RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP; - - /* Call ListOffsets */ - rd_kafka_ListOffsets( - rk, rd_kafka_topic_partition_list_copy(topic_partitions), options, - queue); - - /* Wait for results */ - event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); - - if (!event || rd_kafka_event_error(event)) { - TEST_FAIL("Event Failed."); - } else { - const rd_kafka_ListOffsets_result_t *result; - const rd_kafka_ListOffsetsResultInfo_t **result_infos; - size_t cnt; - size_t i; - result = rd_kafka_event_ListOffsets_result(event); - result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); - for (i = 0; i < cnt; i++) { - const rd_kafka_topic_partition_t *topic_partition = - rd_kafka_ListOffsetsResultInfo_topic_partition( - result_infos[i]); - TEST_ASSERT((topic_partition->err == 0) && - (topic_partition->offset == 1), - "Offset should be 1."); - } - } - rd_kafka_event_destroy(event); rd_kafka_AdminOptions_destroy(options); rd_kafka_topic_partition_list_destroy(topic_partitions); - rd_kafka_DeleteTopic_t *del_topics[1]; - del_topics[0] = rd_kafka_DeleteTopic_new(topicname); - rd_kafka_DeleteTopics(rk, del_topics, 1, NULL, queue); - rd_kafka_DeleteTopic_destroy_array(del_topics, 1); - /* Wait for results */ - event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/); - rd_kafka_event_destroy(event); + test_DeleteTopics_simple(rk, NULL, (char **)&topic, 1, NULL); + + if (!useq) + rd_kafka_queue_destroy(q); + SUB_TEST_PASS(); } @@ -5133,12 +5098,11 @@ static void do_test_apis(rd_kafka_type_t cltype) { test_conf_init(&conf, NULL, 180); test_conf_set(conf, "socket.timeout.ms", "10000"); + rk = test_create_handle(cltype, conf); mainq = rd_kafka_queue_get_main(rk); - /* ListOffsets */ - do_test_ListOffsets(rk, mainq); /* Create topics */ do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0); do_test_CreateTopics("temp queue, op timeout 15000", rk, NULL, 15000, @@ -5236,6 +5200,10 @@ static void do_test_apis(rd_kafka_type_t cltype) { } if (test_broker_version >= TEST_BRKVER(2, 5, 0, 0)) { + /* ListOffsets */ + do_test_ListOffsets("temp queue", rk, NULL, -1); + do_test_ListOffsets("main queue", rk, mainq, 1500); + /* Alter committed offsets */ do_test_AlterConsumerGroupOffsets("temp queue", rk, NULL, -1, rd_false, rd_true);