Skip to content

Commit

Permalink
Merge pull request #215 from hntd187/dv-utils
Browse files Browse the repository at this point in the history
feat: better ways to use deletion vectors
  • Loading branch information
hntd187 authored Jul 26, 2024
2 parents 1258a88 + c9e08f2 commit ec934e8
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 29 deletions.
77 changes: 70 additions & 7 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,30 @@ pub type AllocateStringFn = extern "C" fn(kernel_str: KernelStringSlice) -> Null
// Put KernelBoolSlice in a sub-module, with non-public members, so rust code cannot instantiate it
// directly. It can only be created by converting `From<Vec<bool>>`.
mod private {
use std::ptr::NonNull;

/// Represents an owned slice of boolean values allocated by the kernel. Any time the engine
/// receives a `KernelBoolSlice` as a return value from a kernel method, engine is responsible
/// to free that slice, by calling [super::free_bool_slice] exactly once.
#[repr(C)]
pub struct KernelBoolSlice {
ptr: *mut bool,
ptr: NonNull<bool>,
len: usize,
}

/// An owned slice of u64 row indexes allocated by the kernel. The engine is responsible for
/// freeing this slice by calling [super::free_row_indexes] once.
#[repr(C)]
pub struct KernelRowIndexArray {
ptr: NonNull<u64>,
len: usize,
}

impl KernelBoolSlice {
/// Creates an empty slice.
pub fn empty() -> KernelBoolSlice {
KernelBoolSlice {
ptr: std::ptr::null_mut(),
ptr: NonNull::dangling(),
len: 0,
}
}
Expand All @@ -153,10 +163,10 @@ mod private {
/// The slice must have been originally created `From<Vec<bool>>`, and must not have been
/// already been consumed by a previous call to this method.
pub unsafe fn as_ref(&self) -> &[bool] {
if self.ptr.is_null() {
if self.len == 0 {
Default::default()
} else {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
}
}

Expand All @@ -167,10 +177,10 @@ mod private {
/// The slice must have been originally created `From<Vec<bool>>`, and must not have been
/// already been consumed by a previous call to this method.
pub unsafe fn into_vec(self) -> Vec<bool> {
if self.ptr.is_null() {
if self.len == 0 {
Default::default()
} else {
Vec::from_raw_parts(self.ptr, self.len, self.len)
Vec::from_raw_parts(self.ptr.as_ptr(), self.len, self.len)
}
}
}
Expand All @@ -179,7 +189,9 @@ mod private {
fn from(val: Vec<bool>) -> Self {
let len = val.len();
let boxed = val.into_boxed_slice();
let ptr = Box::into_raw(boxed).cast();
let leaked_ptr = Box::leak(boxed).as_mut_ptr();
let ptr = NonNull::new(leaked_ptr)
.expect("This should never be non-null please report this bug.");
KernelBoolSlice { ptr, len }
}
}
Expand All @@ -190,13 +202,56 @@ mod private {
/// memory, but must only free it by calling [super::free_bool_slice]. Since the global
/// allocator is threadsafe, it doesn't matter which engine thread invokes that method.
unsafe impl Send for KernelBoolSlice {}
/// # Safety
///
/// This follows the same contract as KernelBoolSlice above, engine assumes ownership of the
/// slice memory, but must only free it by calling [super::free_row_indexes]. It does not matter
/// from which thread the engine invoke that method
unsafe impl Send for KernelRowIndexArray {}

/// # Safety
///
/// If engine chooses to leverage concurrency, engine is responsible to prevent data races.
unsafe impl Sync for KernelBoolSlice {}
/// # Safety
///
/// If engine chooses to leverage concurrency, engine is responsible to prevent data races.
/// Same contract as KernelBoolSlice above
unsafe impl Sync for KernelRowIndexArray {}

impl KernelRowIndexArray {
/// Converts this slice back into a `Vec<u64>`.
///
/// # Safety
///
/// The slice must have been originally created `From<Vec<u64>>`, and must not have
/// already been consumed by a previous call to this method.
pub unsafe fn into_vec(self) -> Vec<u64> {
Vec::from_raw_parts(self.ptr.as_ptr(), self.len, self.len)
}

/// Creates an empty slice.
pub fn empty() -> KernelRowIndexArray {
Self {
ptr: NonNull::dangling(),
len: 0,
}
}
}

impl From<Vec<u64>> for KernelRowIndexArray {
fn from(vec: Vec<u64>) -> Self {
let len = vec.len();
let boxed = vec.into_boxed_slice();
let leaked_ptr = Box::leak(boxed).as_mut_ptr();
let ptr = NonNull::new(leaked_ptr)
.expect("This should never be non-null please report this bug.");
KernelRowIndexArray { ptr, len }
}
}
}
pub use private::KernelBoolSlice;
pub use private::KernelRowIndexArray;

/// # Safety
///
Expand All @@ -207,6 +262,14 @@ pub unsafe extern "C" fn free_bool_slice(slice: KernelBoolSlice) {
debug!("Dropping bool slice. It is {vec:#?}");
}

/// # Safety
///
/// Caller is responsible for passing a valid handle.
#[no_mangle]
pub unsafe extern "C" fn free_row_indexes(slice: KernelRowIndexArray) {
let _ = slice.into_vec();
}

// TODO: Do we want this handle at all? Perhaps we should just _always_ pass raw *mut c_void pointers
// that are the engine data? Even if we want the type, should it be a shared handle instead?
/// an opaque struct that encapsulates data read by an engine. this handle can be passed back into
Expand Down
31 changes: 29 additions & 2 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use url::Url;
use crate::{
unwrap_kernel_expression, AllocateStringFn, EnginePredicate, ExclusiveEngineData, ExternEngine,
ExternResult, IntoExternResult, KernelBoolSlice, KernelExpressionVisitorState,
KernelStringSlice, NullableCvoid, SharedExternEngine, SharedSnapshot, StringIter,
StringSliceIterator, TryFromStringSlice,
KernelRowIndexArray, KernelStringSlice, NullableCvoid, SharedExternEngine, SharedSnapshot,
StringIter, StringSliceIterator, TryFromStringSlice,
};

use super::handle::Handle;
Expand Down Expand Up @@ -391,6 +391,33 @@ fn selection_vector_from_dv_impl(
}
}

/// Get a vector of row indexes out of a [`DvInfo`] struct
///
/// # Safety
/// Engine is responsible for providing valid pointers for each argument
#[no_mangle]
pub unsafe extern "C" fn row_indexes_from_dv(
dv_info: &DvInfo,
engine: Handle<SharedExternEngine>,
state: Handle<SharedGlobalScanState>,
) -> ExternResult<KernelRowIndexArray> {
let state = unsafe { state.as_ref() };
let engine = unsafe { engine.as_ref() };
row_indexes_from_dv_impl(dv_info, engine, state).into_extern_result(&engine)
}

fn row_indexes_from_dv_impl(
dv_info: &DvInfo,
extern_engine: &dyn ExternEngine,
state: &GlobalScanState,
) -> DeltaResult<KernelRowIndexArray> {
let root_url = Url::parse(&state.table_root)?;
match dv_info.get_row_indexes(extern_engine.engine().as_ref(), &root_url)? {
Some(v) => Ok(v.into()),
None => Ok(KernelRowIndexArray::empty()),
}
}

// Wrapper function that gets called by the kernel, transforms the arguments to make the ffi-able,
// and then calls the ffi specified callback
fn rust_callback(
Expand Down
2 changes: 1 addition & 1 deletion kernel/build.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use rustc_version::{version_meta, Channel};

fn main() {
println!("cargo::rustc-check-cfg=cfg(NIGHTLY_CHANNEL)");
// note if we're on the nightly channel so we can enable doc_auto_cfg if so
if let Channel::Nightly = version_meta().unwrap().channel {
println!("cargo::rustc-check-cfg=cfg(NIGHTLY_CHANNEL)");
println!("cargo:rustc-cfg=NIGHTLY_CHANNEL");
}
}
28 changes: 25 additions & 3 deletions kernel/src/actions/deletion_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::io::{Cursor, Read};
use std::sync::Arc;

use bytes::Bytes;
use delta_kernel_derive::Schema;
use roaring::RoaringTreemap;
use url::Url;

use delta_kernel_derive::Schema;

use crate::utils::require;
use crate::{DeltaResult, Error, FileSystemClient};

Expand Down Expand Up @@ -165,6 +166,14 @@ impl DeletionVectorDescriptor {
}
}
}

pub fn row_indexes(
&self,
fs_client: Arc<dyn FileSystemClient>,
parent: &Url,
) -> DeltaResult<Vec<u64>> {
Ok(self.read(fs_client, parent)?.into_iter().collect())
}
}

enum Endian {
Expand Down Expand Up @@ -249,13 +258,14 @@ pub fn split_vector(

#[cfg(test)]
mod tests {
use roaring::RoaringTreemap;
use std::path::PathBuf;

use super::*;
use roaring::RoaringTreemap;

use crate::{engine::sync::SyncEngine, Engine};

use super::DeletionVectorDescriptor;
use super::*;

fn dv_relative() -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
Expand Down Expand Up @@ -382,4 +392,16 @@ mod tests {
expected[4294967300] = false;
assert_eq!(bools, expected);
}

#[test]
fn test_dv_row_indexes() {
let example = dv_inline();
let sync_engine = SyncEngine::new();
let fs_client = sync_engine.get_file_system_client();
let parent = Url::parse("http://not.used").unwrap();
let row_idx = example.row_indexes(fs_client, &parent).unwrap();

assert_eq!(row_idx.len(), 6);
assert_eq!(&row_idx, &[3, 4, 7, 11, 18, 29]);
}
}
4 changes: 2 additions & 2 deletions kernel/src/engine/arrow_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ impl ArrowEngineData {
/// # Arguments
///
/// * `out_col_array` - the vec that leaf values will be pushed onto. it is passed as an arg to
/// make the recursion below easier. if we returned a [`Vec`] we would have to `extend` it each
/// time we encountered a struct and made the recursive call.
/// make the recursion below easier. if we returned a [`Vec`] we would have to `extend` it each
/// time we encountered a struct and made the recursive call.
/// * `schema` - the schema to extract getters for
pub fn extract_columns<'a>(
&'a self,
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod fs_client;
pub(crate) mod json;
mod parquet;

/// This is a simple implemention of [`Engine`]. It only supports reading data from the local
/// This is a simple implementation of [`Engine`]. It only supports reading data from the local
/// filesystem, and internally represents data using `Arrow`.
pub struct SyncEngine {
fs_client: Arc<fs_client::SyncFilesystemClient>,
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! Delta-kernel-rs is an experimental [Delta](https://github.com/delta-io/delta/) implementation
//! focused on interoperability with a wide range of query engines. It currently only supports
//! reads. This library defines a number of traits which must be implemented to provide a
//! working "delta reader". The are detailed below. There is a provided "default engine" that
//! implenents all these traits and can be used to ease integration work. See
//! working "delta reader". They are detailed below. There is a provided "default engine" that
//! implements all these traits and can be used to ease integration work. See
//! [`DefaultEngine`](engine/default/index.html) for more information.
//!
//! A full `rust` example for reading table data using the default engine can be found
Expand All @@ -26,7 +26,7 @@
//! ## File system interactions
//!
//! Delta Kernel needs to perform some basic operations against file systems like listing and reading files.
//! These interactions are encapsulated in the [`FileSystemClient`] trait. Implementors must take take
//! These interactions are encapsulated in the [`FileSystemClient`] trait. Implementors must take
//! care that all assumptions on the behavior if the functions - like sorted results - are respected.
//!
//! ## Reading log and data files
Expand Down
22 changes: 12 additions & 10 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use itertools::Itertools;
use tracing::debug;
use url::Url;

use self::log_replay::scan_action_iter;
use self::state::GlobalScanState;
use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::column_mapping::ColumnMappingMode;
Expand All @@ -18,6 +16,9 @@ use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};

use self::log_replay::scan_action_iter;
use self::state::GlobalScanState;

mod data_skipping;
pub mod log_replay;
pub mod state;
Expand Down Expand Up @@ -183,14 +184,14 @@ impl Scan {
/// log-replay, reconciling Add and Remove actions, and applying data skipping (if
/// possible). Each item in the returned iterator is a tuple of:
/// - `Box<dyn EngineData>`: Data in engine format, where each row represents a file to be
/// scanned. The schema for each row can be obtained by calling [`scan_row_schema`].
/// scanned. The schema for each row can be obtained by calling [`scan_row_schema`].
/// - `Vec<bool>`: A selection vector. If a row is at index `i` and this vector is `false` at
/// index `i`, then that row should *not* be processed (i.e. it is filtered out). If the vector
/// is `true` at index `i` the row *should* be processed. If the selector vector is *shorter*
/// than the number of rows returned, missing elements are considered `true`, i.e. included in
/// the query. NB: If you are using the default engine and plan to call arrow's
/// `filter_record_batch`, you _need_ to extend this vector to the full length of the batch or
/// arrow will drop the extra rows.
/// index `i`, then that row should *not* be processed (i.e. it is filtered out). If the vector
/// is `true` at index `i` the row *should* be processed. If the selector vector is *shorter*
/// than the number of rows returned, missing elements are considered `true`, i.e. included in
/// the query. NB: If you are using the default engine and plan to call arrow's
/// `filter_record_batch`, you _need_ to extend this vector to the full length of the batch or
/// arrow will drop the extra rows.
pub fn scan_data(
&self,
engine: &dyn Engine,
Expand Down Expand Up @@ -579,11 +580,12 @@ pub(crate) mod test_utils {
mod tests {
use std::path::PathBuf;

use super::*;
use crate::engine::sync::SyncEngine;
use crate::schema::PrimitiveType;
use crate::Table;

use super::*;

fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult<Vec<String>> {
let scan_data = scan.scan_data(engine)?;
fn scan_data_callback(
Expand Down
15 changes: 15 additions & 0 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ impl DvInfo {
.transpose()?;
Ok(dv_treemap.map(treemap_to_bools))
}

/// Returns a vector of row indexes that should be *removed* from the result set
pub fn get_row_indexes(
&self,
engine: &dyn Engine,
table_root: &url::Url,
) -> DeltaResult<Option<Vec<u64>>> {
self.deletion_vector
.as_ref()
.map(|dv| {
let fs_client = engine.get_file_system_client();
dv.row_indexes(fs_client, table_root)
})
.transpose()
}
}

pub type ScanCallback<T> = fn(
Expand Down

0 comments on commit ec934e8

Please sign in to comment.