diff --git a/examples/describe_cluster.c b/examples/describe_cluster.c index 3df4e623a3..f6ca517de9 100644 --- a/examples/describe_cluster.c +++ b/examples/describe_cluster.c @@ -49,7 +49,7 @@ const char *argv0; -static rd_kafka_queue_t *queue; /** Admin result queue. +static rd_kafka_queue_t *queue = NULL; /** Admin result queue. * This is a global so we can * yield in stop() */ static volatile sig_atomic_t run = 1; @@ -63,7 +63,9 @@ static void stop(int sig) { exit(2); } run = 0; - rd_kafka_queue_yield(queue); + + if (queue) + rd_kafka_queue_yield(queue); } @@ -73,7 +75,7 @@ static void usage(const char *reason, ...) { stderr, "Describe cluster usage examples\n" "\n" - "Usage: %s ...\n" + "Usage: %s " "\n" "Options:\n" " -b Bootstrap server list to connect to.\n" @@ -123,7 +125,7 @@ static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { */ int64_t parse_int(const char *what, const char *str) { char *end; - unsigned long n = strtoull(str, &end, 0); + long n = strtol(str, &end, 0); if (end != str + strlen(str)) { fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", @@ -134,6 +136,7 @@ int64_t parse_int(const char *what, const char *str) { return (int64_t)n; } + /** * @brief Print cluster information. */ @@ -176,16 +179,21 @@ print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) { return 0; } + /** * @brief Call rd_kafka_DescribeCluster() */ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) { - rd_kafka_t *rk; + rd_kafka_t *rk = NULL; char errstr[512]; - rd_kafka_AdminOptions_t *options; + rd_kafka_AdminOptions_t *options = NULL; rd_kafka_event_t *event = NULL; rd_kafka_error_t *error; int retval = 0; + const int min_argc = 1; + + if (argc < min_argc) + usage("Wrong number of arguments."); int include_cluster_authorized_operations = parse_int("include_cluster_authorized_operations", argv[0]); @@ -194,18 +202,15 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) { usage("include_cluster_authorized_operations not a 0-1 int"); /* - * Create consumer instance + * Create producer instance * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */ - rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) - fatal("Failed to create new consumer: %s", errstr); + fatal("Failed to create new producer: %s", errstr); - /* - * Describe cluster - */ queue = rd_kafka_queue_new(rk); /* Signal handler for clean shutdown */ @@ -217,6 +222,7 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) { if (rd_kafka_AdminOptions_set_request_timeout( options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) { fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + retval = 1; goto exit; } if ((error = rd_kafka_AdminOptions_set_include_authorized_operations( @@ -226,9 +232,11 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) { "operations: %s\n", rd_kafka_error_string(error)); rd_kafka_error_destroy(error); - exit(1); + retval = 1; + goto exit; } + /* Call DescribeCluster. */ rd_kafka_DescribeCluster(rk, options, queue); /* Wait for results */ @@ -246,8 +254,7 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) { /* DescribeCluster request failed */ fprintf(stderr, "%% DescribeCluster failed[%" PRId32 "]: %s\n", err, rd_kafka_event_error_string(event)); - goto exit; - + retval = 1; } else { /* DescribeCluster request succeeded */ const rd_kafka_DescribeCluster_result_t *result; @@ -259,12 +266,15 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) { exit: + /* Cleanup. */ if (event) rd_kafka_event_destroy(event); - rd_kafka_AdminOptions_destroy(options); - rd_kafka_queue_destroy(queue); - /* Destroy the client instance */ - rd_kafka_destroy(rk); + if (options) + rd_kafka_AdminOptions_destroy(options); + if (queue) + rd_kafka_queue_destroy(queue); + if (rk) + rd_kafka_destroy(rk); exit(retval); } diff --git a/examples/describe_consumer_groups.c b/examples/describe_consumer_groups.c index 09aa1e5a1a..d07e2fe7cd 100644 --- a/examples/describe_consumer_groups.c +++ b/examples/describe_consumer_groups.c @@ -50,9 +50,9 @@ const char *argv0; -static rd_kafka_queue_t *queue; /** Admin result queue. - * This is a global so we can - * yield in stop() */ +static rd_kafka_queue_t *queue = NULL; /** Admin result queue. + * This is a global so we can + * yield in stop() */ static volatile sig_atomic_t run = 1; /** @@ -64,7 +64,9 @@ static void stop(int sig) { exit(2); } run = 0; - rd_kafka_queue_yield(queue); + + if (queue) + rd_kafka_queue_yield(queue); } @@ -146,9 +148,92 @@ print_partition_list(FILE *fp, fprintf(fp, "\n"); } + +/** + * @brief Print group member information. + */ +static void +print_group_member_info(const rd_kafka_MemberDescription_t *member) { + printf( + " Member \"%s\" with client-id %s," + " group instance id: %s, host %s\n", + rd_kafka_MemberDescription_consumer_id(member), + rd_kafka_MemberDescription_client_id(member), + rd_kafka_MemberDescription_group_instance_id(member), + rd_kafka_MemberDescription_host(member)); + const rd_kafka_MemberAssignment_t *assignment = + rd_kafka_MemberDescription_assignment(member); + const rd_kafka_topic_partition_list_t *topic_partitions = + rd_kafka_MemberAssignment_partitions(assignment); + if (!topic_partitions) { + printf(" No assignment\n"); + } else if (topic_partitions->cnt == 0) { + printf(" Empty assignment\n"); + } else { + printf(" Assignment:\n"); + print_partition_list(stdout, topic_partitions, 0, " "); + } +} + + /** * @brief Print group information. */ +static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { + int j, member_cnt, authorized_operation_count, acl_operation; + const rd_kafka_error_t *error; + char coordinator_desc[512]; + const rd_kafka_Node_t *coordinator = NULL; + const char *group_id = + rd_kafka_ConsumerGroupDescription_group_id(group); + const char *partition_assignor = + rd_kafka_ConsumerGroupDescription_partition_assignor(group); + rd_kafka_consumer_group_state_t state = + rd_kafka_ConsumerGroupDescription_state(group); + authorized_operation_count = + rd_kafka_ConsumerGroupDescription_authorized_operation_count(group); + member_cnt = rd_kafka_ConsumerGroupDescription_member_count(group); + error = rd_kafka_ConsumerGroupDescription_error(group); + coordinator = rd_kafka_ConsumerGroupDescription_coordinator(group); + *coordinator_desc = '\0'; + + if (coordinator != NULL) { + snprintf(coordinator_desc, sizeof(coordinator_desc), + ", coordinator [id: %" PRId32 + ", host: %s" + ", port: %" PRIu16 "]", + rd_kafka_Node_id(coordinator), + rd_kafka_Node_host(coordinator), + rd_kafka_Node_port(coordinator)); + } + printf( + "Group \"%s\", partition assignor \"%s\", " + " state %s%s, with %" PRId32 " member(s)\n", + group_id, partition_assignor, + rd_kafka_consumer_group_state_name(state), coordinator_desc, + member_cnt); + for (j = 0; j < authorized_operation_count; j++) { + acl_operation = + rd_kafka_ConsumerGroupDescription_authorized_operation( + group, j); + printf("%s operation is allowed\n", + rd_kafka_AclOperation_name(acl_operation)); + } + if (error) + printf(" error[%" PRId32 "]: %s", rd_kafka_error_code(error), + rd_kafka_error_string(error)); + printf("\n"); + for (j = 0; j < member_cnt; j++) { + const rd_kafka_MemberDescription_t *member = + rd_kafka_ConsumerGroupDescription_member(group, j); + print_group_member_info(member); + } +} + + +/** + * @brief Print groups information. + */ static int print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc, int groups_cnt) { @@ -168,82 +253,7 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc, } for (i = 0; i < result_groups_cnt; i++) { - int j, member_cnt, authorized_operation_count, acl_operation; - const rd_kafka_error_t *error; - const rd_kafka_ConsumerGroupDescription_t *group = - result_groups[i]; - char coordinator_desc[512]; - const rd_kafka_Node_t *coordinator = NULL; - const char *group_id = - rd_kafka_ConsumerGroupDescription_group_id(group); - const char *partition_assignor = - rd_kafka_ConsumerGroupDescription_partition_assignor(group); - rd_kafka_consumer_group_state_t state = - rd_kafka_ConsumerGroupDescription_state(group); - authorized_operation_count = - rd_kafka_ConsumerGroupDescription_authorized_operation_count( - group); - member_cnt = - rd_kafka_ConsumerGroupDescription_member_count(group); - error = rd_kafka_ConsumerGroupDescription_error(group); - coordinator = - rd_kafka_ConsumerGroupDescription_coordinator(group); - *coordinator_desc = '\0'; - - if (coordinator != NULL) { - snprintf(coordinator_desc, sizeof(coordinator_desc), - ", coordinator [id: %" PRId32 - ", host: %s" - ", port: %" PRIu16 "]", - rd_kafka_Node_id(coordinator), - rd_kafka_Node_host(coordinator), - rd_kafka_Node_port(coordinator)); - } - printf( - "Group \"%s\", partition assignor \"%s\", " - " state %s%s, with %" PRId32 " member(s)\n", - group_id, partition_assignor, - rd_kafka_consumer_group_state_name(state), coordinator_desc, - member_cnt); - for (j = 0; j < authorized_operation_count; j++) { - acl_operation = - rd_kafka_ConsumerGroupDescription_authorized_operation( - group, j); - printf("%s operation is allowed\n", - rd_kafka_AclOperation_name(acl_operation)); - } - if (error) - printf(" error[%" PRId32 "]: %s", - rd_kafka_error_code(error), - rd_kafka_error_string(error)); - printf("\n"); - for (j = 0; j < member_cnt; j++) { - const rd_kafka_MemberDescription_t *member = - rd_kafka_ConsumerGroupDescription_member(group, j); - printf( - " Member \"%s\" with client-id %s," - " group instance id: %s, host %s\n", - rd_kafka_MemberDescription_consumer_id(member), - rd_kafka_MemberDescription_client_id(member), - rd_kafka_MemberDescription_group_instance_id( - member), - rd_kafka_MemberDescription_host(member)); - const rd_kafka_MemberAssignment_t *assignment = - rd_kafka_MemberDescription_assignment(member); - const rd_kafka_topic_partition_list_t - *topic_partitions = - rd_kafka_MemberAssignment_partitions( - assignment); - if (!topic_partitions) { - printf(" No assignment\n"); - } else if (topic_partitions->cnt == 0) { - printf(" Empty assignment\n"); - } else { - printf(" Assignment:\n"); - print_partition_list(stdout, topic_partitions, - 0, " "); - } - } + print_group_info(result_groups[i]); printf("\n"); } return 0; @@ -254,7 +264,7 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc, */ int64_t parse_int(const char *what, const char *str) { char *end; - unsigned long n = strtoull(str, &end, 0); + long n = strtol(str, &end, 0); if (end != str + strlen(str)) { fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", @@ -271,25 +281,28 @@ int64_t parse_int(const char *what, const char *str) { */ static void cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) { - rd_kafka_t *rk; + rd_kafka_t *rk = NULL; const char **groups = NULL; char errstr[512]; - rd_kafka_AdminOptions_t *options; + rd_kafka_AdminOptions_t *options = NULL; rd_kafka_event_t *event = NULL; rd_kafka_error_t *error; - int retval = 0; - int groups_cnt = 0; + int retval = 0; + int groups_cnt = 0; + const int min_argc = 2; + int include_authorized_operations; + + if (argc < min_argc) + usage("Wrong number of arguments"); - int include_authorized_operations = + include_authorized_operations = parse_int("include_authorized_operations", argv[0]); if (include_authorized_operations < 0 || include_authorized_operations > 1) usage("include_authorized_operations not a 0-1 int"); - if (argc >= 1) { - groups = (const char **)&argv[1]; - groups_cnt = argc - 1; - } + groups = (const char **)&argv[1]; + groups_cnt = argc - 1; /* * Create consumer instance @@ -315,6 +328,7 @@ cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) { if (rd_kafka_AdminOptions_set_request_timeout( options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) { fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + retval = 1; goto exit; } if ((error = rd_kafka_AdminOptions_set_include_authorized_operations( @@ -323,7 +337,8 @@ cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) { "%% Failed to set require authorized operations: %s\n", rd_kafka_error_string(error)); rd_kafka_error_destroy(error); - exit(1); + retval = 1; + goto exit; } rd_kafka_DescribeConsumerGroups(rk, groups, groups_cnt, options, queue); @@ -344,7 +359,7 @@ cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) { fprintf(stderr, "%% DescribeConsumerGroups failed[%" PRId32 "]: %s\n", err, rd_kafka_event_error_string(event)); - goto exit; + retval = 1; } else { /* DescribeConsumerGroups request succeeded, but individual @@ -358,12 +373,15 @@ cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) { exit: + /* Cleanup. */ if (event) rd_kafka_event_destroy(event); - rd_kafka_AdminOptions_destroy(options); - rd_kafka_queue_destroy(queue); - /* Destroy the client instance */ - rd_kafka_destroy(rk); + if (options) + rd_kafka_AdminOptions_destroy(options); + if (queue) + rd_kafka_queue_destroy(queue); + if (rk) + rd_kafka_destroy(rk); exit(retval); } diff --git a/examples/describe_topics.c b/examples/describe_topics.c index 89ec32fca8..5085b36725 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -49,9 +49,9 @@ const char *argv0; -static rd_kafka_queue_t *queue; /** Admin result queue. - * This is a global so we can - * yield in stop() */ +static rd_kafka_queue_t *queue = NULL; /** Admin result queue. + * This is a global so we can + * yield in stop() */ static volatile sig_atomic_t run = 1; /** @@ -63,7 +63,8 @@ static void stop(int sig) { exit(2); } run = 0; - rd_kafka_queue_yield(queue); + if (queue) + rd_kafka_queue_yield(queue); } @@ -72,7 +73,7 @@ static void usage(const char *reason, ...) { fprintf(stderr, "Describe topics usage examples\n" "\n" - "Usage: %s \n" + "Usage: %s " " ...\n" "\n" "Options:\n" @@ -124,7 +125,7 @@ static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { */ int64_t parse_int(const char *what, const char *str) { char *end; - unsigned long n = strtoull(str, &end, 0); + long n = strtol(str, &end, 0); if (end != str + strlen(str)) { fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", @@ -136,6 +137,93 @@ int64_t parse_int(const char *what, const char *str) { } +/** + * @brief Print partition information. + */ +static void print_partition_info(const rd_kafka_TopicDescription_t *topic, + int partition_idx) { + rd_kafka_resp_err_t partition_err; + int leader, id, isr_cnt, replica_cnt, k; + partition_err = + rd_kafka_TopicDescription_partition_error(topic, partition_idx); + + if (partition_err != RD_KAFKA_RESP_ERR_NO_ERROR) { + printf("\tPartition at index %d has error[%" PRId32 "]: %s\n", + partition_idx, partition_err, + rd_kafka_err2str(partition_err)); + return; + } + printf("\tPartition at index %d succeeded\n", partition_idx); + id = rd_kafka_TopicDescription_partition_id(topic, partition_idx); + leader = + rd_kafka_TopicDescription_partition_leader(topic, partition_idx); + isr_cnt = + rd_kafka_TopicDescription_partition_isr_count(topic, partition_idx); + replica_cnt = rd_kafka_TopicDescription_partition_replica_count( + topic, partition_idx); + printf("\tPartition has id: %d with leader: %d\n", id, leader); + if (isr_cnt) { + printf( + "\tThe in-sync replica count is: %d, they " + "are: ", + isr_cnt); + for (k = 0; k < isr_cnt; k++) + printf("%d ", rd_kafka_TopicDescription_partition_isr( + topic, partition_idx, k)); + printf("\n"); + } else + printf("\tThe in-sync replica count is 0\n"); + + if (replica_cnt) { + printf("\tThe replica count is: %d, they are: ", replica_cnt); + for (k = 0; k < replica_cnt; k++) + printf("%d ", + rd_kafka_TopicDescription_partition_replica( + topic, partition_idx, k)); + printf("\n"); + } else + printf("\tThe replica count is 0\n"); +} + +/** + * @brief Print topic information. + */ +static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { + int j, acl_operation; + const rd_kafka_error_t *error; + const char *topic_name = rd_kafka_TopicDescription_topic_name(topic); + int topic_authorized_operations_cnt = + rd_kafka_TopicDescription_topic_authorized_operation_count(topic); + int partition_cnt = + rd_kafka_TopicDescription_topic_partition_count(topic); + error = rd_kafka_TopicDescription_error(topic); + + if (rd_kafka_error_code(error)) { + printf("Topic: %s has error[%" PRId32 "]: %s\n", topic_name, + rd_kafka_error_code(error), + rd_kafka_error_string(error)); + return; + } + + printf( + "Topic: %s succeeded, has %d topic authorized operations " + "allowed, they are:\n", + topic_name, topic_authorized_operations_cnt); + for (j = 0; j < topic_authorized_operations_cnt; j++) { + acl_operation = + rd_kafka_TopicDescription_authorized_operation(topic, j); + printf("\t%s operation is allowed\n", + rd_kafka_AclOperation_name(acl_operation)); + } + + printf("partition count is: %d\n", partition_cnt); + for (j = 0; j < partition_cnt; j++) { + print_partition_info(topic, j); + printf("\n"); + } +} + + /** * @brief Print topics information. */ @@ -157,90 +245,7 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc, } for (i = 0; i < result_topics_cnt; i++) { - int j, acl_operation; - const rd_kafka_error_t *error; - const rd_kafka_TopicDescription_t *topic = result_topics[i]; - const char *topic_name = - rd_kafka_TopicDescription_topic_name(topic); - int topic_authorized_operations_cnt = - rd_kafka_TopicDescription_topic_authorized_operation_count( - topic); - int partition_cnt = - rd_kafka_TopicDescription_topic_partition_count(topic); - error = rd_kafka_TopicDescription_error(topic); - - if (rd_kafka_error_code(error)) { - printf("Topic: %s has error[%" PRId32 "]: %s\n", - topic_name, rd_kafka_error_code(error), - rd_kafka_error_string(error)); - continue; - } - - printf( - "Topic: %s succeeded, has %d topic authorized operations " - "allowed, they are:\n", - topic_name, topic_authorized_operations_cnt); - for (j = 0; j < topic_authorized_operations_cnt; j++) { - acl_operation = - rd_kafka_TopicDescription_authorized_operation( - topic, j); - printf("\t%s operation is allowed\n", - rd_kafka_AclOperation_name(acl_operation)); - } - - printf("partition count is: %d\n", partition_cnt); - for (j = 0; j < partition_cnt; j++) { - rd_kafka_resp_err_t partition_err; - int leader, id, isr_cnt, replica_cnt, k; - partition_err = - rd_kafka_TopicDescription_partition_error(topic, j); - - if (partition_err != RD_KAFKA_RESP_ERR_NO_ERROR) { - printf( - "\tPartition at index %d has error[%" PRId32 - "]: %s\n", - j, partition_err, - rd_kafka_err2str(partition_err)); - continue; - } - printf("\tPartition at index %d succeeded\n", j); - id = rd_kafka_TopicDescription_partition_id(topic, j); - leader = rd_kafka_TopicDescription_partition_leader( - topic, j); - isr_cnt = rd_kafka_TopicDescription_partition_isr_count( - topic, j); - replica_cnt = - rd_kafka_TopicDescription_partition_replica_count( - topic, j); - printf("\tPartition has id: %d with leader: %d\n", id, - leader); - if (isr_cnt) { - printf( - "\tThe in-sync replica count is: %d, they " - "are: ", - isr_cnt); - for (k = 0; k < isr_cnt; k++) - printf( - "%d ", - rd_kafka_TopicDescription_partition_isr( - topic, j, k)); - printf("\n"); - } else - printf("\tThe in-sync replica count is 0\n"); - - if (replica_cnt) { - printf("\tThe replica count is: %d, they are: ", - replica_cnt); - for (k = 0; k < replica_cnt; k++) - printf( - "%d ", - rd_kafka_TopicDescription_partition_replica( - topic, j, k)); - printf("\n"); - } else - printf("\tThe replica count is 0\n"); - printf("\n"); - } + print_topic_info(result_topics[i]); printf("\n"); } return 0; @@ -252,25 +257,29 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc, * topics. */ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { - rd_kafka_t *rk; + rd_kafka_t *rk = NULL; const char **topics = NULL; char errstr[512]; - rd_kafka_AdminOptions_t *options; - rd_kafka_event_t *event = NULL; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *event = NULL; rd_kafka_error_t *error; - int retval = 0; - int topics_cnt = 0; + int retval = 0; + int topics_cnt = 0; + const int min_argc = 2; + int include_topic_authorized_operations; + + if (argc < min_argc) + usage("Wrong number of arguments"); - int include_topic_authorized_operations = + include_topic_authorized_operations = parse_int("include_topic_authorized_operations", argv[0]); if (include_topic_authorized_operations < 0 || include_topic_authorized_operations > 1) usage("include_topic_authorized_operations not a 0-1 int"); - if (argc >= 1) { - topics = (const char **)&argv[1]; - topics_cnt = argc - 1; - } + topics = (const char **)&argv[1]; + topics_cnt = argc - 1; + /* * Create producer instance * NOTE: rd_kafka_new() takes ownership of the conf object @@ -281,9 +290,6 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { if (!rk) fatal("Failed to create new producer: %s", errstr); - /* - * Describe topics - */ queue = rd_kafka_queue_new(rk); /* Signal handler for clean shutdown */ @@ -308,6 +314,7 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { goto exit; } + /* Call DescribeTopics */ rd_kafka_DescribeTopics(rk, topics, topics_cnt, options, queue); /* Wait for results */ @@ -325,6 +332,7 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { /* DescribeTopics request failed */ fprintf(stderr, "%% DescribeTopics failed[%" PRId32 "]: %s\n", err, rd_kafka_event_error_string(event)); + retval = 1; goto exit; } else { @@ -339,12 +347,15 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { exit: + /* Cleanup. */ if (event) rd_kafka_event_destroy(event); - rd_kafka_AdminOptions_destroy(options); - rd_kafka_queue_destroy(queue); - /* Destroy the client instance */ - rd_kafka_destroy(rk); + if (options) + rd_kafka_AdminOptions_destroy(options); + if (queue) + rd_kafka_queue_destroy(queue); + if (rk) + rd_kafka_destroy(rk); exit(retval); }