Commit

Test refactor
emasab authored and mahajanadhitya committed Oct 11, 2023
1 parent 9f3de7f commit fcc4a3d
Showing 1 changed file with 132 additions and 164 deletions.
tests/0081-admin.c
296 changes: 132 additions & 164 deletions
@@ -4930,78 +4930,70 @@ static void do_test_UserScramCredentials(const char *what,
         SUB_TEST_PASS();
 }
 
-static void do_test_ListOffsets(rd_kafka_t *rk, rd_kafka_queue_t *queue) {
+static void do_test_ListOffsets(const char *what,
+                                rd_kafka_t *rk,
+                                rd_kafka_queue_t *useq,
+                                int req_timeout_ms) {
-        char errstr[512];
-        char *topicname = "non-existent";
-        char *message = "Message";
+        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+        char *message     = "Message";
         rd_kafka_AdminOptions_t *options;
         rd_kafka_event_t *event;
+        rd_kafka_queue_t *q;
+        rd_kafka_t *p;
+        size_t i = 0;
+        rd_kafka_topic_partition_list_t *topic_partitions;
         int64_t basetimestamp = 10000000;
-        int64_t t1            = basetimestamp + 100;
-        int64_t t2            = basetimestamp + 400;
-        int64_t t3            = basetimestamp + 250;
-
-        rd_kafka_NewTopic_t *topic[1];
-        topic[0] =
-            rd_kafka_NewTopic_new(topicname, 1, 1, errstr, sizeof(errstr));
-        rd_kafka_CreateTopics(rk, topic, 1, NULL, queue);
-        rd_kafka_NewTopic_destroy_array(topic, 1);
-        /* Wait for results */
-        event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
-        rd_kafka_event_destroy(event);
-        SUB_TEST();
-        rd_kafka_producev(
-            /* Producer handle */
-            rk,
-            /* Topic name */
-            RD_KAFKA_V_TOPIC(topicname),
-            /* Make a copy of the payload. */
-            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
-            /* Message value and length */
-            RD_KAFKA_V_VALUE(message, strlen(message)),
-
-            RD_KAFKA_V_TIMESTAMP(t1),
-            /* Per-Message opaque, provided in
-             * delivery report callback as
-             * msg_opaque. */
-            RD_KAFKA_V_OPAQUE(NULL),
-            /* End sentinel */
-            RD_KAFKA_V_END);
-        rd_kafka_producev(
-            /* Producer handle */
-            rk,
-            /* Topic name */
-            RD_KAFKA_V_TOPIC(topicname),
-            /* Make a copy of the payload. */
-            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
-            /* Message value and length */
-            RD_KAFKA_V_VALUE(message, strlen(message)),
-
-            RD_KAFKA_V_TIMESTAMP(t2),
-            /* Per-Message opaque, provided in
-             * delivery report callback as
-             * msg_opaque. */
-            RD_KAFKA_V_OPAQUE(NULL),
-            /* End sentinel */
-            RD_KAFKA_V_END);
-        rd_kafka_producev(
-            /* Producer handle */
-            rk,
-            /* Topic name */
-            RD_KAFKA_V_TOPIC(topicname),
-            /* Make a copy of the payload. */
-            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
-            /* Message value and length */
-            RD_KAFKA_V_VALUE(message, strlen(message)),
-
-            RD_KAFKA_V_TIMESTAMP(t3),
-            /* Per-Message opaque, provided in
-             * delivery report callback as
-             * msg_opaque. */
-            RD_KAFKA_V_OPAQUE(NULL),
-            /* End sentinel */
-            RD_KAFKA_V_END);
-        rd_kafka_flush(rk, 20 * 1000);
+        int64_t timestamps[] = {
+            basetimestamp + 100,
+            basetimestamp + 400,
+            basetimestamp + 250,
+        };
+        struct test_fixture_s {
+                int64_t query;
+                int64_t expected;
+        } test_fixtures[] = {
+            {.query = RD_KAFKA_OFFSET_SPEC_EARLIEST, .expected = 0},
+            {.query = RD_KAFKA_OFFSET_SPEC_LATEST, .expected = 3},
+            {.query = RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP, .expected = 1},
+            {.query = basetimestamp + 50, .expected = 0},
+            {.query = basetimestamp + 300, .expected = 1},
+            {.query = basetimestamp + 150, .expected = 1},
+        };
+
+        SUB_TEST_QUICK(
+            "%s ListOffsets with %s, "
+            "request_timeout %d",
+            rd_kafka_name(rk), what, req_timeout_ms);
+
+        q = useq ? useq : rd_kafka_queue_new(rk);
+
+        test_CreateTopics_simple(rk, NULL, (char **)&topic, 1, 1, NULL);
+
+        p = test_create_producer();
+        for (i = 0; i < RD_ARRAY_SIZE(timestamps); i++) {
+                rd_kafka_producev(
+                    /* Producer handle */
+                    p,
+                    /* Topic name */
+                    RD_KAFKA_V_TOPIC(topic),
+                    /* Make a copy of the payload. */
+                    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+                    /* Message value and length */
+                    RD_KAFKA_V_VALUE(message, strlen(message)),
+
+                    RD_KAFKA_V_TIMESTAMP(timestamps[i]),
+                    /* Per-Message opaque, provided in
+                     * delivery report callback as
+                     * msg_opaque. */
+                    RD_KAFKA_V_OPAQUE(NULL),
+                    /* End sentinel */
+                    RD_KAFKA_V_END);
+        }
+
+        rd_kafka_flush(p, 20 * 1000);
+        rd_kafka_destroy(p);
 
         /* Set timeout (optional) */
         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTOFFSETS);
 
@@ -5011,107 +5003,80 @@ static void do_test_ListOffsets(rd_kafka_t *rk, rd_kafka_queue_t *queue) {
         TEST_CALL_ERROR__(rd_kafka_AdminOptions_set_isolation_level(
             options, RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED));
 
-        rd_kafka_topic_partition_list_t *topic_partitions;
         topic_partitions = rd_kafka_topic_partition_list_new(1);
-        rd_kafka_topic_partition_t *topic_partition =
-            rd_kafka_topic_partition_list_add(topic_partitions, topicname, 0);
-        /* Set OffsetSpec to EARLIEST */
-        topic_partition->offset = RD_KAFKA_OFFSET_SPEC_EARLIEST;
-
-        /* Call ListOffsets */
-        rd_kafka_ListOffsets(
-            rk, rd_kafka_topic_partition_list_copy(topic_partitions), options,
-            queue);
-        /* Wait for results */
-        event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
-        if (!event || rd_kafka_event_error(event)) {
-                TEST_FAIL("Event Failed.");
-        } else {
-                const rd_kafka_ListOffsets_result_t *result;
-                const rd_kafka_ListOffsetsResultInfo_t **result_infos;
-                size_t cnt;
-                size_t i;
-                result = rd_kafka_event_ListOffsets_result(event);
-                result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt);
-                for (i = 0; i < cnt; i++) {
-                        const rd_kafka_topic_partition_t *topic_partition =
-                            rd_kafka_ListOffsetsResultInfo_topic_partition(
-                                result_infos[i]);
-                        TEST_ASSERT((topic_partition->err == 0) &&
-                                        (topic_partition->offset == 0),
-                                    "Offset should be 0.");
-                }
-        }
-
-        rd_kafka_event_destroy(event);
-
-        topic_partition->offset = RD_KAFKA_OFFSET_SPEC_LATEST;
-        /* Call ListOffsets */
-        rd_kafka_ListOffsets(
-            rk, rd_kafka_topic_partition_list_copy(topic_partitions), options,
-            queue);
-
-        /* Wait for results */
-        event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
-        if (!event || rd_kafka_event_error(event)) {
-                TEST_FAIL("Event Failed.");
-        } else {
-                const rd_kafka_ListOffsets_result_t *result;
-                const rd_kafka_ListOffsetsResultInfo_t **result_infos;
-                size_t cnt;
-                size_t i;
-                result = rd_kafka_event_ListOffsets_result(event);
-                result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt);
-                for (i = 0; i < cnt; i++) {
-                        const rd_kafka_topic_partition_t *topic_partition =
-                            rd_kafka_ListOffsetsResultInfo_topic_partition(
-                                result_infos[i]);
-                        TEST_ASSERT((topic_partition->err == 0) &&
-                                        (topic_partition->offset == 3),
-                                    "Offset should be 3.");
+        rd_kafka_topic_partition_list_add(topic_partitions, topic, 0);
+
+        for (i = 0; i < RD_ARRAY_SIZE(test_fixtures); i++) {
+                rd_bool_t retry = rd_true;
+
+                struct test_fixture_s test_fixture = test_fixtures[i];
+
+                rd_kafka_topic_partition_list_t *topic_partitions_copy =
+                    rd_kafka_topic_partition_list_copy(topic_partitions);
+
+                /* Set OffsetSpec */
+                topic_partitions_copy->elems[0].offset = test_fixture.query;
+
+                while (retry) {
+                        rd_kafka_resp_err_t err;
+                        /* Call ListOffsets */
+                        rd_kafka_ListOffsets(rk, topic_partitions_copy,
+                                             options, q);
+                        /* Wait for results */
+                        event = rd_kafka_queue_poll(q, -1 /*indefinitely*/);
+                        if (!event)
+                                TEST_FAIL("Event missing");
+
+                        err = rd_kafka_event_error(event);
+                        if (err == RD_KAFKA_RESP_ERR__NOENT) {
+                                /* Still looking for the leader */
+                                rd_usleep(100000, 0);
+                        } else if (err) {
+                                TEST_FAIL("Failed with error: %s",
+                                          rd_kafka_err2name(err));
+                        } else {
+                                const rd_kafka_ListOffsets_result_t *result;
+                                const rd_kafka_ListOffsetsResultInfo_t *
+                                    *result_infos;
+                                size_t cnt;
+                                size_t j;
+                                result =
+                                    rd_kafka_event_ListOffsets_result(event);
+                                result_infos =
+                                    rd_kafka_ListOffsets_result_infos(result,
+                                                                      &cnt);
+                                for (j = 0; j < cnt; j++) {
+                                        const rd_kafka_topic_partition_t
+                                            *topic_partition =
+                                                rd_kafka_ListOffsetsResultInfo_topic_partition(
+                                                    result_infos[j]);
+                                        TEST_ASSERT(
+                                            topic_partition->err == 0,
+                                            "Expected error NO_ERROR, got %s",
+                                            rd_kafka_err2name(
+                                                topic_partition->err));
+                                        TEST_ASSERT(topic_partition->offset ==
+                                                        test_fixture.expected,
+                                                    "Expected offset %" PRId64
+                                                    ", got %" PRId64,
+                                                    test_fixture.expected,
+                                                    topic_partition->offset);
+                                }
+                                retry = rd_false;
+                        }
+                        rd_kafka_event_destroy(event);
                 }
+                rd_kafka_topic_partition_list_destroy(topic_partitions_copy);
         }
-        rd_kafka_event_destroy(event);
-
-        topic_partition->offset = RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP;
-
-        /* Call ListOffsets */
-        rd_kafka_ListOffsets(
-            rk, rd_kafka_topic_partition_list_copy(topic_partitions), options,
-            queue);
-
-        /* Wait for results */
-        event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
-
-        if (!event || rd_kafka_event_error(event)) {
-                TEST_FAIL("Event Failed.");
-        } else {
-                const rd_kafka_ListOffsets_result_t *result;
-                const rd_kafka_ListOffsetsResultInfo_t **result_infos;
-                size_t cnt;
-                size_t i;
-                result = rd_kafka_event_ListOffsets_result(event);
-                result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt);
-                for (i = 0; i < cnt; i++) {
-                        const rd_kafka_topic_partition_t *topic_partition =
-                            rd_kafka_ListOffsetsResultInfo_topic_partition(
-                                result_infos[i]);
-                        TEST_ASSERT((topic_partition->err == 0) &&
-                                        (topic_partition->offset == 1),
-                                    "Offset should be 1.");
-                }
-        }
-        rd_kafka_event_destroy(event);
         rd_kafka_AdminOptions_destroy(options);
         rd_kafka_topic_partition_list_destroy(topic_partitions);
 
-        rd_kafka_DeleteTopic_t *del_topics[1];
-        del_topics[0] = rd_kafka_DeleteTopic_new(topicname);
-        rd_kafka_DeleteTopics(rk, del_topics, 1, NULL, queue);
-        rd_kafka_DeleteTopic_destroy_array(del_topics, 1);
-        /* Wait for results */
-        event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
-        rd_kafka_event_destroy(event);
+        test_DeleteTopics_simple(rk, NULL, (char **)&topic, 1, NULL);
+
+        if (!useq)
+                rd_kafka_queue_destroy(q);
 
         SUB_TEST_PASS();
 }
 
@@ -5133,12 +5098,11 @@ static void do_test_apis(rd_kafka_type_t cltype) {
 
         test_conf_init(&conf, NULL, 180);
         test_conf_set(conf, "socket.timeout.ms", "10000");
+
        rk = test_create_handle(cltype, conf);
 
         mainq = rd_kafka_queue_get_main(rk);
 
-        /* ListOffsets */
-        do_test_ListOffsets(rk, mainq);
         /* Create topics */
         do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0);
         do_test_CreateTopics("temp queue, op timeout 15000", rk, NULL, 15000,
@@ -5236,6 +5200,10 @@ static void do_test_apis(rd_kafka_type_t cltype) {
         }
 
         if (test_broker_version >= TEST_BRKVER(2, 5, 0, 0)) {
+                /* ListOffsets */
+                do_test_ListOffsets("temp queue", rk, NULL, -1);
+                do_test_ListOffsets("main queue", rk, mainq, 1500);
+
                 /* Alter committed offsets */
                 do_test_AlterConsumerGroupOffsets("temp queue", rk, NULL, -1,
                                                   rd_false, rd_true);
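Note on the expected values: the refactored test drives all six queries from the test_fixtures table. Three messages are produced with timestamps basetimestamp + 100, + 400 and + 250, so EARLIEST resolves to offset 0, LATEST to the end offset 3, and MAX_TIMESTAMP to offset 1, the message carrying the highest timestamp. For readers unfamiliar with the admin API being exercised, the snippet below condenses the same request/response flow into a standalone sketch: build a partition list carrying an OffsetSpec, fire rd_kafka_ListOffsets() into an event queue, poll for the result event, and read the per-partition offsets out of the result infos. This is a minimal sketch, not code from this commit; the broker address and topic name are placeholders, and error handling is elided.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Minimal ListOffsets flow (sketch, not from this commit).
 * "localhost:9092" and "mytopic" are placeholders. */
static void query_earliest_offset(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr,
                          sizeof(errstr));
        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        rd_kafka_queue_t *q = rd_kafka_queue_new(rk);

        /* One partition, queried for its earliest offset. */
        rd_kafka_topic_partition_list_t *parts =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(parts, "mytopic", 0)->offset =
            RD_KAFKA_OFFSET_SPEC_EARLIEST;

        rd_kafka_AdminOptions_t *options =
            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTOFFSETS);
        rd_kafka_ListOffsets(rk, parts, options, q);

        /* Block until the result event arrives, then unpack it. */
        rd_kafka_event_t *event = rd_kafka_queue_poll(q, -1);
        if (!rd_kafka_event_error(event)) {
                size_t cnt, i;
                const rd_kafka_ListOffsets_result_t *result =
                    rd_kafka_event_ListOffsets_result(event);
                const rd_kafka_ListOffsetsResultInfo_t **infos =
                    rd_kafka_ListOffsets_result_infos(result, &cnt);
                for (i = 0; i < cnt; i++) {
                        const rd_kafka_topic_partition_t *tp =
                            rd_kafka_ListOffsetsResultInfo_topic_partition(
                                infos[i]);
                        printf("partition %d: offset %lld\n", tp->partition,
                               (long long)tp->offset);
                }
        }

        rd_kafka_event_destroy(event);
        rd_kafka_AdminOptions_destroy(options);
        rd_kafka_topic_partition_list_destroy(parts);
        rd_kafka_queue_destroy(q);
        rd_kafka_destroy(rk);
}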

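A behavioral detail the diff adds alongside the table-driven structure: because the topic is created immediately before the queries, the first ListOffsets round trips can race partition leader election and come back with RD_KAFKA_RESP_ERR__NOENT, so the test now sleeps 100 ms and retries instead of failing. Below is a condensed sketch of that polling idiom, using only calls visible in the diff, except that usleep() from <unistd.h> stands in for the test suite's internal rd_usleep().

#include <unistd.h> /* usleep() */
#include <librdkafka/rdkafka.h>

/* Re-issue a ListOffsets request until the broker reports a leader
 * for the (freshly created) partition. Sketch only; a real caller
 * would also bound the number of retries. */
static rd_kafka_event_t *
list_offsets_retrying(rd_kafka_t *rk,
                      rd_kafka_topic_partition_list_t *parts,
                      rd_kafka_AdminOptions_t *options,
                      rd_kafka_queue_t *q) {
        for (;;) {
                rd_kafka_event_t *event;

                rd_kafka_ListOffsets(rk, parts, options, q);
                event = rd_kafka_queue_poll(q, -1 /*indefinitely*/);

                if (rd_kafka_event_error(event) != RD_KAFKA_RESP_ERR__NOENT)
                        return event; /* success, or a real error */

                /* Leader not known yet: back off briefly and retry. */
                rd_kafka_event_destroy(event);
                usleep(100 * 1000);
        }
}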