Skip to content

Commit

Permalink
chore: remove some file_actions callsites
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Aug 17, 2024
1 parent 8a3e5ed commit eea53f4
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 37 deletions.
23 changes: 6 additions & 17 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ use url::Url;
use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
use crate::kernel::{
Add, DataCheck, EagerSnapshot, Invariant, LogicalFile, Snapshot, StructTypeExt,
};
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
Expand Down Expand Up @@ -226,17 +228,6 @@ fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult<Arro
Ok(Arc::new(ArrowSchema::new(fields)))
}

pub(crate) trait DataFusionFileMixins {
/// Iterate over all files in the log matching a predicate
fn files_matching_predicate(&self, filters: &[Expr]) -> DeltaResult<impl Iterator<Item = Add>>;
}

impl DataFusionFileMixins for EagerSnapshot {
fn files_matching_predicate(&self, filters: &[Expr]) -> DeltaResult<impl Iterator<Item = Add>> {
files_matching_predicate(self, filters)
}
}

pub(crate) fn files_matching_predicate<'a>(
snapshot: &'a EagerSnapshot,
filters: &[Expr],
Expand Down Expand Up @@ -1007,7 +998,7 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVal
}
}

pub(crate) fn partitioned_file_from_action(
fn partitioned_file_from_action(
action: &Add,
partition_columns: &[String],
schema: &ArrowSchema,
Expand Down Expand Up @@ -1149,9 +1140,7 @@ pub(crate) async fn execute_plan_to_batch(
)
.await?;

let batch = concat_batches(&plan.schema(), data.iter())?;

Ok(batch)
Ok(concat_batches(&plan.schema(), data.iter())?)
}

/// Responsible for checking batches of data conform to table's invariants.
Expand Down Expand Up @@ -1897,7 +1886,7 @@ mod tests {
let file = partitioned_file_from_action(&action, &part_columns, &schema);
let ref_file = PartitionedFile {
object_meta: object_store::ObjectMeta {
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
size: 10644,
e_tag: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl FileSystemCheckBuilder {

async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_relative: HashMap<String, Add> =
HashMap::with_capacity(self.snapshot.file_actions()?.len());
HashMap::with_capacity(self.snapshot.files_count());
let log_store = self.log_store.clone();

for active in self.snapshot.file_actions_iter()? {
Expand Down
10 changes: 3 additions & 7 deletions crates/core/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ mod tests {
use datafusion_expr::{col, lit};

use super::*;
use crate::delta_datafusion::{DataFusionFileMixins, DataFusionMixins};
use crate::delta_datafusion::{files_matching_predicate, DataFusionMixins};
use crate::kernel::Action;
use crate::test_utils::{ActionFactory, TestSchemas};

Expand Down Expand Up @@ -318,9 +318,7 @@ mod tests {
)));

let state = DeltaTableState::from_actions(actions).unwrap();
let files = state
.snapshot
.files_matching_predicate(&[])
let files = files_matching_predicate(&state.snapshot, &[])
.unwrap()
.collect::<Vec<_>>();
assert_eq!(files.len(), 3);
Expand All @@ -329,9 +327,7 @@ mod tests {
.gt(lit::<i32>(10))
.or(col("value").lt_eq(lit::<i32>(0)));

let files = state
.snapshot
.files_matching_predicate(&[predictate])
let files = files_matching_predicate(&state.snapshot, &[predictate])
.unwrap()
.collect::<Vec<_>>();
assert_eq!(files.len(), 2);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ enum CheckpointError {
source: ArrowError,
},

#[error("missing rewquired action type in snapshot: {0}")]
#[error("missing required action type in snapshot: {0}")]
MissingActionType(String),
}

Expand Down
8 changes: 4 additions & 4 deletions crates/core/tests/command_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use deltalake_core::kernel::{DataType, PrimitiveType, StructField};
use deltalake_core::protocol::SaveMode;
use deltalake_core::storage::commit_uri_from_version;
use deltalake_core::{DeltaOps, DeltaTable};
use itertools::Itertools;
use rand::Rng;
use std::error::Error;
use std::fs;
Expand Down Expand Up @@ -103,10 +104,9 @@ async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {
let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap();
let mut table = DeltaOps::try_from_uri(table_uri).await?;
table.0.load_version(1).await?;
assert_eq!(
table.0.snapshot()?.file_actions()?,
result.0.snapshot()?.file_actions()?
);
let curr_files = table.0.snapshot()?.file_paths_iter().collect_vec();
let result_files = result.0.snapshot()?.file_paths_iter().collect_vec();
assert_eq!(curr_files, result_files);

let result = DeltaOps(result.0)
.restore()
Expand Down
11 changes: 4 additions & 7 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,16 +1238,13 @@ impl RawDeltaTable {
}

pub fn get_add_file_sizes(&self) -> PyResult<HashMap<String, i64>> {
let actions = self
Ok(self
._table
.snapshot()
.map_err(PythonError::from)?
.file_actions()
.map_err(PythonError::from)?;

Ok(actions
.iter()
.map(|action| (action.path(), action.size))
.eager_snapshot()
.files()
.map(|f| (f.path().to_string(), f.size()))
.collect::<HashMap<String, i64>>())
}
/// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
Expand Down

0 comments on commit eea53f4

Please sign in to comment.