Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose top level stats in scans #227

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ffi/cffi-test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
void visit_callback(void* engine_context,
KernelStringSlice path,
int64_t size,
const Stats* stats,
const DvInfo* dv_info,
const CStringMap* partition_values) {
printf("file: %.*s\n", (int)path.len, path.ptr);
printf("file: %.*s (size: %" PRId64 ", num_records:", (int)path.len, path.ptr, size);
if (stats) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: should this code also print dv info?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm considering cffi-test.c "deprecated". As soon as #203 merges, I'll remove cffi-test.c and switch our CI over to run the more complete example there, which does look at the dv info

printf("%" PRId64 ")\n", stats->num_records);
} else {
printf(" [no stats])\n");
}
}

void visit_data(void* engine_context,
Expand Down
16 changes: 16 additions & 0 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,21 @@ pub unsafe extern "C" fn kernel_scan_data_free(data: Handle<SharedScanDataIterat
data.drop_handle();
}

/// Give engines an easy way to consume stats
#[repr(C)]
pub struct Stats {
/// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the
/// `num_records` statistic must be present and accurate, and must equal the number of records
/// in the data file. In the presence of Deletion Vectors the statistics may be somewhat
/// outdated, i.e. not reflecting deleted rows yet.
pub num_records: u64,
}

type CScanCallback = extern "C" fn(
engine_context: NullableCvoid,
path: KernelStringSlice,
size: i64,
stats: Option<&Stats>,
dv_info: &DvInfo,
partition_map: &CStringMap,
);
Expand Down Expand Up @@ -336,16 +347,21 @@ fn rust_callback(
context: &mut ContextWrapper,
path: &str,
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
let partition_map = CStringMap {
values: partition_values,
};
let stats = kernel_stats.map(|ks| Stats {
num_records: ks.num_records,
});
(context.callback)(
context.engine_context,
path.into(),
size,
stats.as_ref(),
&dv_info,
&partition_map,
);
Expand Down
3 changes: 2 additions & 1 deletion kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState};
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::{transform_to_logical, ScanBuilder};
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
Expand Down Expand Up @@ -94,6 +94,7 @@ fn send_scan_file(
scan_tx: &mut spmc::Sender<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ macro_rules! impl_default_get {
$(
fn $name(&'a self, _row_index: usize, field_name: &str) -> DeltaResult<Option<$typ>> {
debug!("Asked for type {} on {field_name}, but using default error impl.", stringify!($typ));
Err(Error::UnexpectedColumnType(format!("{field_name} is not of type {}", stringify!($typ))))
Err(Error::UnexpectedColumnType(format!("{field_name} is not of type {}", stringify!($typ))).with_backtrace())
}
)*
};
Expand Down
10 changes: 8 additions & 2 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ impl DataVisitor for AddRemoveVisitor {

lazy_static! {
// NB: If you update this schema, ensure you update the comment describing it in the doc comment
// for `scan_row_schema` in scan/mod.rs!
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
// indexes will be off
pub(crate) static ref SCAN_ROW_SCHEMA: Arc<StructType> = Arc::new(StructType::new(vec!(
StructField::new("path", DataType::STRING, true),
StructField::new("size", DataType::LONG, true),
StructField::new("modificationTime", DataType::LONG, true),
StructField::new("stats", DataType::STRING, true),
StructField::new(
"deletionVector",
StructType::new(vec![
Expand Down Expand Up @@ -162,6 +164,7 @@ impl LogReplayScanner {
Expression::column("add.path"),
Expression::column("add.size"),
Expression::column("add.modificationTime"),
Expression::column("add.stats"),
Expression::column("add.deletionVector"),
Expression::Struct(vec![Expression::column("add.partitionValues")]),
])
Expand Down Expand Up @@ -316,7 +319,7 @@ mod tests {
use std::collections::HashMap;

use crate::scan::{
state::DvInfo,
state::{DvInfo, Stats},
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
};

Expand All @@ -326,6 +329,7 @@ mod tests {
_: &mut (),
path: &str,
size: i64,
stats: Option<Stats>,
_: DvInfo,
part_vals: HashMap<String, String>,
) {
Expand All @@ -334,6 +338,8 @@ mod tests {
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
}
Expand Down
13 changes: 4 additions & 9 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ impl Scan {
/// path: string,
/// size: long,
/// modificationTime: long,
/// stats: string,
/// deletionVector: {
/// storageType: string,
/// pathOrInlineDv: string,
Expand Down Expand Up @@ -482,7 +483,7 @@ pub fn transform_to_logical(
// some utils that are used in file_stream.rs and state.rs tests
#[cfg(test)]
pub(crate) mod test_utils {
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
Expand All @@ -498,7 +499,7 @@ pub(crate) mod test_utils {
EngineData, JsonHandler,
};

use super::state::DvInfo;
use super::state::ScanCallback;

// TODO(nick): Merge all copies of this into one "test utils" thing
fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
Expand Down Expand Up @@ -546,13 +547,7 @@ pub(crate) mod test_utils {
batch: Vec<Box<ArrowEngineData>>,
expected_sel_vec: &[bool],
context: T,
validate_callback: fn(
context: &mut T,
path: &str,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
),
validate_callback: ScanCallback<T>,
) {
let engine = SyncEngine::new();
// doesn't matter here
Expand Down
65 changes: 53 additions & 12 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
DataVisitor, DeltaResult, Engine, EngineData, Error,
};
use serde::{Deserialize, Serialize};
use tracing::warn;

use super::log_replay::SCAN_ROW_SCHEMA;

Expand All @@ -32,7 +33,24 @@ pub struct DvInfo {
deletion_vector: Option<DeletionVectorDescriptor>,
}

/// Give engines an easy way to consume stats
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
/// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the
/// `num_records` statistic must be present and accurate, and must equal the number of records
/// in the data file. In the presence of Deletion Vectors the statistics may be somewhat
/// outdated, i.e. not reflecting deleted rows yet.
pub num_records: u64,
}

impl DvInfo {
/// Check if this DvInfo contains a Deletion Vector. This is mostly used to know if the
/// associated [`Stats`] struct has fully accurate information or not.
pub fn has_vector(&self) -> bool {
self.deletion_vector.is_some()
}

pub fn get_selection_vector(
&self,
engine: &dyn Engine,
Expand All @@ -50,6 +68,15 @@ impl DvInfo {
}
}

pub type ScanCallback<T> = fn(
context: &mut T,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
);

/// Request that the kernel call a callback on each valid file that needs to be read for the
/// scan.
///
Expand All @@ -67,7 +94,7 @@ impl DvInfo {
///
/// ## Example
/// ```ignore
/// let context = [my context];
/// let mut context = [my context];
/// for res in scan_data { // scan data from scan.get_scan_data()
/// let (data, vector) = res?;
/// context = delta_kernel::scan::state::visit_scan_files(
Expand All @@ -82,13 +109,7 @@ pub fn visit_scan_files<T>(
data: &dyn EngineData,
selection_vector: &[bool],
context: T,
callback: fn(
context: &mut T,
path: &str,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
),
callback: ScanCallback<T>,
) -> DeltaResult<T> {
let mut visitor = ScanFileVisitor {
callback,
Expand All @@ -101,7 +122,7 @@ pub fn visit_scan_files<T>(

// add some visitor magic for engines
struct ScanFileVisitor<'a, T> {
callback: fn(&mut T, &str, i64, DvInfo, HashMap<String, String>),
callback: ScanCallback<T>,
selection_vector: &'a [bool],
context: T,
}
Expand All @@ -116,14 +137,31 @@ impl<T> DataVisitor for ScanFileVisitor<'_, T> {
// Since path column is required, use it to detect presence of an Add action
if let Some(path) = getters[0].get_opt(row_index, "scanFile.path")? {
let size = getters[1].get(row_index, "scanFile.size")?;
let stats: Option<String> = getters[3].get_opt(row_index, "scanFile.stats")?;
let stats: Option<Stats> =
stats.and_then(|json| match serde_json::from_str(json.as_str()) {
Ok(stats) => Some(stats),
Err(e) => {
warn!("Invalid stats string in Add file {json}: {}", e);
None
}
});

let dv_index = SCAN_ROW_SCHEMA
.index_of("deletionVector")
.ok_or_else(|| Error::missing_column("deletionVector"))?;
let deletion_vector = visit_deletion_vector_at(row_index, &getters[dv_index..])?;
let dv_info = DvInfo { deletion_vector };
let partition_values =
getters[8].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
(self.callback)(&mut self.context, path, size, dv_info, partition_values)
getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
(self.callback)(
&mut self.context,
path,
size,
stats,
dv_info,
partition_values,
)
}
}
Ok(())
Expand All @@ -136,7 +174,7 @@ mod tests {

use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};

use super::DvInfo;
use super::{DvInfo, Stats};

#[derive(Clone)]
struct TestContext {
Expand All @@ -147,6 +185,7 @@ mod tests {
context: &mut TestContext,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
part_vals: HashMap<String, String>,
) {
Expand All @@ -155,6 +194,8 @@ mod tests {
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
assert!(dv_info.deletion_vector.is_some());
Expand Down
3 changes: 2 additions & 1 deletion kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{BinaryOperator, Expression};
use delta_kernel::scan::state::{visit_scan_files, DvInfo};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::{transform_to_logical, Scan, ScanBuilder};
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
Expand Down Expand Up @@ -431,6 +431,7 @@ fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
Expand Down
Loading