diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index 46fd234a7ac..5f8fa6feb0c 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -1200,6 +1200,55 @@ int fw_prot_secure_forward_handshake(struct flb_input_instance *ins, return -1; } +static size_t gzip_concatenated_count(const char *data, size_t len) +{ + int i; + size_t count = 0; + const uint8_t *p; + + p = (const uint8_t *) data; + + /* search other gzip starting bits and method. */ + for (i = 2; i < len && + i + 2 <= len; i++) { + if (p[i] == 0x1F && p[i+1] == 0x8B && p[i+2] == 8) { + count++; + } + } + + return count; +} + +static size_t gzip_concatenated_borders(const char *data, size_t len, size_t **out_borders, size_t border_count) +{ + int i; + size_t count = 0; + const uint8_t *p; + size_t *borders = NULL; + + p = (const uint8_t *) data; + borders = (size_t *) flb_calloc(1, sizeof(size_t) * (border_count + 1)); + if (borders == NULL) { + flb_errno(); + return -1; + } + + /* search other gzip starting bits and method. */ + for (i = 2; i < len && + i + 2 <= len; i++) { + if (p[i] == 0x1F && p[i+1] == 0x8B && p[i+2] == 8) { + borders[count] = i; + count++; + } + } + /* The length of the last border refers to the original length. */ + borders[border_count] = len; + + *out_borders = borders; + + return count; +} + int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) { int ret; @@ -1488,13 +1537,49 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) } if (ret == FLB_TRUE) { - ret = flb_gzip_uncompress((void *) data, len, + size_t prev_pos = 0; + size_t gzip_payloads_count = 0; + size_t loop = 0; + size_t *gzip_borders = NULL; + const size_t original_len = len; + + gzip_payloads_count = gzip_concatenated_count(data, len); + flb_plg_debug(ctx->ins, "concatenated gzip payload count is %zd", + gzip_payloads_count); + if (gzip_payloads_count > 0) { + if (gzip_concatenated_borders(data, len, &gzip_borders, gzip_payloads_count) < 0) { + flb_plg_error(ctx->ins, + "failed to traverse boundaries of concatenated gzip payloads"); + return -1; + } + } + + retry_uncompress: + if (gzip_payloads_count > 0) { + if (loop == 0) { + len = gzip_borders[loop]; + } + else if (gzip_borders[loop] == original_len) { + len = original_len - gzip_borders[loop - 1]; + } + else if (loop >= 1) { + len = gzip_borders[loop] - gzip_borders[loop - 1]; + } + } + flb_plg_trace(ctx->ins, + "[gzip decompression] loop = %zd, len = %zd, original_len = %zd", + loop, len, original_len); + + ret = flb_gzip_uncompress((void *) (data + prev_pos), len, &gz_data, &gz_size); if (ret == -1) { flb_plg_error(ctx->ins, "gzip uncompress failure"); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); + if (gzip_borders != NULL) { + flb_free(gzip_borders); + } return -1; } @@ -1506,6 +1591,9 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); flb_free(gz_data); + if (gzip_borders != NULL) { + flb_free(gzip_borders); + } return -1; } event_type = ret; @@ -1519,9 +1607,34 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); flb_free(gz_data); + if (gzip_borders != NULL) { + flb_free(gzip_borders); + } + return -1; } flb_free(gz_data); + + /* a valid payload of gzip is larger than 18 bytes. */ + if (gzip_payloads_count > 0) { + if ((gzip_payloads_count - loop) > 0 && + (original_len - gzip_borders[loop]) > 18) { + len = original_len - gzip_borders[loop]; + flb_plg_debug(ctx->ins, "left unconsumed %zd byte(s)", len); + prev_pos = gzip_borders[loop]; + loop++; + goto retry_uncompress; + } + else { + flb_plg_debug(ctx->ins, "left unconsumed %zd byte(s)", + original_len - gzip_borders[loop]); + } + if (loop == gzip_payloads_count) { + if (gzip_borders != NULL) { + flb_free(gzip_borders); + } + } + } } else { event_type = FLB_EVENT_TYPE_LOGS;