-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
parser: json: fix type confusion bug (#3417)
Signed-off-by: davkor <[email protected]>
- Loading branch information
1 parent
9c992e4
commit fdb7147
Showing
8 changed files
with
234 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Sources for the Arrow conversion helper used by the S3 output plugin.
set(src compress.c)

# Build the helper as a static library.
add_library(out-s3-arrow STATIC ${src})

# Arrow GLib headers and linker flags come from the ARROW_GLIB_* variables.
target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
/* | ||
* This converts S3 plugin's request buffer into Apache Arrow format. | ||
* | ||
* We use the GLib bindings to call Arrow functions (which are implemented | ||
* in C++) from Fluent Bit. | ||
* | ||
* https://github.com/apache/arrow/tree/master/c_glib | ||
*/ | ||
|
||
#include <arrow-glib/arrow-glib.h> | ||
#include <inttypes.h> | ||
|
||
/* | ||
* GArrowTable is the central structure that represents "table" (a.k.a. | ||
* data frame). | ||
*/ | ||
static GArrowTable* parse_json(uint8_t *json, int size) | ||
{ | ||
GArrowJSONReader *reader; | ||
GArrowBuffer *buffer; | ||
GArrowBufferInputStream *input; | ||
GArrowJSONReadOptions *options; | ||
GArrowTable *table; | ||
GError *error = NULL; | ||
|
||
buffer = garrow_buffer_new(json, size); | ||
if (buffer == NULL) { | ||
return NULL; | ||
} | ||
|
||
input = garrow_buffer_input_stream_new(buffer); | ||
if (input == NULL) { | ||
g_object_unref(buffer); | ||
return NULL; | ||
} | ||
|
||
options = garrow_json_read_options_new(); | ||
if (options == NULL) { | ||
g_print("fail to create json options: %s\n", error->message); | ||
g_object_unref(buffer); | ||
g_object_unref(input); | ||
return NULL; | ||
} | ||
|
||
reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); | ||
if (reader == NULL) { | ||
g_print("cannot create json reader: %s\n", error->message); | ||
g_error_free(error); | ||
g_object_unref(buffer); | ||
g_object_unref(input); | ||
g_object_unref(options); | ||
return NULL; | ||
} | ||
|
||
table = garrow_json_reader_read(reader, &error); | ||
if (table == NULL) { | ||
g_print("cannot parse json: %s\n", error->message); | ||
g_error_free(error); | ||
g_object_unref(buffer); | ||
g_object_unref(input); | ||
g_object_unref(options); | ||
g_object_unref(reader); | ||
return NULL; | ||
} | ||
g_object_unref(buffer); | ||
g_object_unref(input); | ||
g_object_unref(options); | ||
g_object_unref(reader); | ||
return table; | ||
} | ||
|
||
/*
 * Serialize `table` in Feather format into a newly created resizable
 * buffer.
 *
 * Returns the buffer on success (the caller releases it with
 * g_object_unref()), or NULL if any step fails.
 */
static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
{
    GError *err = NULL;
    GArrowResizableBuffer *out;
    GArrowBufferOutputStream *stream;

    out = garrow_resizable_buffer_new(0, &err);
    if (!out) {
        g_print("cannot create buffer: %s\n", err->message);
        g_error_free(err);
        return NULL;
    }

    stream = garrow_buffer_output_stream_new(out);
    if (!stream) {
        g_object_unref(out);
        return NULL;
    }

    if (!garrow_table_write_as_feather(table, GARROW_OUTPUT_STREAM(stream),
                                       NULL, &err)) {
        g_print("cannot output table: %s\n", err->message);
        g_error_free(err);
        g_object_unref(out);
        out = NULL;
    }

    /* The stream is no longer needed whether or not the write succeeded. */
    g_object_unref(stream);
    return out;
}
|
||
int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size) | ||
{ | ||
GArrowTable *table; | ||
GArrowResizableBuffer *buffer; | ||
GBytes *bytes; | ||
gconstpointer ptr; | ||
gsize len; | ||
uint8_t *buf; | ||
|
||
table = parse_json(json, size); | ||
if (table == NULL) { | ||
return -1; | ||
} | ||
|
||
buffer = table_to_buffer(table); | ||
g_object_unref(table); | ||
if (buffer == NULL) { | ||
return -1; | ||
} | ||
|
||
bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); | ||
if (bytes == NULL) { | ||
g_object_unref(buffer); | ||
return -1; | ||
} | ||
|
||
ptr = g_bytes_get_data(bytes, &len); | ||
if (ptr == NULL) { | ||
g_object_unref(buffer); | ||
g_bytes_unref(bytes); | ||
return -1; | ||
} | ||
|
||
buf = malloc(len); | ||
if (buf == NULL) { | ||
g_object_unref(buffer); | ||
g_bytes_unref(bytes); | ||
return -1; | ||
} | ||
memcpy(buf, ptr, len); | ||
*out_buf = (void *) buf; | ||
*out_size = len; | ||
|
||
g_object_unref(buffer); | ||
g_bytes_unref(bytes); | ||
return 0; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
/* | ||
* This function converts out_s3 buffer into Apache Arrow format. | ||
* | ||
* `json` is a string that contains (concatenated) JSON objects. | ||
* | ||
* `size` is the length of the json data (excluding the trailing | ||
* null-terminator character). | ||
* | ||
* Return 0 on success (with `out_buf` and `out_size` updated), | ||
* and -1 on failure | ||
*/ | ||
|
||
int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters