Have an ffi c example that actually reads data #203

Merged Jul 1, 2024

Commits (45, all by nicklan):
2009649  checkpoint (Apr 22, 2024)
842cb15  checkpoint (Apr 29, 2024)
e2ff7d2  checkpoint (Apr 30, 2024)
d046fb2  can actually read data :) (May 13, 2024)
d9f5c3f  actually print data and add partition cols (May 13, 2024)
3430a47  fixup (May 13, 2024)
77a1721  cleanup (May 13, 2024)
9d94dbd  untab (May 13, 2024)
57ac80e  minor clean (May 13, 2024)
8704190  fmt (May 13, 2024)
1ce74b5  fmt (May 14, 2024)
b9435ab  reorg and add more comments (May 14, 2024)
220493a  clean up error reporting + more diag (May 14, 2024)
7d7c8b1  add comment (May 14, 2024)
e753f79  clean up GError reporting and unref some data (May 14, 2024)
ded945d  use same read_schema for whole scan (May 14, 2024)
a3dd991  more unrefing (May 14, 2024)
327190c  clean up read result iter (May 14, 2024)
7b3f4ae  fix all the warnings (May 14, 2024)
fbb018c  remove "interface" (May 14, 2024)
d23b388  Merge branch 'main' into moar-ffi-data-reading (May 23, 2024)
fa42973  fixups to make rust compile (May 23, 2024)
44d1339  make it work without data reading/printing (May 23, 2024)
c3af5c5  fix it all! (May 23, 2024)
ade6d18  fmt (May 23, 2024)
f3958a0  handle null partitions (May 31, 2024)
1166cbb  Merge branch 'main' into moar-ffi-data-reading (May 31, 2024)
56a068f  fix-ups for main merge (May 31, 2024)
bf13aed  Merge branch 'main' into moar-ffi-data-reading (Jun 7, 2024)
bebc003  fix for SchemaRef change (Jun 7, 2024)
fba568e  try_from_slice takes a ref (Jun 7, 2024)
2992873  Merge branch 'main' into moar-ffi-data-reading (Jun 19, 2024)
c4ba31e  move arrow stuff to a .c file (Jun 19, 2024)
84476cf  address some comments (Jun 19, 2024)
d05fd74  drop_X -> free_X for methods exposed over FFI (Jun 20, 2024)
47eaab0  cleanup and rename Mut->Exclusive (Jun 20, 2024)
3fd52cd  no rename + no clone (Jun 20, 2024)
21336aa  rename read_parquet_files -> read_parquet_file (Jun 20, 2024)
8e01fa7  fix doc (Jun 20, 2024)
5dfed10  fmt all the things! (Jun 20, 2024)
afad39f  fix cffi-test.c (Jun 20, 2024)
8a511f4  Update comment about slice->String (Jun 22, 2024)
2c22869  Create `ExclusiveEngineData` (Jun 22, 2024)
f497804  change control flow (Jun 24, 2024)
7cfe23c  fix cff-test.c (Jun 24, 2024)
ffi/cbindgen.toml (1 addition, 1 deletion)
@@ -25,4 +25,4 @@ parse_deps = true
 # only crates found in this list will ever be parsed.
 #
 # default: there is no allow-list (NOTE: this is the opposite of [])
-include = ["delta_kernel"]
+include = ["delta_kernel", "arrow-data", "arrow-schema"]
ffi/cffi-test.c (4 additions, 4 deletions)
@@ -13,7 +13,7 @@ void visit_callback(void* engine_context,
 }

 void visit_data(void* engine_context,
-                EngineData* engine_data,
+                ExclusiveEngineData* engine_data,
                 const KernelBoolSlice selection_vec) {
   visit_scan_data(engine_data, selection_vec, engine_context, visit_callback);
 }
@@ -64,9 +64,9 @@ int test_engine(KernelStringSlice table_path_slice,
     }
   }

-  drop_scan(scan);
-  drop_snapshot(snapshot);
-  drop_engine(engine);
+  free_scan(scan);
+  free_snapshot(snapshot);
+  free_engine(engine);
   return 0;
 }

ffi/examples/read-table/.clang-format (new file, 8 additions)
@@ -0,0 +1,8 @@
---
BasedOnStyle: Mozilla
ColumnLimit: 100
AlwaysBreakAfterReturnType: None
AlwaysBreakAfterDefinitionReturnType: None
AlignAfterOpenBracket: AlwaysBreak
...

ffi/examples/read-table/CMakeLists.txt (7 additions, 1 deletion)
@@ -1,7 +1,7 @@
 cmake_minimum_required(VERSION 3.12)
 project(read_table)
 option(PRINT_DATA "Print out the table data. Requires arrow-glib" ON)
-add_executable(read_table read_table.c)
+add_executable(read_table read_table.c arrow.c)
 target_compile_definitions(read_table PUBLIC DEFINE_DEFAULT_ENGINE)
 target_include_directories(read_table PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../../target/ffi-headers")
 target_link_directories(read_table PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../../target/debug")
@@ -13,6 +13,12 @@ if(WIN32)
   target_link_libraries(read_table PUBLIC ws2_32 userenv bcrypt ntdll)
 endif(WIN32)

+if(MSVC)
+  target_compile_options(read_table PRIVATE /W4 /WX)
+else()
+  target_compile_options(read_table PRIVATE -Wall -Wextra -Wpedantic -Werror)
+endif()
+
 if(PRINT_DATA)
   include(FindPkgConfig)
   pkg_check_modules(ARROW_GLIB REQUIRED arrow-glib)
ffi/examples/read-table/arrow.c (new file, 266 additions)
@@ -0,0 +1,266 @@
#include "arrow.h"
#include <stdio.h>
#include <string.h>

#ifdef PRINT_ARROW_DATA

ArrowContext* init_arrow_context()
{
ArrowContext* context = malloc(sizeof(ArrowContext));
context->num_batches = 0;
context->batches = NULL;
context->cur_filter = NULL;
return context;
}

// unref all the data in the context
void free_arrow_context(ArrowContext* context)
{
for (gsize i = 0; i < context->num_batches; i++) {
g_object_unref(context->batches[i]);
}
free(context->batches); // also free the realloc'd array of batch pointers
free(context);
}

// Report and free an error if it's non-NULL. Returns true if there was an error, false otherwise
static bool report_g_error(char* msg, GError* error)
{
if (error != NULL) {
printf("%s: %s\n", msg, error->message);
g_error_free(error);
return true;
}
return false;
}

// Turn ffi formatted schema data into a GArrowSchema
static GArrowSchema* get_schema(FFI_ArrowSchema* schema)
{
GError* error = NULL;
GArrowSchema* garrow_schema = garrow_schema_import((gpointer)schema, &error);
report_g_error("Can't get schema", error);
return garrow_schema;
}

// Turn ffi formatted record batch data into a GArrowRecordBatch
static GArrowRecordBatch* get_record_batch(FFI_ArrowArray* array, GArrowSchema* schema)
{
GError* error = NULL;
GArrowRecordBatch* record_batch = garrow_record_batch_import((gpointer)array, schema, &error);
report_g_error("Can't get record batch", error);
return record_batch;
}

// Add a column to the record batch for each partition column. In a "real" engine we would parse
// the string values into the column's declared data type; this program just adds all partition
// columns as strings for simplicity (a typed variant is sketched after this function)
static GArrowRecordBatch* add_partition_columns(
GArrowRecordBatch* record_batch,
PartitionList* partition_cols,
const CStringMap* partition_values)
{
gint64 rows = garrow_record_batch_get_n_rows(record_batch);
gint64 cols = garrow_record_batch_get_n_columns(record_batch);
GArrowRecordBatch* cur_record_batch = record_batch;
GError* error = NULL;
for (int i = 0; i < partition_cols->len; i++) {
char* col = partition_cols->cols[i];
guint pos = cols + i;
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
print_diag(
" Adding partition column '%s' with value '%s' at column %u\n",
col,
partition_val ? partition_val : "NULL",
pos);
GArrowStringArrayBuilder* builder = garrow_string_array_builder_new();
for (gint64 j = 0; j < rows; j++) { // distinct name to avoid shadowing the outer loop's i
if (partition_val) {
garrow_string_array_builder_append_string(builder, partition_val, &error);
} else {
garrow_array_builder_append_null((GArrowArrayBuilder*)builder, &error);
}
if (report_g_error("Can't append to partition column builder", error)) {
break;
}
}

if (partition_val) {
free(partition_val);
}

if (error != NULL) {
printf("Giving up on column %s\n", col);
g_object_unref(builder);
error = NULL;
continue;
}

GArrowArray* ret = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error);
if (report_g_error("Can't build string array for parition column", error)) {
printf("Giving up on column %s\n", col);
g_object_unref(builder);
error = NULL;
continue;
}
g_object_unref(builder);

GArrowField* field = garrow_field_new(col, (GArrowDataType*)garrow_string_data_type_new());
GArrowRecordBatch* old_batch = cur_record_batch;
cur_record_batch = garrow_record_batch_add_column(old_batch, pos, field, ret, &error);
g_object_unref(old_batch);
if (cur_record_batch == NULL) {
if (error != NULL) {
printf("Could not add column at %u: %s\n", pos, error->message);
g_error_free(error);
}
}
}
return cur_record_batch;
}
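
/* Illustrative sketch, not part of this file: the typed-parsing variant the comment above
 * alludes to, shown for an int32 partition column. A real engine would pick the builder from
 * the column's type in the table schema (and would need <stdlib.h> for strtol):
 *
 *   GArrowInt32ArrayBuilder* builder = garrow_int32_array_builder_new();
 *   gint32 typed_val = (gint32)strtol(partition_val, NULL, 10); // parse once, repeat per row
 *   for (gint64 j = 0; j < rows; j++) {
 *     garrow_int32_array_builder_append_value(builder, typed_val, &error);
 *   }
 *   GArrowArray* ret = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error);
 *   g_object_unref(builder);
 *   GArrowField* field = garrow_field_new(col, (GArrowDataType*)garrow_int32_data_type_new());
 *   // then garrow_record_batch_add_column(old_batch, pos, field, ret, &error) as above
 */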

// append a batch to our context
static void add_batch_to_context(
ArrowContext* context,
ArrowFFIData* arrow_data,
PartitionList* partition_cols,
const CStringMap* partition_values)
{
GArrowSchema* schema = get_schema(&arrow_data->schema);
GArrowRecordBatch* record_batch = get_record_batch(&arrow_data->array, schema);
if (context->cur_filter != NULL) {
GArrowRecordBatch* unfiltered = record_batch;
record_batch = garrow_record_batch_filter(unfiltered, context->cur_filter, NULL, NULL);
// unref the old batch and filter since we don't need them anymore
g_object_unref(unfiltered);
g_object_unref(context->cur_filter);
context->cur_filter = NULL;
}
record_batch = add_partition_columns(record_batch, partition_cols, partition_values);
if (record_batch == NULL) {
printf("Failed to add parition columns, not adding batch\n");
return;
}
context->batches =
realloc(context->batches, sizeof(GArrowRecordBatch*) * (context->num_batches + 1));
context->batches[context->num_batches] = record_batch;
context->num_batches++;
print_diag(
" Added batch to arrow context, have %i batches in context now\n",
(int)context->num_batches); // cast: num_batches is a gsize, %i expects int
}

// convert to a garrow boolean array. can't use garrow_boolean_array_builder_append_values as that
// expects a gboolean*, which is actually an int* which is 4 bytes, but our slice is a C99 _Bool*
// which is 1 byte
static GArrowBooleanArray* slice_to_arrow_bool_array(const KernelBoolSlice slice)
{
GArrowBooleanArrayBuilder* builder = garrow_boolean_array_builder_new();
GError* error = NULL;
for (uintptr_t i = 0; i < slice.len; i++) {
gboolean val = slice.ptr[i] ? TRUE : FALSE;
garrow_boolean_array_builder_append_value(builder, val, &error);
if (report_g_error("Can't append to boolean builder", error)) {
g_object_unref(builder);
break;
}
}

if (error != NULL) {
return NULL;
}

GArrowArray* ret = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error);
g_object_unref(builder);
if (ret == NULL) {
printf("Error in building boolean array");
if (error != NULL) {
printf(": %s\n", error->message);
g_error_free(error);
} else {
printf(".\n");
}
}
return (GArrowBooleanArray*)ret;
}

// This is the callback that will be called for each chunk of data read from the parquet file
static void visit_read_data(void* vcontext, ExclusiveEngineData* data)
{
print_diag(" Converting read data to arrow\n");
struct EngineContext* context = vcontext;
ExternResultArrowFFIData arrow_res = get_raw_arrow_data(data, context->engine);
if (arrow_res.tag != OkArrowFFIData) {
print_error("Failed to get arrow data.", (Error*)arrow_res.err);
free_error((Error*)arrow_res.err);
exit(-1);
}
ArrowFFIData* arrow_data = arrow_res.ok;
add_batch_to_context(
context->arrow_context, arrow_data, context->partition_cols, context->partition_values);
}

// Called from read_table.c::visit_callback for each file the scan tells us to read
void c_read_parquet_file(
struct EngineContext* context,
const KernelStringSlice path,
const KernelBoolSlice selection_vector)
{
int full_len = strlen(context->table_root) + path.len + 1;
char* full_path = malloc(sizeof(char) * full_len);
snprintf(full_path, full_len, "%s%.*s", context->table_root, (int)path.len, path.ptr);
print_diag(" Reading parquet file at %s\n", full_path);
KernelStringSlice path_slice = { full_path, full_len - 1 }; // exclude the trailing NUL from the slice length
FileMeta meta = {
.path = path_slice,
};
ExternResultHandleExclusiveFileReadResultIterator read_res =
read_parquet_file(context->engine, &meta, context->read_schema);
if (read_res.tag != OkHandleExclusiveFileReadResultIterator) {
printf("Couldn't read data\n");
return;
}
if (selection_vector.len > 0) {
GArrowBooleanArray* sel_array = slice_to_arrow_bool_array(selection_vector);
if (sel_array == NULL) {
printf("[WARN] Failed to get an arrow boolean array, selection vector will be ignored\n");
}
context->arrow_context->cur_filter = sel_array;
}
ExclusiveFileReadResultIterator* read_iter = read_res.ok;
for (;;) {
ExternResultbool ok_res = read_result_next(read_iter, context, visit_read_data);
if (ok_res.tag != Okbool) {
print_error("Failed to iterate read data.", (Error*)ok_res.err);
free_error((Error*)ok_res.err);
exit(-1);
} else if (!ok_res.ok) {
print_diag(" Done reading parquet file\n");
break;
}
}
free_read_result_iter(read_iter);
}

// Concat all our batches into a `GArrowTable`, call to_string on it, and print the result
void print_arrow_context(ArrowContext* context)
{
if (context->num_batches > 0) {
GError* error = NULL;
GArrowSchema* schema = garrow_record_batch_get_schema(context->batches[0]);
GArrowTable* table =
garrow_table_new_record_batches(schema, context->batches, context->num_batches, &error);
if (report_g_error("Can't create table from batches", error)) {
return;
}
gchar* out = garrow_table_to_string(table, &error);
if (!report_g_error("Can't turn table into string", error)) {
printf("\nTable Data:\n-----------\n\n%s\n", out);
g_free(out);
}
g_object_unref(table);
} else {
printf("[No data]\n");
}
}

#endif // PRINT_ARROW_DATA
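
For orientation, a hedged sketch of the calling side: EngineContext and the scan callbacks are defined in the example's read_table.c (not shown on this page), so the function name and flow below are illustrative assumptions rather than code from this PR. It only shows how the helpers in arrow.c chain together:

#include "arrow.h"

// Hypothetical driver (names are assumptions): `context` is populated by the
// kernel scan in read_table.c before any files are visited.
void sketch_read_and_print(struct EngineContext* context,
                           KernelStringSlice file_path,
                           KernelBoolSlice selection_vector)
{
  context->arrow_context = init_arrow_context();              // once, up front
  c_read_parquet_file(context, file_path, selection_vector);  // once per scanned file
  print_arrow_context(context->arrow_context);                // concat batches into a table, print it
  free_arrow_context(context->arrow_context);                 // unref batches, free the context
}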