Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_s3: add Apache Arrow support #3184

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ option(FLB_STREAM_PROCESSOR "Enable Stream Processor" Yes)
option(FLB_CORO_STACK_SIZE "Set coroutine stack size")
option(FLB_AVRO_ENCODER "Build with Avro encoding support" No)
option(FLB_AWS_ERROR_REPORTER "Build with aws error reporting support" No)

option(FLB_ARROW "Build with Apache Arrow support" No)

# Metrics: Experimental Feature, disabled by default on 0.12 series
# but enabled in the upcoming 0.13 release. Note that development
Expand Down Expand Up @@ -724,6 +724,16 @@ if(FLB_OUT_PGSQL AND (NOT PostgreSQL_FOUND))
FLB_OPTION(FLB_OUT_PGSQL OFF)
endif()

# Arrow GLib
# ==========
find_package(PkgConfig)
pkg_check_modules(ARROW_GLIB QUIET arrow-glib)
if(FLB_ARROW AND ARROW_GLIB_FOUND)
FLB_DEFINITION(FLB_HAVE_ARROW)
else()
set(FLB_ARROW OFF)
endif()

# Pthread Local Storage
# =====================
# By default we expect the compiler already support thread local storage
Expand Down
5 changes: 5 additions & 0 deletions plugins/out_s3/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ set(src
s3_multipart.c)

FLB_PLUGIN(out_s3 "${src}" "")

if(FLB_ARROW)
add_subdirectory(arrow EXCLUDE_FROM_ALL)
target_link_libraries(flb-plugin-out_s3 out-s3-arrow)
endif()
7 changes: 7 additions & 0 deletions plugins/out_s3/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set(src
compress.c)

add_library(out-s3-arrow STATIC ${src})

target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS})
147 changes: 147 additions & 0 deletions plugins/out_s3/arrow/compress.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* This converts S3 plugin's request buffer into Apache Arrow format.
*
* We use GLib binding to call Arrow functions (which is implemented
* in C++) from Fluent Bit.
*
* https://github.com/apache/arrow/tree/master/c_glib
*/

#include <arrow-glib/arrow-glib.h>
#include <inttypes.h>

/*
* GArrowTable is the central structure that represents "table" (a.k.a.
* data frame).
*/
static GArrowTable* parse_json(uint8_t *json, int size)
{
GArrowJSONReader *reader;
GArrowBuffer *buffer;
GArrowBufferInputStream *input;
GArrowJSONReadOptions *options;
GArrowTable *table;
GError *error = NULL;

buffer = garrow_buffer_new(json, size);
if (buffer == NULL) {
return NULL;
}

input = garrow_buffer_input_stream_new(buffer);
if (input == NULL) {
g_object_unref(buffer);
return NULL;
}

options = garrow_json_read_options_new();
if (options == NULL) {
g_object_unref(buffer);
g_object_unref(input);
return NULL;
}

reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
if (reader == NULL) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
return NULL;
}

table = garrow_json_reader_read(reader, &error);
if (table == NULL) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return NULL;
}
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return table;
}

static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
{
GArrowResizableBuffer *buffer;
GArrowBufferOutputStream *sink;
GError *error = NULL;
gboolean success;

buffer = garrow_resizable_buffer_new(0, &error);
if (buffer == NULL) {
g_error_free(error);
return NULL;
}

sink = garrow_buffer_output_stream_new(buffer);
if (sink == NULL) {
g_object_unref(buffer);
return NULL;
}

success = garrow_table_write_as_feather(
table, GARROW_OUTPUT_STREAM(sink),
NULL, &error);
if (!success) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(sink);
return NULL;
}
g_object_unref(sink);
return buffer;
}

int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size)
{
GArrowTable *table;
GArrowResizableBuffer *buffer;
GBytes *bytes;
gconstpointer ptr;
gsize len;
uint8_t *buf;

table = parse_json(json, size);
if (table == NULL) {
return -1;
}

buffer = table_to_buffer(table);
g_object_unref(table);
if (buffer == NULL) {
return -1;
}

bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
if (bytes == NULL) {
g_object_unref(buffer);
return -1;
}

ptr = g_bytes_get_data(bytes, &len);
if (ptr == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}

buf = malloc(len);
if (buf == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}
memcpy(buf, ptr, len);
*out_buf = (void *) buf;
*out_size = len;

g_object_unref(buffer);
g_bytes_unref(bytes);
return 0;
}
13 changes: 13 additions & 0 deletions plugins/out_s3/arrow/compress.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* This function converts out_s3 buffer into Apache Arrow format.
*
* `json` is a string that contain (concatenated) JSON objects.
*
* `size` is the length of the json data (excluding the trailing
* null-terminator character).
*
* Return 0 on success (with `out_buf` and `out_size` updated),
* and -1 on failure
*/

int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size);
49 changes: 36 additions & 13 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include "s3.h"
#include "s3_store.h"

#ifdef FLB_HAVE_ARROW
#include "arrow/compress.h"
#endif

static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
struct s3_file *chunk,
char **out_buf, size_t *out_size);
Expand Down Expand Up @@ -122,7 +126,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
Expand All @@ -149,7 +153,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
s3_headers[n] = content_encoding_header;
n++;
}
Expand Down Expand Up @@ -515,17 +519,23 @@ static int cb_s3_init(struct flb_output_instance *ins,

tmp = flb_output_get_property("compression", ins);
if (tmp) {
if (strcmp((char *) tmp, "gzip") != 0) {
flb_plg_error(ctx->ins,
"'gzip' is currently the only supported value for 'compression'");
return -1;
} else if (ctx->use_put_object == FLB_FALSE) {
if (ctx->use_put_object == FLB_FALSE) {
flb_plg_error(ctx->ins,
"use_put_object must be enabled when compression is enabled");
return -1;
}

ctx->compression = (char *) tmp;
if (strcmp(tmp, "gzip") == 0) {
ctx->compression = COMPRESS_GZIP;
}
#ifdef FLB_HAVE_ARROW
else if (strcmp(tmp, "arrow") == 0) {
ctx->compression = COMPRESS_ARROW;
}
#endif
else {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
return -1;
}
}

tmp = flb_output_get_property("content_type", ins);
Expand Down Expand Up @@ -1090,15 +1100,27 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_destroy(s3_key);
uri = tmp;

if (ctx->compression != NULL) {
if (ctx->compression == COMPRESS_GZIP) {
ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = (char *) compressed_body;
} else {
}
#ifdef FLB_HAVE_ARROW
else if (ctx->compression == COMPRESS_ARROW) {
ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = compressed_body;
}
#endif
else {
final_body = body;
final_body_size = body_size;
}
Expand Down Expand Up @@ -1128,7 +1150,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, final_body, final_body_size,
headers, num_headers);
if (ctx->compression != NULL) {
if (ctx->compression != COMPRESS_NONE) {
flb_free(compressed_body);
}
flb_free(headers);
Expand Down Expand Up @@ -1602,7 +1624,8 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip' is currently the only supported value. "
"The Content-Encoding HTTP Header will be set to 'gzip'."
"The Content-Encoding HTTP Header will be set to 'gzip'. "
"If Apache Arrow was enabled at compile time, you can set 'arrow' to this option."
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
Expand Down
6 changes: 5 additions & 1 deletion plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@

#define DEFAULT_UPLOAD_TIMEOUT 3600

#define COMPRESS_NONE 0
#define COMPRESS_GZIP 1
#define COMPRESS_ARROW 2

/*
* If we see repeated errors on an upload/chunk, we will discard it
* This saves us from scenarios where something goes wrong and an upload can
Expand Down Expand Up @@ -95,11 +99,11 @@ struct flb_s3 {
char *endpoint;
char *sts_endpoint;
char *canned_acl;
char *compression;
char *content_type;
int free_endpoint;
int use_put_object;
int send_content_md5;
int compression;

struct flb_aws_provider *provider;
struct flb_aws_provider *base_provider;
Expand Down