Skip to content

Commit

Permalink
Expose top level stats in scans (#227)
Browse files Browse the repository at this point in the history
1. Adds a "stats" column to the `EngineData` we produce during scans
2. Extracts, parses, and calls back with stats when using
visit_scan_files
3. Exposes this via FFI


Only supports `numRecords` for the moment, but I've kept this as a
struct so we can add future stats without breaking APIs.

---------

Co-authored-by: Ryan Johnson <[email protected]>
  • Loading branch information
nicklan and scovich authored Jun 25, 2024
1 parent be69dcf commit 4b25f40
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 27 deletions.
8 changes: 7 additions & 1 deletion ffi/cffi-test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
/*
 * Callback with the kernel's scan-callback signature: prints one line per file containing its
 * path, its size and, when stats were supplied, its record count.
 *
 * `stats` may be NULL (no stats available for the file), so it is checked before it is
 * dereferenced. `engine_context`, `dv_info` and `partition_values` are not used here.
 */
void visit_callback(void* engine_context,
                    KernelStringSlice path,
                    int64_t size,
                    const Stats* stats,
                    const DvInfo* dv_info,
                    const CStringMap* partition_values) {
  // The path is printed with an explicit length (%.*s) taken from the slice itself.
  printf("file: %.*s (size: %" PRId64 ", num_records:", (int)path.len, path.ptr, size);
  if (stats) {
    // num_records is an unsigned 64-bit value (u64 on the Rust side), so it needs PRIu64.
    printf("%" PRIu64 ")\n", stats->num_records);
  } else {
    printf(" [no stats])\n");
  }
}

void visit_data(void* engine_context,
Expand Down
16 changes: 16 additions & 0 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,21 @@ pub unsafe extern "C" fn kernel_scan_data_free(data: Handle<SharedScanDataIterat
data.drop_handle();
}

/// Give engines an easy way to consume stats.
///
/// This is the C-layout (`#[repr(C)]`) struct that the FFI scan callback receives by optional
/// reference. It is built from the kernel's own stats struct by copying `num_records`.
#[repr(C)]
pub struct Stats {
/// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the
/// `num_records` statistic must be present and accurate, and must equal the number of records
/// in the data file. In the presence of Deletion Vectors the statistics may be somewhat
/// outdated, i.e. not reflecting deleted rows yet.
pub num_records: u64,
}

/// Signature of the engine-provided C callback that is invoked for a scan file.
///
/// Arguments, in order: the engine's context value (passed through unchanged), the file path,
/// the file size, the file's stats if any, its deletion vector info, and its partition values.
///
/// `stats` is `None` when no stats are available for the file; a C callback should therefore
/// check the pointer before dereferencing it.
type CScanCallback = extern "C" fn(
engine_context: NullableCvoid,
path: KernelStringSlice,
size: i64,
stats: Option<&Stats>,
dv_info: &DvInfo,
partition_map: &CStringMap,
);
Expand Down Expand Up @@ -336,16 +347,21 @@ fn rust_callback(
context: &mut ContextWrapper,
path: &str,
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
let partition_map = CStringMap {
values: partition_values,
};
let stats = kernel_stats.map(|ks| Stats {
num_records: ks.num_records,
});
(context.callback)(
context.engine_context,
path.into(),
size,
stats.as_ref(),
&dv_info,
&partition_map,
);
Expand Down
3 changes: 2 additions & 1 deletion kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState};
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::{transform_to_logical, ScanBuilder};
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
Expand Down Expand Up @@ -94,6 +94,7 @@ fn send_scan_file(
scan_tx: &mut spmc::Sender<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ macro_rules! impl_default_get {
$(
fn $name(&'a self, _row_index: usize, field_name: &str) -> DeltaResult<Option<$typ>> {
debug!("Asked for type {} on {field_name}, but using default error impl.", stringify!($typ));
Err(Error::UnexpectedColumnType(format!("{field_name} is not of type {}", stringify!($typ))))
Err(Error::UnexpectedColumnType(format!("{field_name} is not of type {}", stringify!($typ))).with_backtrace())
}
)*
};
Expand Down
10 changes: 8 additions & 2 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ impl DataVisitor for AddRemoveVisitor {

lazy_static! {
// NB: If you update this schema, ensure you update the comment describing it in the doc comment
// for `scan_row_schema` in scan/mod.rs!
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
// indexes will be off
pub(crate) static ref SCAN_ROW_SCHEMA: Arc<StructType> = Arc::new(StructType::new(vec!(
StructField::new("path", DataType::STRING, true),
StructField::new("size", DataType::LONG, true),
StructField::new("modificationTime", DataType::LONG, true),
StructField::new("stats", DataType::STRING, true),
StructField::new(
"deletionVector",
StructType::new(vec![
Expand Down Expand Up @@ -162,6 +164,7 @@ impl LogReplayScanner {
Expression::column("add.path"),
Expression::column("add.size"),
Expression::column("add.modificationTime"),
Expression::column("add.stats"),
Expression::column("add.deletionVector"),
Expression::Struct(vec![Expression::column("add.partitionValues")]),
])
Expand Down Expand Up @@ -316,7 +319,7 @@ mod tests {
use std::collections::HashMap;

use crate::scan::{
state::DvInfo,
state::{DvInfo, Stats},
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
};

Expand All @@ -326,6 +329,7 @@ mod tests {
_: &mut (),
path: &str,
size: i64,
stats: Option<Stats>,
_: DvInfo,
part_vals: HashMap<String, String>,
) {
Expand All @@ -334,6 +338,8 @@ mod tests {
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
}
Expand Down
13 changes: 4 additions & 9 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ impl Scan {
/// path: string,
/// size: long,
/// modificationTime: long,
/// stats: string,
/// deletionVector: {
/// storageType: string,
/// pathOrInlineDv: string,
Expand Down Expand Up @@ -482,7 +483,7 @@ pub fn transform_to_logical(
// some utils that are used in file_stream.rs and state.rs tests
#[cfg(test)]
pub(crate) mod test_utils {
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
Expand All @@ -498,7 +499,7 @@ pub(crate) mod test_utils {
EngineData, JsonHandler,
};

use super::state::DvInfo;
use super::state::ScanCallback;

// TODO(nick): Merge all copies of this into one "test utils" thing
fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
Expand Down Expand Up @@ -546,13 +547,7 @@ pub(crate) mod test_utils {
batch: Vec<Box<ArrowEngineData>>,
expected_sel_vec: &[bool],
context: T,
validate_callback: fn(
context: &mut T,
path: &str,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
),
validate_callback: ScanCallback<T>,
) {
let engine = SyncEngine::new();
// doesn't matter here
Expand Down
65 changes: 53 additions & 12 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
DataVisitor, DeltaResult, Engine, EngineData, Error,
};
use serde::{Deserialize, Serialize};
use tracing::warn;

use super::log_replay::SCAN_ROW_SCHEMA;

Expand All @@ -32,7 +33,24 @@ pub struct DvInfo {
deletion_vector: Option<DeletionVectorDescriptor>,
}

/// Give engines an easy way to consume stats.
///
/// Deserialized from the JSON `stats` string of a scan file, using camelCase keys (so
/// `num_records` is read from `numRecords`). Kept as a struct so that further statistics can be
/// added later without changing callback signatures.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
    /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the
    /// `num_records` statistic must be present and accurate, and must equal the number of records
    /// in the data file. In the presence of Deletion Vectors the statistics may be somewhat
    /// outdated, i.e. not reflecting deleted rows yet.
    pub num_records: u64,
}

impl DvInfo {
/// Check if this DvInfo contains a Deletion Vector. This is mostly used to know if the
/// associated [`Stats`] struct has fully accurate information or not.
///
/// Returns `true` when a deletion vector descriptor is present and `false` otherwise.
pub fn has_vector(&self) -> bool {
self.deletion_vector.is_some()
}

pub fn get_selection_vector(
&self,
engine: &dyn Engine,
Expand All @@ -50,6 +68,15 @@ impl DvInfo {
}
}

/// Signature of the callback that [`visit_scan_files`] invokes for each scan-file row that has a
/// path.
///
/// Arguments, in order: the caller's mutable context, the file path, the file size, the parsed
/// stats, the deletion vector info, and the partition values.
///
/// `stats` is `None` when the row has no stats string, or when that string could not be parsed
/// as JSON (in which case a warning is logged).
pub type ScanCallback<T> = fn(
context: &mut T,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
);

/// Request that the kernel call a callback on each valid file that needs to be read for the
/// scan.
///
Expand All @@ -67,7 +94,7 @@ impl DvInfo {
///
/// ## Example
/// ```ignore
/// let context = [my context];
/// let mut context = [my context];
/// for res in scan_data { // scan data from scan.get_scan_data()
/// let (data, vector) = res?;
/// context = delta_kernel::scan::state::visit_scan_files(
Expand All @@ -82,13 +109,7 @@ pub fn visit_scan_files<T>(
data: &dyn EngineData,
selection_vector: &[bool],
context: T,
callback: fn(
context: &mut T,
path: &str,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
),
callback: ScanCallback<T>,
) -> DeltaResult<T> {
let mut visitor = ScanFileVisitor {
callback,
Expand All @@ -101,7 +122,7 @@ pub fn visit_scan_files<T>(

// add some visitor magic for engines
struct ScanFileVisitor<'a, T> {
callback: fn(&mut T, &str, i64, DvInfo, HashMap<String, String>),
callback: ScanCallback<T>,
selection_vector: &'a [bool],
context: T,
}
Expand All @@ -116,14 +137,31 @@ impl<T> DataVisitor for ScanFileVisitor<'_, T> {
// Since path column is required, use it to detect presence of an Add action
if let Some(path) = getters[0].get_opt(row_index, "scanFile.path")? {
let size = getters[1].get(row_index, "scanFile.size")?;
let stats: Option<String> = getters[3].get_opt(row_index, "scanFile.stats")?;
let stats: Option<Stats> =
stats.and_then(|json| match serde_json::from_str(json.as_str()) {
Ok(stats) => Some(stats),
Err(e) => {
warn!("Invalid stats string in Add file {json}: {}", e);
None
}
});

let dv_index = SCAN_ROW_SCHEMA
.index_of("deletionVector")
.ok_or_else(|| Error::missing_column("deletionVector"))?;
let deletion_vector = visit_deletion_vector_at(row_index, &getters[dv_index..])?;
let dv_info = DvInfo { deletion_vector };
let partition_values =
getters[8].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
(self.callback)(&mut self.context, path, size, dv_info, partition_values)
getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
(self.callback)(
&mut self.context,
path,
size,
stats,
dv_info,
partition_values,
)
}
}
Ok(())
Expand All @@ -136,7 +174,7 @@ mod tests {

use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};

use super::DvInfo;
use super::{DvInfo, Stats};

#[derive(Clone)]
struct TestContext {
Expand All @@ -147,6 +185,7 @@ mod tests {
context: &mut TestContext,
path: &str,
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
part_vals: HashMap<String, String>,
) {
Expand All @@ -155,6 +194,8 @@ mod tests {
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
assert!(dv_info.deletion_vector.is_some());
Expand Down
3 changes: 2 additions & 1 deletion kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{BinaryOperator, Expression};
use delta_kernel::scan::state::{visit_scan_files, DvInfo};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::{transform_to_logical, Scan, ScanBuilder};
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
Expand Down Expand Up @@ -431,6 +431,7 @@ fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
Expand Down

0 comments on commit 4b25f40

Please sign in to comment.