diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index df28f11aae..f8a223560a 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -207,6 +207,7 @@ macro_rules! table_config {
 }
 
 /// Well known delta table configuration
+#[derive(Debug)]
 pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);
 
 /// Default num index cols
diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs
index a04dceb3bd..abb46ed91e 100644
--- a/crates/core/src/writer/json.rs
+++ b/crates/core/src/writer/json.rs
@@ -9,7 +9,6 @@ use bytes::Bytes;
 use delta_kernel::expressions::Scalar;
 use indexmap::IndexMap;
 use object_store::path::Path;
-use object_store::ObjectStore;
 use parquet::{
     arrow::ArrowWriter, basic::Compression, errors::ParquetError,
     file::properties::WriterProperties,
 };
@@ -26,24 +25,26 @@ use super::utils::{
 use super::{DeltaWriter, DeltaWriterError, WriteMode};
 use crate::errors::DeltaTableError;
 use crate::kernel::{scalars::ScalarExt, Add, PartitionsExt, StructType};
-use crate::storage::ObjectStoreRetryExt;
+use crate::storage::retry_ext::ObjectStoreRetryExt;
 use crate::table::builder::DeltaTableBuilder;
-use crate::table::config::DEFAULT_NUM_INDEX_COLS;
 use crate::writer::utils::ShareableBuffer;
 use crate::DeltaTable;
 
 type BadValue = (Value, ParquetError);
 
 /// Writes messages to a delta lake table.
+#[derive(Debug)]
 pub struct JsonWriter {
-    storage: Arc<dyn ObjectStore>,
-    arrow_schema_ref: Arc<ArrowSchema>,
+    table: DeltaTable,
+    /// Optional schema to use, otherwise try to rely on the schema from the [DeltaTable]
+    schema_ref: Option<ArrowSchemaRef>,
     writer_properties: WriterProperties,
     partition_columns: Vec<String>,
     arrow_writers: HashMap<String, DataArrowWriter>,
 }
 
 /// Writes messages to an underlying arrow buffer.
+#[derive(Debug)]
 pub(crate) struct DataArrowWriter {
     arrow_schema: Arc<ArrowSchema>,
     writer_properties: WriterProperties,
@@ -181,16 +182,16 @@ impl DataArrowWriter {
 
 impl JsonWriter {
     /// Create a new JsonWriter instance
-    pub fn try_new(
+    pub async fn try_new(
        table_uri: String,
-        schema: ArrowSchemaRef,
+        schema_ref: ArrowSchemaRef,
         partition_columns: Option<Vec<String>>,
         storage_options: Option<HashMap<String, String>>,
     ) -> Result<Self, DeltaTableError> {
-        let storage = DeltaTableBuilder::from_uri(table_uri)
+        let table = DeltaTableBuilder::from_uri(table_uri)
             .with_storage_options(storage_options.unwrap_or_default())
-            .build_storage()?;
-
+            .load()
+            .await?;
         // Initialize writer properties for the underlying arrow writer
         let writer_properties = WriterProperties::builder()
             // NOTE: Consider extracting config for writer properties and setting more than just compression
@@ -198,8 +199,8 @@ impl JsonWriter {
             .build();
 
         Ok(Self {
-            storage: storage.object_store(),
-            arrow_schema_ref: schema,
+            table,
+            schema_ref: Some(schema_ref),
             writer_properties,
             partition_columns: partition_columns.unwrap_or_default(),
             arrow_writers: HashMap::new(),
@@ -210,8 +211,6 @@ impl JsonWriter {
     pub fn for_table(table: &DeltaTable) -> Result<Self, DeltaTableError> {
         // Initialize an arrow schema ref from the delta table schema
         let metadata = table.metadata()?;
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&metadata.schema()?)?;
-        let arrow_schema_ref = Arc::new(arrow_schema);
         let partition_columns = metadata.partition_columns.clone();
 
         // Initialize writer properties for the underlying arrow writer
@@ -221,10 +220,10 @@ impl JsonWriter {
             .build();
 
         Ok(Self {
-            storage: table.object_store(),
-            arrow_schema_ref,
+            table: table.clone(),
             writer_properties,
             partition_columns,
+            schema_ref: None,
             arrow_writers: HashMap::new(),
         })
     }
@@ -248,10 +247,20 @@ impl JsonWriter {
         self.arrow_writers.clear();
     }
 
-    /// Returns the arrow schema representation of the delta table schema defined for the wrapped
+    /// Returns the user-defined arrow schema representation or the schema defined for the wrapped
     /// table.
+    ///
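+    /// A sketch of the fallback behavior (illustrative only, marked `ignore` so it is
+    /// not compiled as a doc-test):
+    ///
+    /// ```ignore
+    /// // Created via `for_table`, `schema_ref` is `None`, so the schema is derived
+    /// // from the wrapped table's metadata.
+    /// let writer = JsonWriter::for_table(&table)?;
+    /// let schema = writer.arrow_schema();
+    /// ```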
     pub fn arrow_schema(&self) -> Arc<ArrowSchema> {
-        self.arrow_schema_ref.clone()
+        if let Some(schema_ref) = self.schema_ref.as_ref() {
+            return schema_ref.clone();
+        }
+        let schema = self
+            .table
+            .schema()
+            .expect("Failed to unwrap schema for table");
+        <ArrowSchema as TryFrom<&StructType>>::try_from(schema)
+            .expect("Failed to coerce delta schema to arrow")
+            .into()
     }
 
     fn divide_by_partition_values(
@@ -349,7 +358,11 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
         Ok(())
     }
 
-    /// Writes the existing parquet bytes to storage and resets internal state to handle another file.
+    /// Writes the existing parquet bytes to storage and resets internal state to handle another
+    /// file.
+    ///
+    /// This function returns the [Add] actions which should be committed to the [DeltaTable] for
+    /// the written data files.
     async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
         let writers = std::mem::take(&mut self.arrow_writers);
         let mut actions = Vec::new();
@@ -363,17 +376,20 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
             let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
             let obj_bytes = Bytes::from(writer.buffer.to_vec());
             let file_size = obj_bytes.len() as i64;
-            self.storage
+            self.table
+                .object_store()
                 .put_with_retries(&path, obj_bytes.into(), 15)
                 .await?;
 
+            let table_config = self.table.snapshot()?.table_config();
+
             actions.push(create_add(
                 &writer.partition_values,
                 path.to_string(),
                 file_size,
                 &metadata,
-                DEFAULT_NUM_INDEX_COLS,
-                &None,
+                table_config.num_indexed_cols(),
+                &table_config.stats_columns(),
             )?);
         }
         Ok(actions)
@@ -435,35 +451,49 @@ fn extract_partition_values(
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+
+    use arrow_schema::ArrowError;
     use parquet::file::reader::FileReader;
     use parquet::file::serialized_reader::SerializedFileReader;
     use std::fs::File;
-    use std::sync::Arc;
 
-    use super::*;
     use crate::arrow::array::Int32Array;
-    use crate::arrow::datatypes::{
-        DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
-    };
+    use crate::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};
     use crate::kernel::DataType;
+    use crate::operations::create::CreateBuilder;
     use crate::writer::test_utils::get_delta_schema;
-    use crate::writer::DeltaWriter;
-    use crate::writer::JsonWriter;
 
-    #[tokio::test]
-    async fn test_partition_not_written_to_parquet() {
-        let table_dir = tempfile::tempdir().unwrap();
+    /// Generate a simple test table pre-created at version 0
+    async fn get_test_table(table_dir: &tempfile::TempDir) -> DeltaTable {
         let schema = get_delta_schema();
         let path = table_dir.path().to_str().unwrap().to_string();
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let mut table = CreateBuilder::new()
+            .with_location(&path)
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(schema.fields().cloned())
+            .await
+            .unwrap();
+        table.load().await.expect("Failed to load table");
+        assert_eq!(table.version(), 0);
+        table
+    }
+
+    #[tokio::test]
+    async fn test_partition_not_written_to_parquet() {
+        let table_dir = tempfile::tempdir().unwrap();
+        let table = get_test_table(&table_dir).await;
+        let schema = table.schema().unwrap();
+        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(schema).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
             None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -535,16 +565,17 @@ mod tests {
     #[tokio::test]
     async fn test_parsing_error() {
         let table_dir = tempfile::tempdir().unwrap();
-        let schema = get_delta_schema();
-        let path = table_dir.path().to_str().unwrap().to_string();
+        let table = get_test_table(&table_dir).await;
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let arrow_schema =
+            <ArrowSchema as TryFrom<&StructType>>::try_from(table.schema().unwrap()).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
             None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -572,16 +603,17 @@ mod tests {
     #[tokio::test]
     async fn test_json_write_mismatched_values() {
         let table_dir = tempfile::tempdir().unwrap();
-        let schema = get_delta_schema();
-        let path = table_dir.path().to_str().unwrap().to_string();
+        let table = get_test_table(&table_dir).await;
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let arrow_schema =
+            <ArrowSchema as TryFrom<&StructType>>::try_from(table.schema().unwrap()).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
             None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -610,28 +642,18 @@ mod tests {
     #[tokio::test]
     async fn test_json_write_mismatched_schema() {
-        use crate::operations::create::CreateBuilder;
         let table_dir = tempfile::tempdir().unwrap();
-        let schema = get_delta_schema();
-        let path = table_dir.path().to_str().unwrap().to_string();
-
-        let mut table = CreateBuilder::new()
-            .with_location(&path)
-            .with_table_name("test-table")
-            .with_comment("A table for running tests")
-            .with_columns(schema.fields().cloned())
-            .await
-            .unwrap();
-        table.load().await.expect("Failed to load table");
-        assert_eq!(table.version(), 0);
+        let mut table = get_test_table(&table_dir).await;
 
-        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
+        let schema = table.schema().unwrap();
+        let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(schema).unwrap();
         let mut writer = JsonWriter::try_new(
-            path.clone(),
+            table.table_uri(),
             Arc::new(arrow_schema),
             Some(vec!["modified".to_string()]),
             None,
         )
+        .await
         .unwrap();
 
         let data = serde_json::json!(
@@ -662,7 +684,6 @@ mod tests {
     #[tokio::test]
     async fn test_json_write_checkpoint() {
-        use crate::operations::create::CreateBuilder;
         use std::fs;
 
         let table_dir = tempfile::tempdir().unwrap();
@@ -708,4 +729,100 @@ mod tests {
             .collect();
         assert_eq!(entries.len(), 1);
     }
+
+    #[tokio::test]
+    async fn test_json_write_data_skipping_stats_columns() {
+        let table_dir = tempfile::tempdir().unwrap();
+        let schema = get_delta_schema();
+        let path = table_dir.path().to_str().unwrap().to_string();
+        let config: HashMap<String, Option<String>> = vec![(
+            "delta.dataSkippingStatsColumns".to_string(),
+            Some("id,value".to_string()),
+        )]
+        .into_iter()
+        .collect();
+        let mut table = CreateBuilder::new()
+            .with_location(&path)
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(schema.fields().cloned())
+            .with_configuration(config)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+        let mut writer = JsonWriter::for_table(&table).unwrap();
+        let data = serde_json::json!(
+            {
+                "id" : "A",
+                "value": 42,
+                "modified": "2021-02-01"
+            }
+        );
+
+        writer.write(vec![data]).await.unwrap();
+        writer.flush_and_commit(&mut table).await.unwrap();
+        assert_eq!(table.version(), 1);
+        let add_actions = table.state.unwrap().file_actions().unwrap();
+        assert_eq!(add_actions.len(), 1);
+        let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\",\"value\":42},\"maxValues\":{\"id\":\"A\",\"value\":42},\"nullCount\":{\"id\":0,\"value\":0}}";
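+        // Only `id` and `value` appear in the stats; `modified` is excluded by the
+        // `delta.dataSkippingStatsColumns` configuration set above.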
"{\"numRecords\":1,\"minValues\":{\"id\":\"A\",\"value\":42},\"maxValues\":{\"id\":\"A\",\"value\":42},\"nullCount\":{\"id\":0,\"value\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } + + #[tokio::test] + async fn test_json_write_data_skipping_num_indexed_cols() { + let table_dir = tempfile::tempdir().unwrap(); + let schema = get_delta_schema(); + let path = table_dir.path().to_str().unwrap().to_string(); + let config: HashMap> = vec![( + "delta.dataSkippingNumIndexedCols".to_string(), + Some("1".to_string()), + )] + .into_iter() + .collect(); + let mut table = CreateBuilder::new() + .with_location(&path) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().cloned()) + .with_configuration(config) + .await + .unwrap(); + assert_eq!(table.version(), 0); + let mut writer = JsonWriter::for_table(&table).unwrap(); + let data = serde_json::json!( + { + "id" : "A", + "value": 42, + "modified": "2021-02-01" + } + ); + + writer.write(vec![data]).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(table.version(), 1); + let add_actions = table.state.unwrap().file_actions().unwrap(); + assert_eq!(add_actions.len(), 1); + let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"A\"},\"nullCount\":{\"id\":0}}"; + assert_eq!( + expected_stats.parse::().unwrap(), + add_actions + .into_iter() + .nth(0) + .unwrap() + .stats + .unwrap() + .parse::() + .unwrap() + ); + } } diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 10ba52ae62..2197d64f5f 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -44,6 +44,8 @@ pub struct RecordBatchWriter { should_evolve: bool, partition_columns: Vec, arrow_writers: HashMap, + num_indexed_cols: i32, + stats_columns: Option>, } impl std::fmt::Debug for RecordBatchWriter { @@ -60,25 +62,39 @@ impl RecordBatchWriter { partition_columns: Option>, storage_options: Option>, ) -> Result { - let storage = DeltaTableBuilder::from_uri(table_uri) + let delta_table = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) - .build_storage()? 
         // Initialize writer properties for the underlying arrow writer
         let writer_properties = WriterProperties::builder()
             // NOTE: Consider extracting config for writer properties and setting more than just compression
             .set_compression(Compression::SNAPPY)
             .build();
 
+        // if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
+        let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
+            |_| HashMap::new(),
+            |metadata| metadata.configuration.clone(),
+        );
+
         Ok(Self {
-            storage,
+            storage: delta_table.object_store(),
             arrow_schema_ref: schema.clone(),
             original_schema_ref: schema,
             writer_properties,
             partition_columns: partition_columns.unwrap_or_default(),
             should_evolve: false,
             arrow_writers: HashMap::new(),
+            num_indexed_cols: configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
+            stats_columns: configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| {
+                    v.as_ref()
+                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
+                }),
         })
     }
 
@@ -96,6 +112,8 @@ impl RecordBatchWriter {
             // NOTE: Consider extracting config for writer properties and setting more than just compression
             .set_compression(Compression::SNAPPY)
             .build();
+        let configuration: HashMap<String, Option<String>> =
+            table.metadata()?.configuration.clone();
 
         Ok(Self {
             storage: table.object_store(),
@@ -105,6 +123,16 @@ impl RecordBatchWriter {
             partition_columns,
             should_evolve: false,
             arrow_writers: HashMap::new(),
+            num_indexed_cols: configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
+            stats_columns: configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| {
+                    v.as_ref()
+                        .map(|v| v.split(',').map(|s| s.to_string()).collect())
+                }),
         })
     }
 
@@ -233,8 +261,8 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
                 path.to_string(),
                 file_size,
                 &metadata,
-                DEFAULT_NUM_INDEX_COLS,
-                &None,
+                self.num_indexed_cols,
+                &self.stats_columns,
             )?);
         }
         Ok(actions)
@@ -985,4 +1013,100 @@ mod tests {
             );
         }
     }
+
+    #[tokio::test]
+    async fn test_write_data_skipping_stats_columns() {
+        let batch = get_record_batch(None, false);
+        let partition_cols: &[String] = &vec![];
+        let table_schema: StructType = get_delta_schema();
+        let table_dir = tempfile::tempdir().unwrap();
+        let table_path = table_dir.path();
+        let config: HashMap<String, Option<String>> = vec![(
+            "delta.dataSkippingStatsColumns".to_string(),
+            Some("id,value".to_string()),
+        )]
+        .into_iter()
+        .collect();
+
+        let mut table = CreateBuilder::new()
+            .with_location(table_path.to_str().unwrap())
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(table_schema.fields().cloned())
+            .with_configuration(config)
+            .with_partition_columns(partition_cols)
+            .await
+            .unwrap();
+
+        let mut writer = RecordBatchWriter::for_table(&table).unwrap();
+        let partitions = writer.divide_by_partition_values(&batch).unwrap();
+
+        assert_eq!(partitions.len(), 1);
+        assert_eq!(partitions[0].record_batch, batch);
+        writer.write(batch).await.unwrap();
+        writer.flush_and_commit(&mut table).await.unwrap();
+        assert_eq!(table.version(), 1);
+        let add_actions = table.state.unwrap().file_actions().unwrap();
+        assert_eq!(add_actions.len(), 1);
+        let expected_stats = "{\"numRecords\":11,\"minValues\":{\"value\":1,\"id\":\"A\"},\"maxValues\":{\"id\":\"B\",\"value\":11},\"nullCount\":{\"id\":0,\"value\":0}}";
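+        // As in the JSON writer test, stats are limited to `id` and `value` by
+        // `delta.dataSkippingStatsColumns`; `modified` is skipped.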
+        assert_eq!(
+            expected_stats.parse::<serde_json::Value>().unwrap(),
+            add_actions
+                .into_iter()
+                .nth(0)
+                .unwrap()
+                .stats
+                .unwrap()
+                .parse::<serde_json::Value>()
+                .unwrap()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_write_data_skipping_num_indexed_cols() {
+        let batch = get_record_batch(None, false);
+        let partition_cols: &[String] = &vec![];
+        let table_schema: StructType = get_delta_schema();
+        let table_dir = tempfile::tempdir().unwrap();
+        let table_path = table_dir.path();
+        let config: HashMap<String, Option<String>> = vec![(
+            "delta.dataSkippingNumIndexedCols".to_string(),
+            Some("1".to_string()),
+        )]
+        .into_iter()
+        .collect();
+
+        let mut table = CreateBuilder::new()
+            .with_location(table_path.to_str().unwrap())
+            .with_table_name("test-table")
+            .with_comment("A table for running tests")
+            .with_columns(table_schema.fields().cloned())
+            .with_configuration(config)
+            .with_partition_columns(partition_cols)
+            .await
+            .unwrap();
+
+        let mut writer = RecordBatchWriter::for_table(&table).unwrap();
+        let partitions = writer.divide_by_partition_values(&batch).unwrap();
+
+        assert_eq!(partitions.len(), 1);
+        assert_eq!(partitions[0].record_batch, batch);
+        writer.write(batch).await.unwrap();
+        writer.flush_and_commit(&mut table).await.unwrap();
+        assert_eq!(table.version(), 1);
+        let add_actions = table.state.unwrap().file_actions().unwrap();
+        assert_eq!(add_actions.len(), 1);
+        let expected_stats = "{\"numRecords\":11,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"B\"},\"nullCount\":{\"id\":0}}";
+        assert_eq!(
+            expected_stats.parse::<serde_json::Value>().unwrap(),
+            add_actions
+                .into_iter()
+                .nth(0)
+                .unwrap()
+                .stats
+                .unwrap()
+                .parse::<serde_json::Value>()
+                .unwrap()
+        );
+    }
 }
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index 984754d510..c09efbf651 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -26,7 +26,7 @@ pub fn create_add(
     size: i64,
     file_metadata: &FileMetaData,
     num_indexed_cols: i32,
-    stats_columns: &Option<Vec<String>>,
+    stats_columns: &Option<Vec<impl AsRef<str>>>,
 ) -> Result<Add, DeltaWriterError> {
     let stats = stats_from_file_metadata(
         partition_values,
@@ -99,7 +99,7 @@ fn stats_from_file_metadata(
     partition_values: &IndexMap<String, Scalar>,
     file_metadata: &FileMetaData,
     num_indexed_cols: i32,
-    stats_columns: &Option<Vec<String>>,
+    stats_columns: &Option<Vec<impl AsRef<str>>>,
 ) -> Result<Stats, DeltaWriterError> {
     let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
     let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;
@@ -126,7 +126,7 @@ fn stats_from_metadata(
     row_group_metadata: Vec<RowGroupMetaData>,
     num_rows: i64,
     num_indexed_cols: i32,
-    stats_columns: &Option<Vec<String>>,
+    stats_columns: &Option<Vec<impl AsRef<str>>>,
 ) -> Result<Stats, DeltaWriterError> {
     let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
     let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
@@ -138,7 +138,7 @@ fn stats_from_metadata(
         .into_iter()
         .map(|v| {
             match sqlparser::parser::Parser::new(&dialect)
-                .try_with_sql(v)
+                .try_with_sql(v.as_ref())
                 .map_err(|e| DeltaTableError::generic(e.to_string()))?
                 .parse_multipart_identifier()
             {
diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs
index be0dfebb66..93f9b5a225 100644
--- a/crates/core/src/writer/test_utils.rs
+++ b/crates/core/src/writer/test_utils.rs
@@ -291,7 +291,7 @@ pub fn create_bare_table() -> DeltaTable {
 }
 
 pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
-    let table_schema = get_delta_schema();
+    let table_schema: StructType = get_delta_schema();
     let table_dir = tempfile::tempdir().unwrap();
     let table_path = table_dir.path();
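
With these changes, both `JsonWriter` and `RecordBatchWriter` derive `num_indexed_cols`
and `stats_columns` from the table configuration (`delta.dataSkippingNumIndexedCols`,
`delta.dataSkippingStatsColumns`) instead of the hard-coded `DEFAULT_NUM_INDEX_COLS`.
A minimal sketch of the resulting behavior, assuming `table` was created with
`delta.dataSkippingStatsColumns` set to "id,value" and `batch` holds matching data, as
in the tests above:

    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch).await?;
    // The committed Add actions now carry stats for `id` and `value` only.
    writer.flush_and_commit(&mut table).await?;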