Skip to content

Commit

Permalink
out_s3: add Apache Arrow support
Browse files Browse the repository at this point in the history
Apache Arrow is an efficient columnar data format that is suitable
for statistical analysis, and popular in machine learning community.

    https://arrow.apache.org/

With this patch merged, users now can specify 'arrow' as the
compression type like this:

    [OUTPUT]
      Name s3
      Bucket some-bucket
      total_file_size 1M
      use_put_object On
      Compression arrow

which makes Fluent Bit convert the request buffer into Apache Arrow
format before uploading.

Signed-off-by: Fujimoto Seiji <[email protected]>
Reviewed-by: Sutou Kouhei <[email protected]>
Reviewed-by: Wesley Pettit <[email protected]>
  • Loading branch information
fujimotos committed Jul 26, 2021
1 parent 06507a2 commit 544fa89
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 15 deletions.
12 changes: 11 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ option(FLB_STREAM_PROCESSOR "Enable Stream Processor" Yes)
option(FLB_CORO_STACK_SIZE "Set coroutine stack size")
option(FLB_AVRO_ENCODER "Build with Avro encoding support" No)
option(FLB_AWS_ERROR_REPORTER "Build with aws error reporting support" No)

option(FLB_ARROW "Build with Apache Arrow support" No)

# Metrics: Experimental Feature, disabled by default on 0.12 series
# but enabled in the upcoming 0.13 release. Note that development
Expand Down Expand Up @@ -724,6 +724,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND))
FLB_OPTION(FLB_OUT_PGSQL OFF)
endif()

# Arrow GLib
# ==========
find_package(PkgConfig)
pkg_check_modules(ARROW_GLIB QUIET arrow-glib)
if(FLB_ARROW AND ARROW_GLIB_FOUND)
FLB_DEFINITION(FLB_HAVE_ARROW)
else()
set(FLB_ARROW OFF)
endif()

# Pthread Local Storage
# =====================
# By default we expect the compiler already support thread local storage
Expand Down
5 changes: 5 additions & 0 deletions plugins/out_s3/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ set(src
s3_multipart.c)

FLB_PLUGIN(out_s3 "${src}" "")

if(FLB_ARROW)
add_subdirectory(arrow EXCLUDE_FROM_ALL)
target_link_libraries(flb-plugin-out_s3 out-s3-arrow)
endif()
7 changes: 7 additions & 0 deletions plugins/out_s3/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set(src
compress.c)

add_library(out-s3-arrow STATIC ${src})

target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS})
147 changes: 147 additions & 0 deletions plugins/out_s3/arrow/compress.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* This converts S3 plugin's request buffer into Apache Arrow format.
*
* We use GLib binding to call Arrow functions (which is implemented
* in C++) from Fluent Bit.
*
* https://github.com/apache/arrow/tree/master/c_glib
*/

#include <arrow-glib/arrow-glib.h>
#include <inttypes.h>

/*
* GArrowTable is the central structure that represents "table" (a.k.a.
* data frame).
*/
static GArrowTable* parse_json(uint8_t *json, int size)
{
GArrowJSONReader *reader;
GArrowBuffer *buffer;
GArrowBufferInputStream *input;
GArrowJSONReadOptions *options;
GArrowTable *table;
GError *error = NULL;

buffer = garrow_buffer_new(json, size);
if (buffer == NULL) {
return NULL;
}

input = garrow_buffer_input_stream_new(buffer);
if (input == NULL) {
g_object_unref(buffer);
return NULL;
}

options = garrow_json_read_options_new();
if (options == NULL) {
g_object_unref(buffer);
g_object_unref(input);
return NULL;
}

reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
if (reader == NULL) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
return NULL;
}

table = garrow_json_reader_read(reader, &error);
if (table == NULL) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return NULL;
}
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return table;
}

static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
{
GArrowResizableBuffer *buffer;
GArrowBufferOutputStream *sink;
GError *error = NULL;
gboolean success;

buffer = garrow_resizable_buffer_new(0, &error);
if (buffer == NULL) {
g_error_free(error);
return NULL;
}

sink = garrow_buffer_output_stream_new(buffer);
if (sink == NULL) {
g_object_unref(buffer);
return NULL;
}

success = garrow_table_write_as_feather(
table, GARROW_OUTPUT_STREAM(sink),
NULL, &error);
if (!success) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(sink);
return NULL;
}
g_object_unref(sink);
return buffer;
}

int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size)
{
GArrowTable *table;
GArrowResizableBuffer *buffer;
GBytes *bytes;
gconstpointer ptr;
gsize len;
uint8_t *buf;

table = parse_json(json, size);
if (table == NULL) {
return -1;
}

buffer = table_to_buffer(table);
g_object_unref(table);
if (buffer == NULL) {
return -1;
}

bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
if (bytes == NULL) {
g_object_unref(buffer);
return -1;
}

ptr = g_bytes_get_data(bytes, &len);
if (ptr == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}

buf = malloc(len);
if (buf == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}
memcpy(buf, ptr, len);
*out_buf = (void *) buf;
*out_size = len;

g_object_unref(buffer);
g_bytes_unref(bytes);
return 0;
}
13 changes: 13 additions & 0 deletions plugins/out_s3/arrow/compress.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* This function converts out_s3 buffer into Apache Arrow format.
*
* `json` is a string that contain (concatenated) JSON objects.
*
* `size` is the length of the json data (excluding the trailing
* null-terminator character).
*
* Return 0 on success (with `out_buf` and `out_size` updated),
* and -1 on failure
*/

int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size);
49 changes: 36 additions & 13 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#include "s3.h"
#include "s3_store.h"

#ifdef FLB_HAVE_ARROW
#include "arrow/compress.h"
#endif

static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
struct s3_file *chunk,
char **out_buf, size_t *out_size);
Expand Down Expand Up @@ -125,7 +129,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
Expand All @@ -152,7 +156,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
s3_headers[n] = content_encoding_header;
n++;
}
Expand Down Expand Up @@ -678,17 +682,23 @@ static int cb_s3_init(struct flb_output_instance *ins,

tmp = flb_output_get_property("compression", ins);
if (tmp) {
if (strcmp((char *) tmp, "gzip") != 0) {
flb_plg_error(ctx->ins,
"'gzip' is currently the only supported value for 'compression'");
return -1;
} else if (ctx->use_put_object == FLB_FALSE) {
if (ctx->use_put_object == FLB_FALSE) {
flb_plg_error(ctx->ins,
"use_put_object must be enabled when compression is enabled");
return -1;
}

ctx->compression = (char *) tmp;
if (strcmp(tmp, "gzip") == 0) {
ctx->compression = COMPRESS_GZIP;
}
#ifdef FLB_HAVE_ARROW
else if (strcmp(tmp, "arrow") == 0) {
ctx->compression = COMPRESS_ARROW;
}
#endif
else {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
return -1;
}
}

tmp = flb_output_get_property("content_type", ins);
Expand Down Expand Up @@ -1261,15 +1271,27 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_destroy(s3_key);
uri = tmp;

if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = (char *) compressed_body;
} else {
}
#ifdef FLB_HAVE_ARROW
else if (ctx->compression == COMPRESS_ARROW) {
ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = compressed_body;
}
#endif
else {
final_body = body;
final_body_size = body_size;
}
Expand Down Expand Up @@ -1312,7 +1334,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, final_body, final_body_size,
headers, num_headers);
if (ctx->compression != NULL) {
if (ctx->compression != COMPRESS_NONE) {
flb_free(compressed_body);
}
flb_free(headers);
Expand Down Expand Up @@ -2216,7 +2238,8 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip' is currently the only supported value. "
"The Content-Encoding HTTP Header will be set to 'gzip'."
"The Content-Encoding HTTP Header will be set to 'gzip'. "
"If Apache Arrow was enabled at compile time, you can set 'arrow' to this option."
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
Expand Down
6 changes: 5 additions & 1 deletion plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@

#define DEFAULT_UPLOAD_TIMEOUT 3600

#define COMPRESS_NONE 0
#define COMPRESS_GZIP 1
#define COMPRESS_ARROW 2

/*
* If we see repeated errors on an upload/chunk, we will discard it
* This saves us from scenarios where something goes wrong and an upload can
Expand Down Expand Up @@ -107,13 +111,13 @@ struct flb_s3 {
char *endpoint;
char *sts_endpoint;
char *canned_acl;
char *compression;
char *content_type;
char *log_key;
int free_endpoint;
int use_put_object;
int send_content_md5;
int static_file_path;
int compression;

struct flb_aws_provider *provider;
struct flb_aws_provider *base_provider;
Expand Down

0 comments on commit 544fa89

Please sign in to comment.