From bd9267bc5fcd5e3d202f0684674dfc48c01cf40b Mon Sep 17 00:00:00 2001 From: Andrew Garrett Date: Tue, 19 Oct 2021 08:26:35 -0400 Subject: [PATCH] Fixes error handling for error responses from STS --- src/rdkafka_aws.c | 27 +++++++++++++-------------- src/rdkafka_sasl_aws_msk_iam.c | 17 ++++++++++------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/rdkafka_aws.c b/src/rdkafka_aws.c index 494815b05b..75413550b0 100644 --- a/src/rdkafka_aws.c +++ b/src/rdkafka_aws.c @@ -76,7 +76,7 @@ static size_t rd_kafka_aws_curl_write_callback(char *ptr, size_t size, size_t nm size_t realsize = size * nmemb; curl_in_mem_buf *req = (curl_in_mem_buf *) userdata; - printf("receive chunk of %zu bytes\n", realsize); + printf("received chunk of %zu bytes\n", realsize); while (req->buflen < req->len + realsize + 1) { @@ -401,8 +401,6 @@ int rd_kafka_aws_send_request (rd_kafka_aws_credential_t *credential, const char *signed_headers, const char *request_parameters, const EVP_MD *md) { - int r = 1; - char *canonical_request = rd_kafka_aws_build_canonical_request( host, method, @@ -515,23 +513,25 @@ int rd_kafka_aws_send_request (rd_kafka_aws_credential_t *credential, res = curl_easy_perform(curl); if (res != CURLE_OK) { - /* add errstr handling */ fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); return -1; } xmlDoc *document; xmlNode *cur; - document = xmlReadMemory(req.buffer, req.len, "assume_role_response.xml", NULL, 0); + document = xmlReadMemory((char *)req.buffer, req.len, "assume_role_response.xml", NULL, 0); if (document == NULL) { - /* add errstr handling */ fprintf(stderr, "Failed to parse document\n"); - // return -1; + return -1; } cur = xmlDocGetRootElement(document); + if (xmlStrcmp(cur->name, (const xmlChar *)"ErrorResponse") == 0) { + fprintf(stderr, "Error occurred in AssumeRole call: %s\n", req.buffer); + return -1; + } cur = cur->children; while (cur != NULL) { - if ((!xmlStrcmp(cur->name, (const xmlChar *)"AssumeRoleResult"))) { + if (!xmlStrcmp(cur->name, (const xmlChar *)"AssumeRoleResult")) { break; } cur = cur->next; @@ -539,27 +539,26 @@ int rd_kafka_aws_send_request (rd_kafka_aws_credential_t *credential, cur = cur->children; while (cur != NULL) { - if ((!xmlStrcmp(cur->name, (const xmlChar *)"Credentials"))) { + if (!xmlStrcmp(cur->name, (const xmlChar *)"Credentials")) { break; } cur = cur->next; } - cur = cur->children; while (cur != NULL) { - if ((!xmlStrcmp(cur->name, (const xmlChar *)"AccessKeyId"))) { + if (!xmlStrcmp(cur->name, (const xmlChar *)"AccessKeyId")) { xmlChar *content = xmlNodeListGetString(document, cur->children, 1); credential->aws_access_key_id = rd_strdup((const char *)content); xmlFree(content); } - if ((!xmlStrcmp(cur->name, (const xmlChar *)"SecretAccessKey"))) { + if (!xmlStrcmp(cur->name, (const xmlChar *)"SecretAccessKey")) { xmlChar *content = xmlNodeListGetString(document, cur->children, 1); credential->aws_secret_access_key = rd_strdup((const char *)content); xmlFree(content); } - if ((!xmlStrcmp(cur->name, (const xmlChar *)"SessionToken"))) { + if (!xmlStrcmp(cur->name, (const xmlChar *)"SessionToken")) { xmlChar *content = xmlNodeListGetString(document, cur->children, 1); credential->aws_security_token = rd_strdup((const char *)content); xmlFree(content); @@ -581,7 +580,7 @@ int rd_kafka_aws_send_request (rd_kafka_aws_credential_t *credential, } curl_easy_cleanup(curl); - return r; + return 1; } /** diff --git a/src/rdkafka_sasl_aws_msk_iam.c b/src/rdkafka_sasl_aws_msk_iam.c index 3ac7f2b49e..78cf7151c1 100644 --- a/src/rdkafka_sasl_aws_msk_iam.c +++ b/src/rdkafka_sasl_aws_msk_iam.c @@ -277,7 +277,6 @@ rd_kafka_aws_msk_iam_credential_refresh0 ( str_builder_t *sb; sb = str_builder_create(); - int r = 1; char *handle_aws_access_key_id; char *handle_aws_secret_access_key; char *handle_aws_region; @@ -349,7 +348,8 @@ rd_kafka_aws_msk_iam_credential_refresh0 ( credential->aws_region = rd_strdup(handle_aws_region); credential->md_lifetime_ms = now_wallclock_ms + conf->sasl.duration_sec * 1000; - rd_kafka_aws_send_request(credential, + rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Sending refresh request to STS"); + if (rd_kafka_aws_send_request(credential, ymd, hms, host, @@ -363,12 +363,15 @@ rd_kafka_aws_msk_iam_credential_refresh0 ( canonical_headers, signed_headers, request_parameters, - md); - - if (r == -1) { + md) == -1) { + rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "AWS credential retrieval and parsing failed"); rd_kafka_sasl_aws_msk_iam_credential_free(credential); + + return -1; } + rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "New AWS credentials retrieved from STS"); + RD_IF_FREE(handle_aws_access_key_id, rd_free); RD_IF_FREE(handle_aws_secret_access_key, rd_free); RD_IF_FREE(handle_aws_region, rd_free); @@ -380,7 +383,7 @@ rd_kafka_aws_msk_iam_credential_refresh0 ( RD_IF_FREE(canonical_headers, rd_free); RD_IF_FREE(request_parameters, rd_free); - return r; + return 1; } /** @@ -396,7 +399,7 @@ rd_kafka_aws_msk_iam_credential_refresh (rd_kafka_t *rk, void *opaque) { char errstr[512]; rd_kafka_aws_credential_t credential = RD_ZERO_INIT; - rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Checking to refreshing AWS credentials"); + rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Checking whether to refresh AWS credentials"); if (rk->rk_conf.sasl.enable_use_sts) { rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Use STS enabled, will refresh credentials");