diff --git a/CMakeLists.txt b/CMakeLists.txt index 472681751b1..fad8daa0e31 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -93,6 +93,7 @@ option(FLB_STATIC_CONF "Build binary using static configuration") option(FLB_STREAM_PROCESSOR "Enable Stream Processor" Yes) option(FLB_CORO_STACK_SIZE "Set coroutine stack size") option(FLB_AVRO_ENCODER "Build with Avro encoding support" No) +option(FLB_ARROW "Build with Apache Arrow support" No) # Metrics: Experimental Feature, disabled by default on 0.12 series # but enabled in the upcoming 0.13 release. Note that development @@ -672,6 +673,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND)) FLB_OPTION(FLB_OUT_PGSQL OFF) endif() +# Arrow Glib +# ========== +find_package(PkgConfig) +pkg_check_modules(ARROW_GLIB QUIET arrow-glib) +if(FLB_ARROW AND ARROW_GLIB_FOUND) + FLB_DEFINITION(FLB_HAVE_ARROW) +else() + set(FLB_ARROW OFF) +endif() + # Pthread Local Storage # ===================== # By default we expect the compiler already support thread local storage diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt index 94e04861707..2a3412b3682 100644 --- a/plugins/out_s3/CMakeLists.txt +++ b/plugins/out_s3/CMakeLists.txt @@ -4,3 +4,8 @@ set(src s3_multipart.c) FLB_PLUGIN(out_s3 "${src}" "") + +if(FLB_ARROW) + add_subdirectory(arrow EXCLUDE_FROM_ALL) + target_link_libraries(flb-plugin-out_s3 out-s3-arrow) +endif() diff --git a/plugins/out_s3/arrow/CMakeLists.txt b/plugins/out_s3/arrow/CMakeLists.txt new file mode 100644 index 00000000000..36dedc714ca --- /dev/null +++ b/plugins/out_s3/arrow/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + compress.c) + +add_library(out-s3-arrow STATIC ${src}) + +target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) +target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS}) diff --git a/plugins/out_s3/arrow/compress.c b/plugins/out_s3/arrow/compress.c new file mode 100644 index 00000000000..0daf301f594 --- /dev/null +++ b/plugins/out_s3/arrow/compress.c @@ -0,0 +1,174 @@ +/* + * This converts S3 plugin's request buffer into Apache Arrow format. + * + * We use GLib binding to call Arrow functions (which is implemented + * in C++) from Fluent Bit. + * + * https://github.com/apache/arrow/tree/master/c_glib + */ + +#include +#include + +/* + * GArrowTable is the central structure that represents "table" (a.k.a. + * matrix). + */ +static GArrowTable* parse_json(uint8_t *json, int size) +{ + GArrowJSONReader *reader; + GArrowBuffer *buffer; + GArrowBufferInputStream *input; + GArrowJSONReadOptions *options; + GArrowTable *table; + GError *error = NULL; + + buffer = garrow_buffer_new(json, size); + if (buffer == NULL) { + return NULL; + } + + input = garrow_buffer_input_stream_new(buffer); + if (input == NULL) { + g_object_unref(buffer); + return NULL; + } + + options = garrow_json_read_options_new(); + if (options == NULL) { + g_print("fail to create json options: %s\n", error->message); + g_object_unref(buffer); + g_object_unref(input); + return NULL; + } + + reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); + if (reader == NULL) { + g_print("cannot create json reader: %s\n", error->message); + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + return NULL; + } + + table = garrow_json_reader_read(reader, &error); + if (table == NULL) { + g_print("cannot parse json: %s\n", error->message); + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return NULL; + } + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return table; +} + +static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) +{ + GArrowSchema *schema; + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GArrowRecordBatchStreamWriter *writer; + GError *error = NULL; + gboolean success; + + schema = garrow_table_get_schema(table); + if (schema == NULL) { + return NULL; + } + + buffer = garrow_resizable_buffer_new(0, &error); + if (buffer == NULL) { + g_print("cannot create buffer: %s\n", error->message); + g_error_free(error); + g_object_unref(schema); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (sink == NULL) { + g_object_unref(schema); + g_object_unref(buffer); + return NULL; + } + + writer = garrow_record_batch_stream_writer_new( + GARROW_OUTPUT_STREAM(sink), + schema, &error); + if (writer == NULL) { + g_print("cannot create writer: %s\n", error->message); + g_error_free(error); + g_object_unref(schema); + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + + success = garrow_record_batch_writer_write_table( + GARROW_RECORD_BATCH_WRITER(writer), + table, &error); + if (!success) { + g_print("cannot output stream: %s\n", error->message); + g_error_free(error); + g_object_unref(schema); + g_object_unref(buffer); + g_object_unref(sink); + g_object_unref(writer); + return NULL; + } + return buffer; +} + +int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size) +{ + GArrowTable *table; + GArrowResizableBuffer *buffer; + GBytes *bytes; + gconstpointer ptr; + gsize len; + uint8_t *buf; + + table = parse_json(json, size); + if (table == NULL) { + return -1; + } + + buffer = table_to_buffer(table); + g_object_unref(table); + if (buffer == NULL) { + return -1; + } + + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); + if (bytes == NULL) { + g_object_unref(buffer); + return -1; + } + + ptr = g_bytes_get_data(bytes, &len); + if (ptr == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + + buf = malloc(len); + if (buf == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + memcpy(buf, ptr, len); + *out_buf = (void *) buf; + *out_size = len; + + g_object_unref(buffer); + g_bytes_unref(bytes); + return 0; +} diff --git a/plugins/out_s3/arrow/compress.h b/plugins/out_s3/arrow/compress.h new file mode 100644 index 00000000000..867d9ce02f3 --- /dev/null +++ b/plugins/out_s3/arrow/compress.h @@ -0,0 +1,13 @@ +/* + * This function converts out_s3 buffer into Apache Arrow format. + * + * `json` is a string that contain (concatenated) JSON objects. + * + * `size` is the length of the json data (excluding the trailing + * null-terminator character). + * + * Return 0 on success (with `out_buf` and `out_size` updated), + * and -1 on failure + */ + +int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size); diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index d182fa1a723..a4eb42311fc 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -34,6 +34,10 @@ #include "s3.h" #include "s3_store.h" +#ifdef FLB_HAVE_ARROW +#include "arrow/compress.h" +#endif + static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, struct s3_file *chunk, char **out_buf, size_t *out_size); @@ -113,7 +117,7 @@ static int create_headers(struct flb_s3 *ctx, struct flb_aws_header **headers, i if (ctx->content_type != NULL) { headers_len++; } - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { headers_len++; } if (ctx->canned_acl != NULL) { @@ -137,7 +141,7 @@ static int create_headers(struct flb_s3 *ctx, struct flb_aws_header **headers, i s3_headers[n].val_len = strlen(ctx->content_type); n++; } - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { s3_headers[n] = content_encoding_header; n++; } @@ -497,17 +501,23 @@ static int cb_s3_init(struct flb_output_instance *ins, tmp = flb_output_get_property("compression", ins); if (tmp) { - if (strcmp((char *) tmp, "gzip") != 0) { - flb_plg_error(ctx->ins, - "'gzip' is currently the only supported value for 'compression'"); - return -1; - } else if (ctx->use_put_object == FLB_FALSE) { + if (ctx->use_put_object == FLB_FALSE) { flb_plg_error(ctx->ins, "use_put_object must be enabled when compression is enabled"); return -1; } - - ctx->compression = (char *) tmp; + if (strcmp(tmp, "gzip") == 0) { + ctx->compression = COMPRESS_GZIP; + } +#ifdef FLB_HAVE_ARROW + else if (strcmp(tmp, "arrow") == 0) { + ctx->compression = COMPRESS_ARROW; + } +#endif + else { + flb_plg_error(ctx->ins, "unknown compression: %s", tmp); + return -1; + } } tmp = flb_output_get_property("content_type", ins); @@ -1071,7 +1081,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time flb_sds_destroy(s3_key); uri = tmp; - if (ctx->compression != NULL) { + if (ctx->compression == COMPRESS_GZIP) { ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to compress data"); @@ -1079,7 +1089,19 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time return -1; } final_body = (char *) compressed_body; - } else { + } +#ifdef FLB_HAVE_ARROW + else if (ctx->compression == COMPRESS_ARROW) { + ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data"); + flb_sds_destroy(uri); + return -1; + } + final_body = compressed_body; + } +#endif + else { final_body = body; final_body_size = body_size; } @@ -1550,7 +1572,8 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_STR, "compression", NULL, 0, FLB_FALSE, 0, "Compression type for S3 objects. 'gzip' is currently the only supported value. " - "The Content-Encoding HTTP Header will be set to 'gzip'." + "The Content-Encoding HTTP Header will be set to 'gzip'. " + "If Apache Arrow was enabled at compile time, you can set 'arrow' to this option." }, { FLB_CONFIG_MAP_STR, "content_type", NULL, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 6cb6c8ce7fc..d48ee710ca3 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -46,6 +46,10 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 +#define COMPRESS_NONE 0 +#define COMPRESS_GZIP 1 +#define COMPRESS_ARROW 2 + /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can @@ -95,10 +99,10 @@ struct flb_s3 { char *endpoint; char *sts_endpoint; char *canned_acl; - char *compression; char *content_type; int free_endpoint; int use_put_object; + int compression; struct flb_aws_provider *provider; struct flb_aws_provider *base_provider;