diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 029a4974a1..9feb617470 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -376,9 +376,10 @@ impl TableProvider for DeltaTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let schema = Arc::new(<ArrowSchema as TryFrom<&schema::Schema>>::try_from(
-            DeltaTable::schema(self).unwrap(),
-        )?);
+        let schema = self
+            .state
+            .physical_arrow_schema(self.object_store())
+            .await?;
 
         register_store(self, session.runtime_env().clone());
 
diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs
index ea4c5e49dd..2be717e549 100644
--- a/rust/src/operations/load.rs
+++ b/rust/src/operations/load.rs
@@ -1,71 +1,40 @@
-use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::storage::DeltaObjectStore;
-use crate::{builder::ensure_table_uri, DeltaResult, DeltaTable};
-
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use futures::future::BoxFuture;
 
+use crate::storage::DeltaObjectStore;
+use crate::table_state::DeltaTableState;
+use crate::{DeltaResult, DeltaTable, DeltaTableError};
+
 #[derive(Debug, Clone)]
 pub struct LoadBuilder {
-    location: Option<String>,
+    /// A snapshot of the to-be-loaded table's state
+    snapshot: DeltaTableState,
+    /// Delta object store for handling data files
+    store: Arc<DeltaObjectStore>,
+    /// A sub-selection of columns to be loaded
     columns: Option<Vec<String>>,
-    storage_options: Option<HashMap<String, String>>,
-    object_store: Option<Arc<DeltaObjectStore>>,
-}
-
-impl Default for LoadBuilder {
-    fn default() -> Self {
-        Self::new()
-    }
 }
 
 impl LoadBuilder {
     /// Create a new [`LoadBuilder`]
-    pub fn new() -> Self {
+    pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
         Self {
-            location: None,
+            snapshot,
+            store,
             columns: None,
-            storage_options: None,
-            object_store: None,
         }
     }
 
-    /// Specify the path to the location where table data is stored,
-    /// which could be a path on distributed storage.
-    pub fn with_location(mut self, location: impl Into<String>) -> Self {
-        self.location = Some(location.into());
-        self
-    }
-
     /// Specify column selection to load
     pub fn with_columns(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
         self.columns = Some(columns.into_iter().map(|s| s.into()).collect());
         self
     }
-
-    /// Set options used to initialize storage backend
-    ///
-    /// Options may be passed in the HashMap or set as environment variables.
-    ///
-    /// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
-    /// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
-    /// [crate::builder::azure_storage_options] describes the available options for the Azure backend.
-    /// [crate::builder::gcp_storage_options] describes the available options for the Google Cloud Platform backend.
-    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
-        self.storage_options = Some(storage_options);
-        self
-    }
-
-    /// Provide a [`DeltaObjectStore`] instance, that points at table location
-    pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
-        self.object_store = Some(object_store);
-        self
-    }
 }
 
 impl std::future::IntoFuture for LoadBuilder {
@@ -76,20 +45,31 @@ impl std::future::IntoFuture for LoadBuilder {
         let this = self;
 
         Box::pin(async move {
-            let object_store = this.object_store.unwrap();
-            let url = ensure_table_uri(object_store.root_uri())?;
-            let store = object_store.storage_backend().clone();
-            let mut table = DeltaTable::new(object_store, Default::default());
-            table.load().await?;
+            let table = DeltaTable::new_with_state(this.store, this.snapshot);
+            let schema = table.state.arrow_schema()?;
+            let projection = this
+                .columns
+                .map(|cols| {
+                    cols.iter()
+                        .map(|col| {
+                            schema.column_with_name(col).map(|(idx, _)| idx).ok_or(
+                                DeltaTableError::SchemaMismatch {
+                                    msg: format!("Column '{col}' does not exist in table schema."),
+                                },
+                            )
+                        })
+                        .collect::<Result<_, _>>()
+                })
+                .transpose()?;
 
             let ctx = SessionContext::new();
-            ctx.state()
-                .runtime_env()
-                .register_object_store(url.scheme(), "", store);
-            let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
+            let scan_plan = table
+                .scan(&ctx.state(), projection.as_ref(), &[], None)
+                .await?;
             let plan = CoalescePartitionsExec::new(scan_plan);
             let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
             let stream = plan.execute(0, task_ctx)?;
+
             Ok((table, stream))
         })
     }
@@ -157,4 +137,34 @@ mod tests {
         assert_eq!(batch.schema(), data[0].schema());
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_load_with_columns() -> TestResult {
+        let batch = get_record_batch(None, false);
+        let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;
+
+        let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
+        let data = collect_sendable_stream(stream).await?;
+
+        let expected = vec![
+            "+----+-------+",
+            "| id | value |",
+            "+----+-------+",
+            "| A  | 1     |",
+            "| B  | 2     |",
+            "| A  | 3     |",
+            "| B  | 4     |",
+            "| A  | 5     |",
+            "| A  | 6     |",
+            "| A  | 7     |",
+            "| B  | 8     |",
+            "| B  | 9     |",
+            "| A  | 10    |",
+            "| A  | 11    |",
+            "+----+-------+",
+        ];
+
+        assert_batches_sorted_eq!(&expected, &data);
+        Ok(())
+    }
 }
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index 7a554809a5..1f17d4b967 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -102,7 +102,7 @@ impl DeltaOps {
     #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn load(self) -> LoadBuilder {
-        LoadBuilder::default().with_object_store(self.0.object_store())
+        LoadBuilder::new(self.0.object_store(), self.0.state)
     }
 
     /// Write data to Delta table
diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index f521ac2062..b22c8f8e8e 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -11,6 +11,8 @@ use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
 use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use itertools::Either;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
 use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::Parser;
 use sqlparser::tokenizer::Tokenizer;
@@ -82,6 +84,43 @@ impl DeltaTableState {
         Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
     }
+
+    /// Get the physical table schema.
+    ///
+    /// This will construct a schema derived from the parquet schema of the latest data file,
+    /// and fields for partition columns from the schema defined in table meta data.
+    pub async fn physical_arrow_schema(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> DeltaResult<ArrowSchemaRef> {
+        if let Some(add) = self.files().iter().max_by_key(|obj| obj.modification_time) {
+            let file_meta = add.try_into()?;
+            let file_reader = ParquetObjectReader::new(object_store, file_meta);
+            let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader)
+                .await?
+                .build()?
+                .schema()
+                .clone();
+
+            let table_schema = Arc::new(ArrowSchema::new(
+                self.arrow_schema()?
+                    .fields
+                    .clone()
+                    .into_iter()
+                    .map(|field| {
+                        file_schema
+                            .field_with_name(field.name())
+                            .cloned()
+                            .unwrap_or(field)
+                    })
+                    .collect(),
+            ));
+
+            Ok(table_schema)
+        } else {
+            self.arrow_schema()
+        }
+    }
 }
 
 pub struct AddContainer<'a> {
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 466ccfc733..a6d18aad7e 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -728,7 +728,40 @@ async fn test_datafusion_partitioned_types() -> Result<()> {
         ),
     ]);
 
-    assert_eq!(Arc::new(expected_schema), batches[0].schema());
+    assert_eq!(
+        Arc::new(expected_schema),
+        Arc::new(
+            batches[0]
+                .schema()
+                .as_ref()
+                .clone()
+                .with_metadata(Default::default())
+        )
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_datafusion_scan_timestamps() -> Result<()> {
+    let ctx = SessionContext::new();
+    let table = deltalake::open_table("./tests/data/table_with_edge_timestamps")
+        .await
+        .unwrap();
+    ctx.register_table("demo", Arc::new(table))?;
+
+    let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
+
+    let expected = vec![
+        "+-------------------------------+---------------------+------------+",
+        "| BIG_DATE                      | NORMAL_DATE         | SOME_VALUE |",
+        "+-------------------------------+---------------------+------------+",
+        "| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2          |",
+        "| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1          |",
+        "+-------------------------------+---------------------+------------+",
+    ];
+
+    assert_batches_sorted_eq!(&expected, &batches);
 
     Ok(())
 }
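For context, a minimal sketch of how the reworked `LoadBuilder` is meant to be driven end to end through `DeltaOps`. This is a sketch only: it assumes `collect_sendable_stream` and `DeltaOps` are reachable under `deltalake::operations` (as used in the tests above), and the `simple_table` fixture path stands in for a real table URI.

```rust
use deltalake::operations::{collect_sendable_stream, DeltaOps};

#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // Open an existing table; DeltaOps::load now seeds the LoadBuilder with the
    // table's object store and state snapshot instead of a location string.
    let table = deltalake::open_table("./tests/data/simple_table").await?;

    // Project down to a single column; unknown column names surface as
    // DeltaTableError::SchemaMismatch before the scan is planned.
    let (_table, stream) = DeltaOps(table).load().with_columns(["id"]).await?;

    // Drain the record batch stream produced by the DataFusion scan.
    let batches = collect_sendable_stream(stream).await?;
    println!("loaded {} record batches", batches.len());
    Ok(())
}
```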