Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mahajanadhitya committed Jul 18, 2023
1 parent 90ca8ea commit 6b35ec6
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 26 deletions.
13 changes: 1 addition & 12 deletions examples/list_offsets.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ int main(int argc, char **argv) {
* offsets */
rd_kafka_AdminOptions_t *options; /* (Optional) Options for */
rd_kafka_event_t *event; /* Result event */
rd_kafka_resp_err_t api_error = 0;
int exitcode = 0;

// /* Set OffsetSpec to LATEST */
Expand Down Expand Up @@ -201,7 +200,7 @@ int main(int argc, char **argv) {
RD_KAFKA_V_OPAQUE(NULL),
/* End sentinel */
RD_KAFKA_V_END);
rd_kafka_flush(rk,20*1000);
rd_kafka_flush(rk,10*1000);
/* Set timeout (optional) */
options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTOFFSETS);

Expand All @@ -217,17 +216,8 @@ int main(int argc, char **argv) {
/* Call ListOffsets */
rd_kafka_ListOffsets(rk, topic_partitions, options, queue);
rd_kafka_AdminOptions_destroy(options);
if(api_error){
printf("Api Entry Point Error : %s\n",rd_kafka_err2str(api_error));
rd_kafka_queue_destroy(queue);

/* Destroy the producer instance */
rd_kafka_destroy(rk);
return api_error;
}
/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);

if (!event) {
/* User hit Ctrl-C */
fprintf(stderr, "%% Cancelled by user\n");
Expand All @@ -243,7 +233,6 @@ int main(int argc, char **argv) {
* partitions may have errors. */
const rd_kafka_ListOffsets_result_t *result;
size_t i;

result = rd_kafka_event_ListOffsets_result(event);
size_t result_cnt = rd_kafka_ListOffsets_result_get_count(result);
printf("ListOffsets results:\n");
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -4000,7 +4000,7 @@ void rd_kafka_DeleteRecords(rd_kafka_t *rk,
rd_kafka_DeleteRecords_leaders_queried_cb, rko_fanout);
}

/*listOffsets*/
/*ListOffsets*/
void rd_kafka_ListOffsets(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *topic_partitions,
const rd_kafka_AdminOptions_t *options,
Expand Down Expand Up @@ -4058,6 +4058,7 @@ void rd_kafka_ListOffsets(rd_kafka_t *rk,
if( error_code ){
rd_kafka_admin_result_fail(rko_fanout,
error_code,
"%s",
rd_kafka_err2str(error_code));
rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
rd_true /*destroy*/);
Expand Down
16 changes: 3 additions & 13 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3738,18 +3738,14 @@ void do_test_list_offsets(rd_kafka_t *rk, rd_kafka_queue_t *queue){
topic_partition->offset = RD_KAFKA_OFFSET_SPEC_EARLIEST;

/* Call ListOffsets */
if(rd_kafka_ListOffsets(rk,rd_kafka_topic_partition_list_copy(topic_partitions), options, queue))
TEST_FAIL("Unable to call ListOffsets\n");

rd_kafka_ListOffsets(rk,rd_kafka_topic_partition_list_copy(topic_partitions), options, queue);
/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);

if (!event || rd_kafka_event_error(event)) {
TEST_FAIL("Event Failed\n");
} else {
const rd_kafka_ListOffsets_result_t *result;
size_t i;

result = rd_kafka_event_ListOffsets_result(event);
size_t result_cnt = rd_kafka_ListOffsets_result_get_count(result);
for (i = 0; i < result_cnt; i++){
Expand All @@ -3763,20 +3759,16 @@ void do_test_list_offsets(rd_kafka_t *rk, rd_kafka_queue_t *queue){
rd_kafka_event_destroy(event);

topic_partition->offset = RD_KAFKA_OFFSET_SPEC_LATEST;

/* Call ListOffsets */
if(rd_kafka_ListOffsets(rk,rd_kafka_topic_partition_list_copy(topic_partitions), options, queue))
TEST_FAIL("Unable to call ListOffsets\n");
rd_kafka_ListOffsets(rk,rd_kafka_topic_partition_list_copy(topic_partitions), options, queue);

/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);

if (!event || rd_kafka_event_error(event)) {
TEST_FAIL("Event Failed\n");
} else {
const rd_kafka_ListOffsets_result_t *result;
size_t i;

result = rd_kafka_event_ListOffsets_result(event);
size_t result_cnt = rd_kafka_ListOffsets_result_get_count(result);
for (i = 0; i < result_cnt; i++){
Expand All @@ -3791,8 +3783,7 @@ void do_test_list_offsets(rd_kafka_t *rk, rd_kafka_queue_t *queue){
topic_partition->offset = RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP;

/* Call ListOffsets */
if(rd_kafka_ListOffsets(rk,rd_kafka_topic_partition_list_copy(topic_partitions), options, queue))
TEST_FAIL("Unable to call ListOffsets\n");
rd_kafka_ListOffsets(rk,rd_kafka_topic_partition_list_copy(topic_partitions), options, queue);

/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
Expand Down Expand Up @@ -3823,7 +3814,6 @@ void do_test_list_offsets(rd_kafka_t *rk, rd_kafka_queue_t *queue){
/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
rd_kafka_event_destroy(event);

}

static void do_test_apis(rd_kafka_type_t cltype) {
Expand Down

0 comments on commit 6b35ec6

Please sign in to comment.