From fdb71472d1dcea053f2fd6df6e6a44272056d375 Mon Sep 17 00:00:00 2001 From: DavidKorczynski Date: Mon, 26 Apr 2021 19:04:17 +0100 Subject: [PATCH] parser: json: fix type confusion bug (#3417) Signed-off-by: davkor --- CMakeLists.txt | 11 ++ plugins/out_s3/CMakeLists.txt | 5 + plugins/out_s3/arrow/CMakeLists.txt | 7 ++ plugins/out_s3/arrow/compress.c | 152 ++++++++++++++++++++++++++++ plugins/out_s3/arrow/compress.h | 13 +++ plugins/out_s3/s3.c | 47 ++++++--- plugins/out_s3/s3.h | 6 +- src/flb_parser_json.c | 6 ++ 8 files changed, 234 insertions(+), 13 deletions(-) create mode 100644 plugins/out_s3/arrow/CMakeLists.txt create mode 100644 plugins/out_s3/arrow/compress.c create mode 100644 plugins/out_s3/arrow/compress.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b4faa194694..1bb8556cd93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -93,6 +93,7 @@ option(FLB_STATIC_CONF "Build binary using static configuration") option(FLB_STREAM_PROCESSOR "Enable Stream Processor" Yes) option(FLB_CORO_STACK_SIZE "Set coroutine stack size") option(FLB_AVRO_ENCODER "Build with Avro encoding support" No) +option(FLB_ARROW "Build with Apache Arrow support" No) # Metrics: Experimental Feature, disabled by default on 0.12 series # but enabled in the upcoming 0.13 release. Note that development @@ -680,6 +681,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND)) FLB_OPTION(FLB_OUT_PGSQL OFF) endif() +# Arrow GLib +# ========== +find_package(PkgConfig) +pkg_check_modules(ARROW_GLIB QUIET arrow-glib) +if(FLB_ARROW AND ARROW_GLIB_FOUND) + FLB_DEFINITION(FLB_HAVE_ARROW) +else() + set(FLB_ARROW OFF) +endif() + # Pthread Local Storage # ===================== # By default we expect the compiler already support thread local storage diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt index 94e04861707..2a3412b3682 100644 --- a/plugins/out_s3/CMakeLists.txt +++ b/plugins/out_s3/CMakeLists.txt @@ -4,3 +4,8 @@ set(src s3_multipart.c) FLB_PLUGIN(out_s3 "${src}" "") + +if(FLB_ARROW) + add_subdirectory(arrow EXCLUDE_FROM_ALL) + target_link_libraries(flb-plugin-out_s3 out-s3-arrow) +endif() diff --git a/plugins/out_s3/arrow/CMakeLists.txt b/plugins/out_s3/arrow/CMakeLists.txt new file mode 100644 index 00000000000..36dedc714ca --- /dev/null +++ b/plugins/out_s3/arrow/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + compress.c) + +add_library(out-s3-arrow STATIC ${src}) + +target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) +target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS}) diff --git a/plugins/out_s3/arrow/compress.c b/plugins/out_s3/arrow/compress.c new file mode 100644 index 00000000000..dca27b5d9c4 --- /dev/null +++ b/plugins/out_s3/arrow/compress.c @@ -0,0 +1,152 @@ +/* + * This converts S3 plugin's request buffer into Apache Arrow format. + * + * We use GLib binding to call Arrow functions (which is implemented + * in C++) from Fluent Bit. + * + * https://github.com/apache/arrow/tree/master/c_glib + */ + +#include +#include + +/* + * GArrowTable is the central structure that represents "table" (a.k.a. + * data frame). + */ +static GArrowTable* parse_json(uint8_t *json, int size) +{ + GArrowJSONReader *reader; + GArrowBuffer *buffer; + GArrowBufferInputStream *input; + GArrowJSONReadOptions *options; + GArrowTable *table; + GError *error = NULL; + + buffer = garrow_buffer_new(json, size); + if (buffer == NULL) { + return NULL; + } + + input = garrow_buffer_input_stream_new(buffer); + if (input == NULL) { + g_object_unref(buffer); + return NULL; + } + + options = garrow_json_read_options_new(); + if (options == NULL) { + g_print("fail to create json options: %s\n", error->message); + g_object_unref(buffer); + g_object_unref(input); + return NULL; + } + + reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); + if (reader == NULL) { + g_print("cannot create json reader: %s\n", error->message); + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + return NULL; + } + + table = garrow_json_reader_read(reader, &error); + if (table == NULL) { + g_print("cannot parse json: %s\n", error->message); + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return NULL; + } + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return table; +} + +static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) +{ + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GError *error = NULL; + gboolean success; + + buffer = garrow_resizable_buffer_new(0, &error); + if (buffer == NULL) { + g_print("cannot create buffer: %s\n", error->message); + g_error_free(error); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (sink == NULL) { + g_object_unref(buffer); + return NULL; + } + + success = garrow_table_write_as_feather( + table, GARROW_OUTPUT_STREAM(sink), + NULL, &error); + if (!success) { + g_print("cannot output table: %s\n", error->message); + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + g_object_unref(sink); + return buffer; +} + +int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size) +{ + GArrowTable *table; + GArrowResizableBuffer *buffer; + GBytes *bytes; + gconstpointer ptr; + gsize len; + uint8_t *buf; + + table = parse_json(json, size); + if (table == NULL) { + return -1; + } + + buffer = table_to_buffer(table); + g_object_unref(table); + if (buffer == NULL) { + return -1; + } + + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); + if (bytes == NULL) { + g_object_unref(buffer); + return -1; + } + + ptr = g_bytes_get_data(bytes, &len); + if (ptr == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + + buf = malloc(len); + if (buf == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + memcpy(buf, ptr, len); + *out_buf = (void *) buf; + *out_size = len; + + g_object_unref(buffer); + g_bytes_unref(bytes); + return 0; +} diff --git a/plugins/out_s3/arrow/compress.h b/plugins/out_s3/arrow/compress.h new file mode 100644 index 00000000000..867d9ce02f3 --- /dev/null +++ b/plugins/out_s3/arrow/compress.h @@ -0,0 +1,13 @@ +/* + * This function converts out_s3 buffer into Apache Arrow format. + * + * `json` is a string that contain (concatenated) JSON objects. + * + * `size` is the length of the json data (excluding the trailing + * null-terminator character). + * + * Return 0 on success (with `out_buf` and `out_size` updated), + * and -1 on failure + */ + +int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size); diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index bca7f2ba9f4..29454c74eb1 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -36,6 +36,10 @@ #include "s3.h" #include "s3_store.h" +#ifdef FLB_HAVE_ARROW +#include "arrow/compress.h" +#endif + static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, struct s3_file *chunk, char **out_buf, size_t *out_size); @@ -122,7 +126,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea if (ctx->content_type != NULL) { headers_len++; } - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { headers_len++; } if (ctx->canned_acl != NULL) { @@ -149,7 +153,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea s3_headers[n].val_len = strlen(ctx->content_type); n++; } - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { s3_headers[n] = content_encoding_header; n++; } @@ -515,17 +519,23 @@ static int cb_s3_init(struct flb_output_instance *ins, tmp = flb_output_get_property("compression", ins); if (tmp) { - if (strcmp((char *) tmp, "gzip") != 0) { - flb_plg_error(ctx->ins, - "'gzip' is currently the only supported value for 'compression'"); - return -1; - } else if (ctx->use_put_object == FLB_FALSE) { + if (ctx->use_put_object == FLB_FALSE) { flb_plg_error(ctx->ins, "use_put_object must be enabled when compression is enabled"); return -1; } - - ctx->compression = (char *) tmp; + if (strcmp(tmp, "gzip") == 0) { + ctx->compression = COMPRESS_GZIP; + } +#ifdef FLB_HAVE_ARROW + else if (strcmp(tmp, "arrow") == 0) { + ctx->compression = COMPRESS_ARROW; + } +#endif + else { + flb_plg_error(ctx->ins, "unknown compression: %s", tmp); + return -1; + } } tmp = flb_output_get_property("content_type", ins); @@ -1090,7 +1100,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_sds_destroy(s3_key); uri = tmp; - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to compress data"); @@ -1098,7 +1108,19 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time return -1; } final_body = (char *) compressed_body; - } else { + } +#ifdef FLB_HAVE_ARROW + else if (ctx->compression == COMPRESS_ARROW) { + ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data"); + flb_sds_destroy(uri); + return -1; + } + final_body = compressed_body; + } +#endif + else { final_body = body; final_body_size = body_size; } @@ -1602,7 +1624,8 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_STR, "compression", NULL, 0, FLB_FALSE, 0, "Compression type for S3 objects. 'gzip' is currently the only supported value. " - "The Content-Encoding HTTP Header will be set to 'gzip'." + "The Content-Encoding HTTP Header will be set to 'gzip'. " + "If Apache Arrow was enabled at compile time, you can set 'arrow' to this option." }, { FLB_CONFIG_MAP_STR, "content_type", NULL, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 8d9382ba8c4..1b288db7260 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -46,6 +46,10 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 +#define COMPRESS_NONE 0 +#define COMPRESS_GZIP 1 +#define COMPRESS_ARROW 2 + /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can @@ -95,11 +99,11 @@ struct flb_s3 { char *endpoint; char *sts_endpoint; char *canned_acl; - char *compression; char *content_type; int free_endpoint; int use_put_object; int send_content_md5; + int compression; struct flb_aws_provider *provider; struct flb_aws_provider *base_provider; diff --git a/src/flb_parser_json.c b/src/flb_parser_json.c index f3a55b3d4fd..1afc39a6a28 100644 --- a/src/flb_parser_json.c +++ b/src/flb_parser_json.c @@ -168,6 +168,12 @@ int flb_parser_json_do(struct flb_parser *parser, return *out_size; } + /* Ensure we have an accurate type */ + if (v->type != MSGPACK_OBJECT_STR) { + msgpack_unpacked_destroy(&result); + return *out_size; + } + /* Lookup time */ ret = flb_parser_time_lookup(v->via.str.ptr, v->via.str.size, 0, parser, &tm, &tmfrac);