feat: better ways to use deletion vectors #215

Merged
merged 22 commits into from
Jul 26, 2024
Changes from 18 commits
9cd41f2
feat: better ways to use deletion vectors
hntd187 May 22, 2024
9edcce9
Merge branch 'main' into dv-utils
hntd187 May 22, 2024
6acb5f8
Merge branch 'main' into dv-utils
hntd187 May 22, 2024
6c726f5
feat: option to pass DVs over as just array of row indexes
hntd187 Jun 8, 2024
0c20f6b
Merge branch 'main' into dv-utils
hntd187 Jun 21, 2024
213c519
Updated and added some plumbing for the row index vector creation
hntd187 Jun 21, 2024
0da2dfb
chore: fmt
hntd187 Jun 21, 2024
ea36d70
Merge branch 'main' into dv-utils
hntd187 Jun 21, 2024
1c897c6
Merge branch 'main' into dv-utils
hntd187 Jun 27, 2024
b33d621
More deleting, a few doc comments, some cleanup
hntd187 Jun 27, 2024
c621634
Merge branch 'main' into dv-utils
hntd187 Jul 5, 2024
4b8e18f
Address PR feedback
hntd187 Jul 5, 2024
63bdbc6
Address PR feedback
hntd187 Jul 7, 2024
b92fcc0
Merge branch 'main' into dv-utils
hntd187 Jul 20, 2024
bfc7486
Address PR feedback, use dangling pointer for init/empty array instea…
hntd187 Jul 20, 2024
59c150b
Merge branch 'main' into dv-utils
hntd187 Jul 23, 2024
8e19022
Address PR feedback, use dangling pointer for init/empty array instea…
hntd187 Jul 25, 2024
265d1f1
Merge remote-tracking branch 'mine/dv-utils' into dv-utils
hntd187 Jul 25, 2024
90dc89f
Update kernel/src/actions/deletion_vector.rs
hntd187 Jul 26, 2024
c97c456
Address PR feedback, use dangling pointer for init/empty array instea…
hntd187 Jul 26, 2024
e68dc95
Merge remote-tracking branch 'mine/dv-utils' into dv-utils
hntd187 Jul 26, 2024
c9e08f2
Address PR feedback, as well as resolve some lints and nightly build …
hntd187 Jul 26, 2024
82 changes: 74 additions & 8 deletions ffi/src/lib.rs
@@ -128,20 +128,30 @@ pub type AllocateStringFn = extern "C" fn(kernel_str: KernelStringSlice) -> Null
// Put KernelBoolSlice in a sub-module, with non-public members, so rust code cannot instantiate it
// directly. It can only be created by converting `From<Vec<bool>>`.
mod private {
use std::ptr::NonNull;

/// Represents an owned slice of boolean values allocated by the kernel. Any time the engine
/// receives a `KernelBoolSlice` as a return value from a kernel method, engine is responsible
/// to free that slice, by calling [super::free_bool_slice] exactly once.
#[repr(C)]
pub struct KernelBoolSlice {
ptr: *mut bool,
ptr: NonNull<bool>,
len: usize,
}

/// An owned slice of u64 row indexes allocated by the kernel. The engine is responsible for
/// freeing this slice by calling [super::free_row_indexes] once.
#[repr(C)]
pub struct KernelRowIndexArray {
ptr: NonNull<u64>,
len: usize,
}

impl KernelBoolSlice {
/// Creates an empty slice.
pub fn empty() -> KernelBoolSlice {
KernelBoolSlice {
ptr: std::ptr::null_mut(),
ptr: NonNull::dangling(),
len: 0,
}
}
@@ -153,10 +163,10 @@ mod private {
/// The slice must have been originally created `From<Vec<bool>>`, and must not have been
/// already been consumed by a previous call to this method.
pub unsafe fn as_ref(&self) -> &[bool] {
if self.ptr.is_null() {
if self.len == 0 {
Default::default()
} else {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
}
}

@@ -167,10 +177,10 @@ mod private {
/// The slice must have been originally created `From<Vec<bool>>`, and must not have been
/// already been consumed by a previous call to this method.
pub unsafe fn into_vec(self) -> Vec<bool> {
if self.ptr.is_null() {
if self.len == 0 {
Default::default()
} else {
Vec::from_raw_parts(self.ptr, self.len, self.len)
Vec::from_raw_parts(self.ptr.as_ptr(), self.len, self.len)
}
}
}
@@ -179,8 +189,12 @@ mod private {
fn from(val: Vec<bool>) -> Self {
let len = val.len();
let boxed = val.into_boxed_slice();
let ptr = Box::into_raw(boxed).cast();
KernelBoolSlice { ptr, len }
let ptr = Box::leak(boxed).as_mut_ptr();
if let Some(ptr) = NonNull::new(ptr) {
KernelBoolSlice { ptr, len }
} else {
KernelBoolSlice::empty()
Collaborator

I don't think the leaked pointer can ever be null, so None case here is impossible?

Collaborator Author

No, I don't believe so. I am just pleasing the compiler here: internally, a vec that hasn't allocated also won't produce a null, and I can't think of any situation in which it would realistically be null.

Collaborator

Looks like there's a suitable From:

let ptr = Box::leak(boxed).into();
KernelBoolSlice { ptr, len }

Collaborator Author

I was hoping this would work, but we call .as_mut_ptr() to get the pointer to the inner buffer of the vec. NonNull::from() wants a &mut T, and I am not aware of any way to get a mutable reference to the inner buffer (only a pointer) that doesn't involve dereferencing the pointer above, which would add unsafe to the call. I would prefer to keep the check above in lieu of adding unsafe to the method. Thoughts?

Collaborator Author

Also keep in mind that many ways of referring to the vec in Rust give pointers to the Vec object itself, not to its inner buffer, which makes this harder to get.

Collaborator
@scovich, Jul 26, 2024

Oops! I had indeed forgotten that as_mut_ptr was a vec method (I wrongly assumed it was just turning the leaked reference into a pointer). Agreed, there doesn't seem to be any good way to directly extract a NonNull from a slice, even though it's guaranteed to exist (grr). All the examples just expect to unwrap the option when passing a reference to NonNull::new; should we do that?

(the current code is confusing because it uses an empty slice as a fallback for an error case that can't actually arise in practice -- which could trick some future reader into thinking empty slices need some kind of special handling there)

Collaborator

Another possibility would be to leverage first_mut, but that's still annoying and ugly because it returns an Option<&mut T> (because it's UB to create a reference from a zero-length slice). So expect or the current code are probably the best we can do, barring a rust API change.

Collaborator Author
@hntd187, Jul 26, 2024

Expect should be okay because if it ever hits that error, we have a legitimate bug we should investigate.

}
}
}
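
The thread above settles on `expect` rather than an empty-slice fallback. A minimal sketch of that shape is shown here, assuming the same `Vec` to boxed slice to leaked pointer path as the diff. `leak_to_non_null` is a hypothetical helper name that does not appear in the PR, and the returned tuple merely stands in for the `ptr` and `len` fields.

```rust
use std::ptr::NonNull;

/// Hypothetical helper (not part of the PR): leaks the buffer of `vec` and
/// returns a non-null pointer to its first element together with the length.
pub fn leak_to_non_null(vec: Vec<u64>) -> (NonNull<u64>, usize) {
    let len = vec.len();
    // Converting to a boxed slice drops excess capacity, so capacity == len.
    let boxed = vec.into_boxed_slice();
    // Leak the box and take a raw pointer to the slice's buffer.
    let ptr = Box::leak(boxed).as_mut_ptr();
    // The pointer of a leaked slice is never null (an empty slice yields a
    // dangling but non-null pointer), so `None` here would indicate a bug.
    let ptr = NonNull::new(ptr).expect("a leaked boxed slice never has a null pointer");
    (ptr, len)
}
```

With this shape an empty input needs no special case on the construction side, which avoids the misleading fallback the reviewer pointed out.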

@@ -190,13 +204,57 @@ mod private {
/// memory, but must only free it by calling [super::free_bool_slice]. Since the global
/// allocator is threadsafe, it doesn't matter which engine thread invokes that method.
unsafe impl Send for KernelBoolSlice {}
unsafe impl Send for KernelRowIndexArray {}

/// # Safety
///
/// If engine chooses to leverage concurrency, engine is responsible to prevent data races.
unsafe impl Sync for KernelBoolSlice {}
/// # Safety
///
/// If engine chooses to leverage concurrency, engine is responsible to prevent data races.
/// Same contract as KernelBoolSlice above
unsafe impl Sync for KernelRowIndexArray {}

impl KernelRowIndexArray {
/// Converts this slice back into a `Vec<u64>`.
///
/// # Safety
///
/// The slice must have been originally created `From<Vec<u64>>`, and must not have
/// already been consumed by a previous call to this method.
pub unsafe fn into_vec(self) -> Vec<u64> {
if self.len == 0 {
Default::default()
} else {
Vec::from_raw_parts(self.ptr.as_ptr(), self.len, self.len)
}
}

/// Creates an empty slice.
pub fn empty() -> KernelRowIndexArray {
Self {
ptr: NonNull::dangling(),
len: 0,
}
}
}

impl From<Vec<u64>> for KernelRowIndexArray {
fn from(vec: Vec<u64>) -> Self {
let len = vec.len();
let boxed = vec.into_boxed_slice();
let ptr = Box::leak(boxed).as_mut_ptr();
if let Some(ptr) = NonNull::new(ptr) {
KernelRowIndexArray { ptr, len }
} else {
KernelRowIndexArray::empty()
}
}
}
}
pub use private::KernelBoolSlice;
pub use private::KernelRowIndexArray;

/// # Safety
///
@@ -207,6 +265,14 @@ pub unsafe extern "C" fn free_bool_slice(slice: KernelBoolSlice) {
debug!("Dropping bool slice. It is {vec:#?}");
}

/// # Safety
///
/// Caller is responsible for passing a valid handle.
#[no_mangle]
pub unsafe extern "C" fn free_row_indexes(slice: KernelRowIndexArray) {
let _ = slice.into_vec();
}

// TODO: Do we want this handle at all? Perhaps we should just _always_ pass raw *mut c_void pointers
// that are the engine data? Even if we want the type, should it be a shared handle instead?
/// an opaque struct that encapsulates data read by an engine. this handle can be passed back into
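
The ownership contract behind `free_row_indexes` (the kernel allocates, the engine frees exactly once) can be illustrated with a self-contained sketch. `RowIndexArray`, `from_vec`, and `free_array` are hypothetical names that mirror the PR's `KernelRowIndexArray` and are not its actual API. The free function is a plain Rust function here rather than an `extern "C"` one.

```rust
use std::ptr::NonNull;

/// Hypothetical mirror of the PR's `KernelRowIndexArray`, for illustration only.
#[repr(C)]
pub struct RowIndexArray {
    ptr: NonNull<u64>,
    len: usize,
}

impl RowIndexArray {
    /// Empty array: dangling (non-null, aligned) pointer, zero length, no allocation.
    pub fn empty() -> Self {
        Self { ptr: NonNull::dangling(), len: 0 }
    }

    /// Hands ownership of the buffer of `vec` to the returned array.
    pub fn from_vec(vec: Vec<u64>) -> Self {
        if vec.is_empty() {
            return Self::empty();
        }
        let len = vec.len();
        let ptr = Box::leak(vec.into_boxed_slice()).as_mut_ptr();
        Self { ptr: NonNull::new(ptr).expect("leaked slice pointer is non-null"), len }
    }

    /// Rebuilds the `Vec`, so that dropping it releases the buffer.
    ///
    /// # Safety
    /// `self` must come from `from_vec` or `empty` and must not have been
    /// consumed by an earlier call.
    pub unsafe fn into_vec(self) -> Vec<u64> {
        if self.len == 0 {
            // Never build a Vec from the dangling pointer; nothing was allocated.
            Vec::new()
        } else {
            unsafe { Vec::from_raw_parts(self.ptr.as_ptr(), self.len, self.len) }
        }
    }
}

/// Sketch of a free function in the style of `free_row_indexes`.
///
/// # Safety
/// Same contract as `into_vec`: call at most once per array.
pub unsafe fn free_array(array: RowIndexArray) {
    drop(unsafe { array.into_vec() });
}
```

Checking `len == 0` rather than testing for a null pointer matches the diff's switch to `NonNull::dangling()`: the pointer is always non-null, so the length is the only reliable signal that nothing was allocated.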
31 changes: 29 additions & 2 deletions ffi/src/scan.rs
@@ -16,8 +16,8 @@ use url::Url;
use crate::{
unwrap_kernel_expression, AllocateStringFn, EnginePredicate, ExclusiveEngineData, ExternEngine,
ExternResult, IntoExternResult, KernelBoolSlice, KernelExpressionVisitorState,
KernelStringSlice, NullableCvoid, SharedExternEngine, SharedSnapshot, StringIter,
StringSliceIterator, TryFromStringSlice,
KernelRowIndexArray, KernelStringSlice, NullableCvoid, SharedExternEngine, SharedSnapshot,
StringIter, StringSliceIterator, TryFromStringSlice,
};

use super::handle::Handle;
@@ -391,6 +391,33 @@ fn selection_vector_from_dv_impl(
}
}

/// Get a vector of row indexes out of a [`DvInfo`] struct
///
/// # Safety
/// Engine is responsible for providing valid pointers for each argument
#[no_mangle]
pub unsafe extern "C" fn row_indexes_from_dv(
dv_info: &DvInfo,
engine: Handle<SharedExternEngine>,
state: Handle<SharedGlobalScanState>,
) -> ExternResult<KernelRowIndexArray> {
let state = unsafe { state.as_ref() };
let engine = unsafe { engine.as_ref() };
row_indexes_from_dv_impl(dv_info, engine, state).into_extern_result(&engine)
}

fn row_indexes_from_dv_impl(
dv_info: &DvInfo,
extern_engine: &dyn ExternEngine,
state: &GlobalScanState,
) -> DeltaResult<KernelRowIndexArray> {
let root_url = Url::parse(&state.table_root)?;
match dv_info.get_row_indexes(extern_engine.engine().as_ref(), &root_url)? {
Some(v) => Ok(v.into()),
None => Ok(KernelRowIndexArray::empty()),
}
}

// Wrapper function that gets called by the kernel, transforms the arguments to make the ffi-able,
// and then calls the ffi specified callback
fn rust_callback(
28 changes: 25 additions & 3 deletions kernel/src/actions/deletion_vector.rs
@@ -4,10 +4,11 @@ use std::io::{Cursor, Read};
use std::sync::Arc;

use bytes::Bytes;
use delta_kernel_derive::Schema;
use roaring::RoaringTreemap;
use url::Url;

use delta_kernel_derive::Schema;

use crate::utils::require;
use crate::{DeltaResult, Error, FileSystemClient};

@@ -165,6 +166,14 @@ impl DeletionVectorDescriptor {
}
}
}

pub fn row_indexes(
&self,
fs_client: Arc<dyn FileSystemClient>,
parent: &Url,
) -> DeltaResult<Vec<u64>> {
Ok(self.read(fs_client, parent)?.into_iter().collect())
}
}

enum Endian {
@@ -249,13 +258,14 @@ pub fn split_vector(

#[cfg(test)]
mod tests {
use roaring::RoaringTreemap;
use std::path::PathBuf;

use super::*;
use roaring::RoaringTreemap;

use crate::{engine::sync::SyncEngine, Engine};

use super::DeletionVectorDescriptor;
use super::*;

fn dv_relative() -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
@@ -382,4 +392,16 @@ mod tests {
expected[4294967300] = false;
assert_eq!(bools, expected);
}

#[test]
fn test_dv_wrapper() {
let example = dv_inline();
let sync_engine = SyncEngine::new();
let fs_client = sync_engine.get_file_system_client();
let parent = Url::parse("http://not.used").unwrap();
let row_idx = example.row_indexes(fs_client, &parent).unwrap();

assert_eq!(row_idx.len(), 6);
assert_eq!(&row_idx, &[3, 4, 7, 11, 18, 29]);
}
}
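
To relate the two representations this PR offers, the sketch below turns a list of deleted row indexes (as returned by `row_indexes`) into a boolean selection vector. It assumes the convention that `true` means "keep the row", which is what the existing selection-vector test appears to use. `selection_vector_from_row_indexes` and the `num_rows` parameter are hypothetical and not part of the kernel.

```rust
/// Hypothetical helper (not part of the kernel): builds a selection vector of
/// length `num_rows` where `true` means "keep" and deleted rows are `false`.
/// Indexes at or beyond `num_rows` are ignored.
pub fn selection_vector_from_row_indexes(deleted: &[u64], num_rows: usize) -> Vec<bool> {
    let mut keep = vec![true; num_rows];
    for &idx in deleted {
        // `idx < num_rows` guarantees the cast to usize is lossless.
        if idx < num_rows as u64 {
            keep[idx as usize] = false;
        }
    }
    keep
}
```

An engine that already filters by index can skip this conversion entirely, which may be the motivation for exposing the index form: a sparse deletion vector stays small instead of expanding to one `bool` per row.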
2 changes: 1 addition & 1 deletion kernel/src/engine/sync/mod.rs
@@ -9,7 +9,7 @@ mod fs_client;
pub(crate) mod json;
mod parquet;

/// This is a simple implemention of [`Engine`]. It only supports reading data from the local
/// This is a simple implementation of [`Engine`]. It only supports reading data from the local
/// filesystem, and internally represents data using `Arrow`.
pub struct SyncEngine {
fs_client: Arc<fs_client::SyncFilesystemClient>,
15 changes: 15 additions & 0 deletions kernel/src/scan/state.rs
@@ -66,6 +66,21 @@ impl DvInfo {
.transpose()?;
Ok(dv_treemap.map(treemap_to_bools))
}

/// Returns a vector of row indexes that should be *removed* from the result set
pub fn get_row_indexes(
&self,
engine: &dyn Engine,
table_root: &url::Url,
) -> DeltaResult<Option<Vec<u64>>> {
self.deletion_vector
.as_ref()
.map(|dv| {
let fs_client = engine.get_file_system_client();
dv.row_indexes(fs_client, table_root)
})
.transpose()
}
}

pub type ScanCallback<T> = fn(
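
`get_row_indexes` relies on `Option::map` followed by `transpose` to turn an `Option<Result<_, _>>` into a `Result<Option<_>, _>`. The standalone sketch below shows that pattern with stand-in types: `read_indexes` and `maybe_row_indexes` are hypothetical, and parsing a comma-separated string takes the place of reading a deletion vector.

```rust
use std::num::ParseIntError;

/// Hypothetical stand-in for a fallible deletion vector read.
fn read_indexes(raw: &str) -> Result<Vec<u64>, ParseIntError> {
    raw.split(',').map(|s| s.trim().parse::<u64>()).collect()
}

/// `None` (no deletion vector) becomes `Ok(None)`, a successful read becomes
/// `Ok(Some(..))`, and a failed read surfaces as `Err`.
pub fn maybe_row_indexes(dv: Option<&str>) -> Result<Option<Vec<u64>>, ParseIntError> {
    dv.map(read_indexes).transpose()
}
```

This keeps "no deletion vector" distinct from "read failed", which is why the FFI wrapper can map `None` to an empty array while still propagating errors with `?`.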