From 63daa7834ccd26458dca4182ed2936a4e5cea6d0 Mon Sep 17 00:00:00 2001
From: Adrian Ehrsam
Date: Thu, 29 Feb 2024 11:08:49 +0100
Subject: [PATCH 1/4] try remove writer.rs

---
 crates/core/src/operations/delete.rs    |   4 +-
 crates/core/src/operations/merge/mod.rs |   2 +-
 crates/core/src/operations/mod.rs       |   1 -
 crates/core/src/operations/optimize.rs  |  43 +-
 crates/core/src/operations/update.rs    |   2 +-
 crates/core/src/operations/write.rs     |  29 +-
 crates/core/src/operations/writer.rs    | 597 ------------------------
 crates/core/src/writer/json.rs          |   8 +-
 crates/core/src/writer/mod.rs           |   4 +-
 crates/core/src/writer/record_batch.rs  |  41 +-
 python/deltalake/_internal.pyi          |   1 +
 python/deltalake/writer.py              |   1 +
 python/src/lib.rs                       |   5 +
 13 files changed, 76 insertions(+), 662 deletions(-)
 delete mode 100644 crates/core/src/operations/writer.rs

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 2e3e99bde2..2f8c23bb76 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -131,7 +131,7 @@ async fn excute_non_empty_expr(
     metrics: &mut DeleteMetrics,
     rewrite: &[Add],
     writer_properties: Option<WriterProperties>,
-) -> DeltaResult<Vec<Add>> {
+) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
 
@@ -222,7 +222,7 @@ async fn execute(
         .unwrap()
         .as_millis() as i64;
 
-    let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add.clone();
     let mut version = snapshot.version();
     metrics.num_removed_files = remove.len();
     metrics.num_added_files = actions.len();
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index 6190e8f724..f1ed5764da 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -1385,7 +1385,7 @@ async fn execute(
 
     metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;
 
-    let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
+    let mut actions: Vec<Action> = add_actions.clone();
     metrics.num_target_files_added = actions.len();
 
     let survivors = barrier
diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs
index 666b2dc66a..19907ca022 100644
--- a/crates/core/src/operations/mod.rs
+++ b/crates/core/src/operations/mod.rs
@@ -50,7 +50,6 @@ pub mod merge;
 pub mod update;
 #[cfg(feature = "datafusion")]
 pub mod write;
-pub mod writer;
 
 // TODO make ops consume a snapshot ...
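Note: the hunks that follow replace the removed operations::writer machinery (DeltaWriter, WriterConfig, PartitionWriter) with the existing crate::writer::RecordBatchWriter. A rough sketch of the call pattern the series converges on is shown below; crate paths assume the deltalake meta-crate re-exports (writer, kernel, storage, arrow, parquet) and the for_storage constructor added later in this patch, so treat it as illustrative rather than tested code.

use deltalake::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::kernel::Action;
use deltalake::parquet::file::properties::WriterProperties;
use deltalake::storage::ObjectStoreRef;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::DeltaResult;

/// Write a set of batches through RecordBatchWriter and collect the resulting actions.
async fn write_batches(
    object_store: ObjectStoreRef,
    schema: ArrowSchemaRef,
    partition_columns: Vec<String>,
    batches: Vec<RecordBatch>,
) -> DeltaResult<Vec<Action>> {
    // for_storage (added in crates/core/src/writer/record_batch.rs below) takes over the
    // role of the removed WriterConfig + DeltaWriter pair; note that it currently has no
    // target_file_size / write_batch_size knobs (see the TODOs in the optimize.rs and
    // write.rs hunks).
    let mut writer = RecordBatchWriter::for_storage(
        object_store,
        WriterProperties::default(),
        schema,
        Some(partition_columns),
    )?;
    for batch in batches {
        // RecordBatchWriter::write takes the batch by value.
        writer.write(batch).await?;
    }
    // flush() now returns kernel Actions (Action::Add wrapping each written file).
    writer.flush().await
}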
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 990997399e..1b038c020a 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -40,7 +40,6 @@ use serde::{Deserialize, Serialize};
 use tracing::debug;
 
 use super::transaction::{commit, PROTOCOL};
-use super::writer::{PartitionWriter, PartitionWriterConfig};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Action, PartitionsExt, Remove, Scalar};
 use crate::logstore::LogStoreRef;
@@ -48,6 +47,7 @@ use crate::protocol::DeltaOperation;
 use crate::storage::ObjectStoreRef;
 use crate::table::state::DeltaTableState;
 use crate::writer::utils::arrow_schema_without_partitions;
+use crate::writer::{DeltaWriter, RecordBatchWriter};
 use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};
 
 /// Metrics from Optimize
@@ -443,14 +443,9 @@ impl MergePlan {
             };
 
             // Next, initialize the writer
-            let writer_config = PartitionWriterConfig::try_new(
-                task_parameters.file_schema.clone(),
-                partition_values.clone(),
-                Some(task_parameters.writer_properties.clone()),
-                Some(task_parameters.input_parameters.target_size as usize),
-                None,
-            )?;
-            let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;
+            // TODO: task_parameters.input_parameters.target_size in RecordBatchWriter::for_storage
+            let mut writer = RecordBatchWriter::for_storage(object_store, task_parameters.writer_properties.clone(),
+                task_parameters.file_schema.clone(), Some(partition_values.keys().into_iter().map(|s|s.to_owned()).collect_vec()))?;
 
             let mut read_stream = read_stream.await?;
 
@@ -460,21 +455,25 @@ impl MergePlan {
                 batch = super::cast::cast_record_batch(&batch, task_parameters.file_schema.clone(), false)?;
 
                 partial_metrics.num_batches += 1;
-                writer.write(&batch).await.map_err(DeltaTableError::from)?;
+                writer.write(batch).await.map_err(DeltaTableError::from)?;
             }
 
-            let add_actions = writer.close().await?.into_iter().map(|mut add| {
-                add.data_change = false;
-
-                let size = add.size;
-
-                partial_metrics.num_files_added += 1;
-                partial_metrics.files_added.total_files += 1;
-                partial_metrics.files_added.total_size += size;
-                partial_metrics.files_added.max = std::cmp::max(partial_metrics.files_added.max, size);
-                partial_metrics.files_added.min = std::cmp::min(partial_metrics.files_added.min, size);
-
-                Action::Add(add)
+            let add_actions = writer.flush().await?.into_iter().map(|mut action| {
+                match &mut action {
+                    Action::Add(add) => {
+                        // add.partition_values = partition_values.into(); TODO: Required?
+ add.data_change = false; + let size = add.size; + + partial_metrics.num_files_added += 1; + partial_metrics.files_added.total_files += 1; + partial_metrics.files_added.total_size += size; + partial_metrics.files_added.max = std::cmp::max(partial_metrics.files_added.max, size); + partial_metrics.files_added.min = std::cmp::min(partial_metrics.files_added.min, size); + Action::Add(add.clone()) + } + o => o.clone() + } }); partial_actions.extend(add_actions); diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index d07f3f9fc0..9f8c6c7088 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -377,7 +377,7 @@ async fn execute( .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as i64; - let mut actions: Vec = add_actions.into_iter().map(Action::Add).collect(); + let mut actions: Vec = add_actions.clone(); metrics.num_added_files = actions.len(); metrics.num_removed_files = candidates.candidates.len(); diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 73c1599a7e..6bd0d43883 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -43,7 +43,6 @@ use parquet::file::properties::WriterProperties; use super::datafusion_utils::Expression; use super::transaction::PROTOCOL; -use super::writer::{DeltaWriter, WriterConfig}; use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::expr::parse_predicate_expression; @@ -57,6 +56,7 @@ use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::table::Constraint as DeltaConstraint; use crate::writer::record_batch::divide_by_partition_values; +use crate::writer::{DeltaWriter, RecordBatchWriter}; use crate::DeltaTable; #[derive(thiserror::Error, Debug)] @@ -312,7 +312,7 @@ async fn write_execution_plan_with_predicate( writer_properties: Option, safe_cast: bool, overwrite_schema: bool, -) -> DeltaResult> { +) -> DeltaResult> { // Use input schema to prevent wrapping partitions columns into a dictionary. 
let schema: ArrowSchemaRef = if overwrite_schema { plan.schema() @@ -342,26 +342,21 @@ async fn write_execution_plan_with_predicate( let inner_plan = plan.clone(); let inner_schema = schema.clone(); let task_ctx = Arc::new(TaskContext::from(&state)); - let config = WriterConfig::new( - inner_schema.clone(), - partition_columns.clone(), - writer_properties.clone(), - target_file_size, - write_batch_size, - ); - let mut writer = DeltaWriter::new(object_store.clone(), config); + + let mut writer = RecordBatchWriter::for_storage(object_store.clone(), + writer_properties.clone().unwrap_or_default(), inner_schema.clone(), Some(partition_columns.clone()))?; let checker_stream = checker.clone(); let mut stream = inner_plan.execute(i, task_ctx)?; - let handle: tokio::task::JoinHandle>> = + let handle: tokio::task::JoinHandle>> = tokio::task::spawn(async move { while let Some(maybe_batch) = stream.next().await { let batch = maybe_batch?; checker_stream.check_batch(&batch).await?; let arr = super::cast::cast_record_batch(&batch, inner_schema.clone(), safe_cast)?; - writer.write(&arr).await?; + writer.write(arr).await?; } - writer.close().await + writer.flush().await }); tasks.push(handle); @@ -392,7 +387,7 @@ pub(crate) async fn write_execution_plan( writer_properties: Option, safe_cast: bool, overwrite_schema: bool, -) -> DeltaResult> { +) -> DeltaResult> { write_execution_plan_with_predicate( None, snapshot, @@ -417,7 +412,7 @@ async fn execute_non_empty_expr( expression: &Expr, rewrite: &[Add], writer_properties: Option, -) -> DeltaResult> { +) -> DeltaResult> { // For each identified file perform a parquet scan + filter + limit (1) + count. // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file. @@ -488,7 +483,7 @@ async fn prepare_predicate_actions( }; let remove = candidates.candidates; - let mut actions: Vec = add.into_iter().map(Action::Add).collect(); + let mut actions: Vec = add.clone(); for action in remove { actions.push(Action::Remove(Remove { @@ -644,7 +639,7 @@ impl std::future::IntoFuture for WriteBuilder { this.overwrite_schema, ) .await?; - actions.extend(add_actions.into_iter().map(Action::Add)); + actions.extend(add_actions.into_iter()); // Collect remove actions if we are overwriting the table if let Some(snapshot) = &this.snapshot { diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs deleted file mode 100644 index c778ddfad5..0000000000 --- a/crates/core/src/operations/writer.rs +++ /dev/null @@ -1,597 +0,0 @@ -//! Abstractions and implementations for writing data to delta tables - -use std::collections::HashMap; - -use arrow::datatypes::SchemaRef as ArrowSchemaRef; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; -use bytes::Bytes; -use indexmap::IndexMap; -use object_store::{path::Path, ObjectStore}; -use parquet::arrow::ArrowWriter; -use parquet::basic::Compression; -use parquet::file::properties::WriterProperties; -use tracing::debug; - -use crate::crate_version; -use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Add, PartitionsExt, Scalar}; -use crate::storage::ObjectStoreRef; -use crate::writer::record_batch::{divide_by_partition_values, PartitionResult}; -use crate::writer::stats::create_add; -use crate::writer::utils::{ - arrow_schema_without_partitions, next_data_path, record_batch_without_partitions, - ShareableBuffer, -}; - -// TODO databricks often suggests a file size of 100mb, should we set this default? 
-const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600; -const DEFAULT_WRITE_BATCH_SIZE: usize = 1024; - -#[derive(thiserror::Error, Debug)] -enum WriteError { - #[error("Unexpected Arrow schema: got: {schema}, expected: {expected_schema}")] - SchemaMismatch { - schema: ArrowSchemaRef, - expected_schema: ArrowSchemaRef, - }, - - #[error("Error creating add action: {source}")] - CreateAdd { - source: Box, - }, - - #[error("Error handling Arrow data: {source}")] - Arrow { - #[from] - source: ArrowError, - }, - - #[error("Error partitioning record batch: {0}")] - Partitioning(String), -} - -impl From for DeltaTableError { - fn from(err: WriteError) -> Self { - match err { - WriteError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch { - msg: err.to_string(), - }, - WriteError::Arrow { source } => DeltaTableError::Arrow { source }, - _ => DeltaTableError::GenericError { - source: Box::new(err), - }, - } - } -} - -/// Configuration to write data into Delta tables -#[derive(Debug)] -pub struct WriterConfig { - /// Schema of the delta table - table_schema: ArrowSchemaRef, - /// Column names for columns the table is partitioned by - partition_columns: Vec, - /// Properties passed to underlying parquet writer - writer_properties: WriterProperties, - /// Size above which we will write a buffered parquet file to disk. - target_file_size: usize, - /// Row chunks passed to parquet writer. This and the internal parquet writer settings - /// determine how fine granular we can track / control the size of resulting files. - write_batch_size: usize, -} - -impl WriterConfig { - /// Create a new instance of [WriterConfig]. - pub fn new( - table_schema: ArrowSchemaRef, - partition_columns: Vec, - writer_properties: Option, - target_file_size: Option, - write_batch_size: Option, - ) -> Self { - let writer_properties = writer_properties.unwrap_or_else(|| { - WriterProperties::builder() - .set_compression(Compression::SNAPPY) - .build() - }); - let target_file_size = target_file_size.unwrap_or(DEFAULT_TARGET_FILE_SIZE); - let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE); - - Self { - table_schema, - partition_columns, - writer_properties, - target_file_size, - write_batch_size, - } - } - - /// Schema of files written to disk - pub fn file_schema(&self) -> ArrowSchemaRef { - arrow_schema_without_partitions(&self.table_schema, &self.partition_columns) - } -} - -#[derive(Debug)] -/// A parquet writer implementation tailored to the needs of writing data to a delta table. -pub struct DeltaWriter { - /// An object store pointing at Delta table root - object_store: ObjectStoreRef, - /// configuration for the writers - config: WriterConfig, - /// partition writers for individual partitions - partition_writers: HashMap, -} - -impl DeltaWriter { - /// Create a new instance of [`DeltaWriter`] - pub fn new(object_store: ObjectStoreRef, config: WriterConfig) -> Self { - Self { - object_store, - config, - partition_writers: HashMap::new(), - } - } - - /// Apply custom writer_properties to the underlying parquet writer - pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self { - self.config.writer_properties = writer_properties; - self - } - - fn divide_by_partition_values( - &mut self, - values: &RecordBatch, - ) -> DeltaResult> { - Ok(divide_by_partition_values( - self.config.file_schema(), - self.config.partition_columns.clone(), - values, - ) - .map_err(|err| WriteError::Partitioning(err.to_string()))?) 
- } - - /// Write a batch to the partition induced by the partition_values. The record batch is expected - /// to be pre-partitioned and only contain rows that belong into the same partition. - /// However, it should still contain the partition columns. - pub async fn write_partition( - &mut self, - record_batch: RecordBatch, - partition_values: &IndexMap, - ) -> DeltaResult<()> { - let partition_key = Path::parse(partition_values.hive_partition_path())?; - - let record_batch = - record_batch_without_partitions(&record_batch, &self.config.partition_columns)?; - - match self.partition_writers.get_mut(&partition_key) { - Some(writer) => { - writer.write(&record_batch).await?; - } - None => { - let config = PartitionWriterConfig::try_new( - self.config.file_schema(), - partition_values.clone(), - Some(self.config.writer_properties.clone()), - Some(self.config.target_file_size), - Some(self.config.write_batch_size), - )?; - let mut writer = - PartitionWriter::try_with_config(self.object_store.clone(), config)?; - writer.write(&record_batch).await?; - let _ = self.partition_writers.insert(partition_key, writer); - } - } - - Ok(()) - } - - /// Buffers record batches in-memory per partition up to appx. `target_file_size` for a partition. - /// Flushes data to storage once a full file can be written. - /// - /// The `close` method has to be invoked to write all data still buffered - /// and get the list of all written files. - pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> { - for result in self.divide_by_partition_values(batch)? { - self.write_partition(result.record_batch, &result.partition_values) - .await?; - } - Ok(()) - } - - /// Close the writer and get the new [Add] actions. - /// - /// This will flush all remaining data. - pub async fn close(mut self) -> DeltaResult> { - let writers = std::mem::take(&mut self.partition_writers); - let mut actions = Vec::new(); - for (_, writer) in writers { - let writer_actions = writer.close().await?; - actions.extend(writer_actions); - } - Ok(actions) - } -} - -#[derive(Debug)] -pub(crate) struct PartitionWriterConfig { - /// Schema of the data written to disk - file_schema: ArrowSchemaRef, - /// Prefix applied to all paths - prefix: Path, - /// Values for all partition columns - partition_values: IndexMap, - /// Properties passed to underlying parquet writer - writer_properties: WriterProperties, - /// Size above which we will write a buffered parquet file to disk. - target_file_size: usize, - /// Row chunks passed to parquet writer. This and the internal parquet writer settings - /// determine how fine granular we can track / control the size of resulting files. 
- write_batch_size: usize, -} - -impl PartitionWriterConfig { - pub fn try_new( - file_schema: ArrowSchemaRef, - partition_values: IndexMap, - writer_properties: Option, - target_file_size: Option, - write_batch_size: Option, - ) -> DeltaResult { - let part_path = partition_values.hive_partition_path(); - let prefix = Path::parse(part_path)?; - let writer_properties = writer_properties.unwrap_or_else(|| { - WriterProperties::builder() - .set_created_by(format!("delta-rs version {}", crate_version())) - .build() - }); - let target_file_size = target_file_size.unwrap_or(DEFAULT_TARGET_FILE_SIZE); - let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE); - - Ok(Self { - file_schema, - prefix, - partition_values, - writer_properties, - target_file_size, - write_batch_size, - }) - } -} - -#[derive(Debug)] -pub(crate) struct PartitionWriter { - object_store: ObjectStoreRef, - writer_id: uuid::Uuid, - config: PartitionWriterConfig, - buffer: ShareableBuffer, - arrow_writer: ArrowWriter, - part_counter: usize, - files_written: Vec, -} - -impl PartitionWriter { - /// Create a new instance of [`PartitionWriter`] from [`PartitionWriterConfig`] - pub fn try_with_config( - object_store: ObjectStoreRef, - config: PartitionWriterConfig, - ) -> DeltaResult { - let buffer = ShareableBuffer::default(); - let arrow_writer = ArrowWriter::try_new( - buffer.clone(), - config.file_schema.clone(), - Some(config.writer_properties.clone()), - )?; - - Ok(Self { - object_store, - writer_id: uuid::Uuid::new_v4(), - config, - buffer, - arrow_writer, - part_counter: 0, - files_written: Vec::new(), - }) - } - - fn next_data_path(&mut self) -> Path { - self.part_counter += 1; - - next_data_path( - &self.config.prefix, - self.part_counter, - &self.writer_id, - &self.config.writer_properties, - ) - } - - fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter, ShareableBuffer)> { - let new_buffer = ShareableBuffer::default(); - let arrow_writer = ArrowWriter::try_new( - new_buffer.clone(), - self.config.file_schema.clone(), - Some(self.config.writer_properties.clone()), - )?; - Ok(( - std::mem::replace(&mut self.arrow_writer, arrow_writer), - std::mem::replace(&mut self.buffer, new_buffer), - )) - } - - fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> { - Ok(self.arrow_writer.write(batch)?) - } - - async fn flush_arrow_writer(&mut self) -> DeltaResult<()> { - // replace counter / buffers and close the current writer - let (writer, buffer) = self.reset_writer()?; - let metadata = writer.close()?; - // don't write empty file - if metadata.num_rows == 0 { - return Ok(()); - } - - let buffer = match buffer.into_inner() { - Some(buffer) => Bytes::from(buffer), - None => return Ok(()), // Nothing to write - }; - - // collect metadata - let path = self.next_data_path(); - let file_size = buffer.len() as i64; - - // write file to object store - self.object_store.put(&path, buffer).await?; - self.files_written.push( - create_add( - &self.config.partition_values, - path.to_string(), - file_size, - &metadata, - ) - .map_err(|err| WriteError::CreateAdd { - source: Box::new(err), - })?, - ); - - Ok(()) - } - - /// Buffers record batches in-memory up to appx. `target_file_size`. - /// Flushes data to storage once a full file can be written. - /// - /// The `close` method has to be invoked to write all data still buffered - /// and get the list of all written files. 
- pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> { - if batch.schema() != self.config.file_schema { - return Err(WriteError::SchemaMismatch { - schema: batch.schema(), - expected_schema: self.config.file_schema.clone(), - } - .into()); - } - - let max_offset = batch.num_rows(); - for offset in (0..max_offset).step_by(self.config.write_batch_size) { - let length = usize::min(self.config.write_batch_size, max_offset - offset); - self.write_batch(&batch.slice(offset, length))?; - // flush currently buffered data to disk once we meet or exceed the target file size. - let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size(); - if estimated_size >= self.config.target_file_size { - debug!( - "Writing file with estimated size {:?} to disk.", - estimated_size - ); - self.flush_arrow_writer().await?; - } - } - - Ok(()) - } - - pub async fn close(mut self) -> DeltaResult> { - self.flush_arrow_writer().await?; - Ok(self.files_written) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::storage::utils::flatten_list_stream as list; - use crate::writer::test_utils::*; - use crate::DeltaTableBuilder; - use arrow::array::{Int32Array, StringArray}; - use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; - use std::sync::Arc; - - fn get_delta_writer( - object_store: ObjectStoreRef, - batch: &RecordBatch, - writer_properties: Option, - target_file_size: Option, - write_batch_size: Option, - ) -> DeltaWriter { - let config = WriterConfig::new( - batch.schema(), - vec![], - writer_properties, - target_file_size, - write_batch_size, - ); - DeltaWriter::new(object_store, config) - } - - fn get_partition_writer( - object_store: ObjectStoreRef, - batch: &RecordBatch, - writer_properties: Option, - target_file_size: Option, - write_batch_size: Option, - ) -> PartitionWriter { - let config = PartitionWriterConfig::try_new( - batch.schema(), - IndexMap::new(), - writer_properties, - target_file_size, - write_batch_size, - ) - .unwrap(); - PartitionWriter::try_with_config(object_store, config).unwrap() - } - - #[tokio::test] - async fn test_write_partition() { - let log_store = DeltaTableBuilder::from_uri("memory://") - .build_storage() - .unwrap(); - let object_store = log_store.object_store(); - let batch = get_record_batch(None, false); - - // write single un-partitioned batch - let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None); - writer.write(&batch).await.unwrap(); - let files = list(object_store.as_ref(), None).await.unwrap(); - assert_eq!(files.len(), 0); - let adds = writer.close().await.unwrap(); - let files = list(object_store.as_ref(), None).await.unwrap(); - assert_eq!(files.len(), 1); - assert_eq!(files.len(), adds.len()); - let head = object_store - .head(&Path::from(adds[0].path.clone())) - .await - .unwrap(); - assert_eq!(head.size, adds[0].size as usize) - } - - #[tokio::test] - async fn test_write_partition_with_parts() { - let base_int = Arc::new(Int32Array::from((0..10000).collect::>())); - let base_str = Arc::new(StringArray::from(vec!["A"; 10000])); - let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("id", DataType::Utf8, true), - Field::new("value", DataType::Int32, true), - ])); - let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); - - let object_store = DeltaTableBuilder::from_uri("memory://") - .build_storage() - .unwrap() - .object_store(); - let properties = WriterProperties::builder() - .set_max_row_group_size(1024) - .build(); - // configure small target 
file size and and row group size so we can observe multiple files written - let mut writer = - get_partition_writer(object_store, &batch, Some(properties), Some(10_000), None); - writer.write(&batch).await.unwrap(); - - // check that we have written more then once file, and no more then 1 is below target size - let adds = writer.close().await.unwrap(); - assert!(adds.len() > 1); - let target_file_count = adds - .iter() - .fold(0, |acc, add| acc + (add.size > 10_000) as i32); - assert!(target_file_count >= adds.len() as i32 - 1) - } - - #[tokio::test] - async fn test_unflushed_row_group_size() { - let base_int = Arc::new(Int32Array::from((0..10000).collect::>())); - let base_str = Arc::new(StringArray::from(vec!["A"; 10000])); - let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("id", DataType::Utf8, true), - Field::new("value", DataType::Int32, true), - ])); - let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); - - let object_store = DeltaTableBuilder::from_uri("memory://") - .build_storage() - .unwrap() - .object_store(); - // configure small target file size so we can observe multiple files written - let mut writer = get_partition_writer(object_store, &batch, None, Some(10_000), None); - writer.write(&batch).await.unwrap(); - - // check that we have written more then once file, and no more then 1 is below target size - let adds = writer.close().await.unwrap(); - assert!(adds.len() > 1); - let target_file_count = adds - .iter() - .fold(0, |acc, add| acc + (add.size > 10_000) as i32); - assert!(target_file_count >= adds.len() as i32 - 1) - } - - #[tokio::test] - async fn test_do_not_write_empty_file_on_close() { - let base_int = Arc::new(Int32Array::from((0..10000_i32).collect::>())); - let base_str = Arc::new(StringArray::from(vec!["A"; 10000])); - let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("id", DataType::Utf8, true), - Field::new("value", DataType::Int32, true), - ])); - let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); - - let object_store = DeltaTableBuilder::from_uri("memory://") - .build_storage() - .unwrap() - .object_store(); - // configure high batch size and low file size to observe one file written and flushed immediately - // upon writing batch, then ensures the buffer is empty upon closing writer - let mut writer = get_partition_writer(object_store, &batch, None, Some(9000), Some(10000)); - writer.write(&batch).await.unwrap(); - - let adds = writer.close().await.unwrap(); - assert!(adds.len() == 1); - } - - #[tokio::test] - async fn test_write_mismatched_schema() { - let log_store = DeltaTableBuilder::from_uri("memory://") - .build_storage() - .unwrap(); - let object_store = log_store.object_store(); - let batch = get_record_batch(None, false); - - // write single un-partitioned batch - let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None); - writer.write(&batch).await.unwrap(); - // Ensure the write hasn't been flushed - let files = list(object_store.as_ref(), None).await.unwrap(); - assert_eq!(files.len(), 0); - - // Create a second batch with a different schema - let second_schema = Arc::new(ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("name", DataType::Utf8, true), - ])); - let second_batch = RecordBatch::try_new( - second_schema, - vec![ - Arc::new(Int32Array::from(vec![Some(1), Some(2)])), - Arc::new(StringArray::from(vec![Some("will"), Some("robert")])), - ], - ) - .unwrap(); - - let result = writer.write(&second_batch).await; - 
assert!(result.is_err()); - - match result { - Ok(_) => { - assert!(false, "Should not have successfully written"); - } - Err(e) => { - match e { - DeltaTableError::SchemaMismatch { .. } => { - // this is expected - } - others => { - assert!(false, "Got the wrong error: {others:?}"); - } - } - } - }; - } -} diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index 6740bc0204..f62a87907c 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -24,7 +24,7 @@ use super::utils::{ }; use super::{DeltaWriter, DeltaWriterError, WriteMode}; use crate::errors::DeltaTableError; -use crate::kernel::{Add, PartitionsExt, Scalar, StructType}; +use crate::kernel::{Action, Add, PartitionsExt, Scalar, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; @@ -347,7 +347,7 @@ impl DeltaWriter> for JsonWriter { } /// Writes the existing parquet bytes to storage and resets internal state to handle another file. - async fn flush(&mut self) -> Result, DeltaTableError> { + async fn flush(&mut self) -> Result, DeltaTableError> { let writers = std::mem::take(&mut self.arrow_writers); let mut actions = Vec::new(); @@ -362,12 +362,12 @@ impl DeltaWriter> for JsonWriter { let file_size = obj_bytes.len() as i64; self.storage.put(&path, obj_bytes).await?; - actions.push(create_add( + actions.push(Action::Add(create_add( &writer.partition_values, path.to_string(), file_size, &metadata, - )?); + )?)); } Ok(actions) } diff --git a/crates/core/src/writer/mod.rs b/crates/core/src/writer/mod.rs index 9d1cc521f1..800fde607d 100644 --- a/crates/core/src/writer/mod.rs +++ b/crates/core/src/writer/mod.rs @@ -146,12 +146,12 @@ pub trait DeltaWriter { /// Flush the internal write buffers to files in the delta table folder structure. /// The corresponding delta [`Add`] actions are returned and should be committed via a transaction. - async fn flush(&mut self) -> Result, DeltaTableError>; + async fn flush(&mut self) -> Result, DeltaTableError>; /// Flush the internal write buffers to files in the delta table folder structure. /// and commit the changes to the Delta log, creating a new table version. async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result { - let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect(); + let adds: Vec<_> = self.flush().await?.drain(..).collect(); flush_and_commit(adds, table).await } } diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index c62fc9b560..551d82a53b 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -79,6 +79,23 @@ impl RecordBatchWriter { }) } + pub fn for_storage( + storage: Arc, + writer_properties: WriterProperties, + schema: ArrowSchemaRef, + partition_columns: Option>, + ) -> Result { + Ok(Self { + storage, + arrow_schema_ref: schema.clone(), + original_schema_ref: schema, + writer_properties, + partition_columns: partition_columns.unwrap_or_default(), + should_evolve: false, + arrow_writers: HashMap::new(), + }) + } + /// Creates a [`RecordBatchWriter`] to write data to provided Delta Table pub fn for_table(table: &DeltaTable) -> Result { // Initialize an arrow schema ref from the delta table schema @@ -204,9 +221,10 @@ impl DeltaWriter for RecordBatchWriter { } /// Writes the existing parquet bytes to storage and resets internal state to handle another file. 
- async fn flush(&mut self) -> Result, DeltaTableError> { + async fn flush(&mut self) -> Result, DeltaTableError> { + use crate::kernel::{Metadata, StructType}; let writers = std::mem::take(&mut self.arrow_writers); - let mut actions = Vec::new(); + let mut actions: Vec = Vec::new(); for (_, writer) in writers { let metadata = writer.arrow_writer.close()?; @@ -217,32 +235,25 @@ impl DeltaWriter for RecordBatchWriter { let file_size = obj_bytes.len() as i64; self.storage.put(&path, obj_bytes).await?; - actions.push(create_add( + actions.push(Action::Add(create_add( &writer.partition_values, path.to_string(), file_size, &metadata, - )?); + )?)); } - Ok(actions) - } - - /// Flush the internal write buffers to files in the delta table folder structure. - /// and commit the changes to the Delta log, creating a new table version. - async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result { - use crate::kernel::{Metadata, StructType}; - let mut adds: Vec = self.flush().await?.drain(..).map(Action::Add).collect(); - + if self.arrow_schema_ref != self.original_schema_ref && self.should_evolve { let schema: StructType = self.arrow_schema_ref.clone().try_into()?; // TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe // this should just propagate the existing columns in the new action let part_cols: Vec = vec![]; let metadata = Metadata::try_new(schema, part_cols, HashMap::new())?; - adds.push(Action::Metadata(metadata)); + actions.push(Action::Metadata(metadata)); } - super::flush_and_commit(adds, table).await + Ok(actions) } + } /// Helper container for partitioned record batches diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index e8994983f1..a2beb5a611 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -182,6 +182,7 @@ def write_to_deltalake( storage_options: Optional[Dict[str, str]], writer_properties: Optional[Dict[str, Optional[str]]], custom_metadata: Optional[Dict[str, str]], + data_arrow_schema: pyarrow.Schema ) -> None: ... 
def convert_to_deltalake( uri: str, diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index df76ded806..13ead8ce1a 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -312,6 +312,7 @@ def write_deltalake( writer_properties._to_dict() if writer_properties else None ), custom_metadata=custom_metadata, + data_arrow_schema=schema ) if table: table.update_incremental() diff --git a/python/src/lib.rs b/python/src/lib.rs index 1992bae642..0cdeeea5de 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -42,6 +42,7 @@ use deltalake::parquet::errors::ParquetError; use deltalake::parquet::file::properties::WriterProperties; use deltalake::partitions::PartitionFilter; use deltalake::protocol::{DeltaOperation, SaveMode}; +use deltalake::writer::RecordBatchWriter; use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, DeltaResult}; use pyo3::exceptions::{PyRuntimeError, PyValueError}; @@ -1376,8 +1377,12 @@ fn write_to_deltalake( storage_options: Option>, writer_properties: Option>>, custom_metadata: Option>, + data_arrow_schema: PyArrowType ) -> PyResult<()> { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); + + + let save_mode = mode.parse().map_err(PythonError::from)?; let options = storage_options.clone().unwrap_or_default(); From f42cd169f52657980a16c2d2bffe19f0346472b2 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Thu, 29 Feb 2024 13:17:39 +0100 Subject: [PATCH 2/4] compiles --- crates/core/src/writer/record_batch.rs | 1 + python/src/lib.rs | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 551d82a53b..8e3710e682 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -79,6 +79,7 @@ impl RecordBatchWriter { }) } + /// Create a new [`RecordBatchWriter`] instance for a given storage and writer_properties pub fn for_storage( storage: Arc, writer_properties: WriterProperties, diff --git a/python/src/lib.rs b/python/src/lib.rs index 0cdeeea5de..22760eb85e 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -42,7 +42,6 @@ use deltalake::parquet::errors::ParquetError; use deltalake::parquet::file::properties::WriterProperties; use deltalake::partitions::PartitionFilter; use deltalake::protocol::{DeltaOperation, SaveMode}; -use deltalake::writer::RecordBatchWriter; use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, DeltaResult}; use pyo3::exceptions::{PyRuntimeError, PyValueError}; @@ -1376,8 +1375,7 @@ fn write_to_deltalake( configuration: Option>>, storage_options: Option>, writer_properties: Option>>, - custom_metadata: Option>, - data_arrow_schema: PyArrowType + custom_metadata: Option> ) -> PyResult<()> { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); From 08679c494efe9f9a8898d154db21dc30efb97ba8 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Thu, 29 Feb 2024 13:20:11 +0100 Subject: [PATCH 3/4] fmt --- crates/core/src/operations/optimize.rs | 24 ++++++++++++++++++------ crates/core/src/operations/write.rs | 10 +++++++--- crates/core/src/writer/record_batch.rs | 3 +-- python/deltalake/_internal.pyi | 3 +-- python/src/lib.rs | 4 +--- 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 1b038c020a..60e6d58530 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -444,8 +444,18 @@ impl MergePlan { // Next, 
initialize the writer // TODO: task_parameters.input_parameters.target_size in RecordBatchWriter::for_storage - let mut writer = RecordBatchWriter::for_storage(object_store, task_parameters.writer_properties.clone(), - task_parameters.file_schema.clone(), Some(partition_values.keys().into_iter().map(|s|s.to_owned()).collect_vec()))?; + let mut writer = RecordBatchWriter::for_storage( + object_store, + task_parameters.writer_properties.clone(), + task_parameters.file_schema.clone(), + Some( + partition_values + .keys() + .into_iter() + .map(|s| s.to_owned()) + .collect_vec(), + ), + )?; let mut read_stream = read_stream.await?; @@ -464,15 +474,17 @@ impl MergePlan { // add.partition_values = partition_values.into(); TODO: Required? add.data_change = false; let size = add.size; - + partial_metrics.num_files_added += 1; partial_metrics.files_added.total_files += 1; partial_metrics.files_added.total_size += size; - partial_metrics.files_added.max = std::cmp::max(partial_metrics.files_added.max, size); - partial_metrics.files_added.min = std::cmp::min(partial_metrics.files_added.min, size); + partial_metrics.files_added.max = + std::cmp::max(partial_metrics.files_added.max, size); + partial_metrics.files_added.min = + std::cmp::min(partial_metrics.files_added.min, size); Action::Add(add.clone()) } - o => o.clone() + o => o.clone(), } }); partial_actions.extend(add_actions); diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 6bd0d43883..c01e5f04bf 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -342,9 +342,13 @@ async fn write_execution_plan_with_predicate( let inner_plan = plan.clone(); let inner_schema = schema.clone(); let task_ctx = Arc::new(TaskContext::from(&state)); - - let mut writer = RecordBatchWriter::for_storage(object_store.clone(), - writer_properties.clone().unwrap_or_default(), inner_schema.clone(), Some(partition_columns.clone()))?; + + let mut writer = RecordBatchWriter::for_storage( + object_store.clone(), + writer_properties.clone().unwrap_or_default(), + inner_schema.clone(), + Some(partition_columns.clone()), + )?; let checker_stream = checker.clone(); let mut stream = inner_plan.execute(i, task_ctx)?; let handle: tokio::task::JoinHandle>> = diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 8e3710e682..0d8a10a8a4 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -243,7 +243,7 @@ impl DeltaWriter for RecordBatchWriter { &metadata, )?)); } - + if self.arrow_schema_ref != self.original_schema_ref && self.should_evolve { let schema: StructType = self.arrow_schema_ref.clone().try_into()?; // TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe @@ -254,7 +254,6 @@ impl DeltaWriter for RecordBatchWriter { } Ok(actions) } - } /// Helper container for partitioned record batches diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index a2beb5a611..c75b0bdb35 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -181,8 +181,7 @@ def write_to_deltalake( configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], writer_properties: Optional[Dict[str, Optional[str]]], - custom_metadata: Optional[Dict[str, str]], - data_arrow_schema: pyarrow.Schema + custom_metadata: Optional[Dict[str, str]] ) -> None: ... 
def convert_to_deltalake( uri: str, diff --git a/python/src/lib.rs b/python/src/lib.rs index 22760eb85e..90637e07fd 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1375,12 +1375,10 @@ fn write_to_deltalake( configuration: Option>>, storage_options: Option>, writer_properties: Option>>, - custom_metadata: Option> + custom_metadata: Option>, ) -> PyResult<()> { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); - - let save_mode = mode.parse().map_err(PythonError::from)?; let options = storage_options.clone().unwrap_or_default(); From 8626044750316f861ee4b610742075bd483a9c16 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Thu, 29 Feb 2024 13:33:55 +0100 Subject: [PATCH 4/4] clippy --- crates/core/src/operations/optimize.rs | 1 - crates/core/src/operations/write.rs | 6 +++--- crates/core/src/writer/json.rs | 12 ++++++++++-- crates/core/src/writer/mod.rs | 2 +- crates/core/src/writer/record_batch.rs | 2 +- python/deltalake/writer.py | 3 +-- 6 files changed, 16 insertions(+), 10 deletions(-) diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 60e6d58530..aa12e8b1f0 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -451,7 +451,6 @@ impl MergePlan { Some( partition_values .keys() - .into_iter() .map(|s| s.to_owned()) .collect_vec(), ), diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index c01e5f04bf..227bca6f14 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -307,8 +307,8 @@ async fn write_execution_plan_with_predicate( plan: Arc, partition_columns: Vec, object_store: ObjectStoreRef, - target_file_size: Option, - write_batch_size: Option, + target_file_size: Option, // TODO: Implement this in Record Batch Writer + write_batch_size: Option, // TODO: Implement this in Record Batch Writer writer_properties: Option, safe_cast: bool, overwrite_schema: bool, @@ -643,7 +643,7 @@ impl std::future::IntoFuture for WriteBuilder { this.overwrite_schema, ) .await?; - actions.extend(add_actions.into_iter()); + actions.extend(add_actions); // Collect remove actions if we are overwriting the table if let Some(snapshot) = &this.snapshot { diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index f62a87907c..e3b05ec107 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -24,7 +24,7 @@ use super::utils::{ }; use super::{DeltaWriter, DeltaWriterError, WriteMode}; use crate::errors::DeltaTableError; -use crate::kernel::{Action, Add, PartitionsExt, Scalar, StructType}; +use crate::kernel::{Action, PartitionsExt, Scalar, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; @@ -469,7 +469,15 @@ mod tests { writer.write(vec![data]).await.unwrap(); let add_actions = writer.flush().await.unwrap(); - let add = &add_actions[0]; + let action = &add_actions[0]; + let add = match action { + Action::Add(add) => { + add + } + _ => { + assert!(false, "Expected Add action"); + } + }; let path = table_dir.path().join(&add.path); let file = File::open(path.as_path()).unwrap(); diff --git a/crates/core/src/writer/mod.rs b/crates/core/src/writer/mod.rs index 800fde607d..4cdf237956 100644 --- a/crates/core/src/writer/mod.rs +++ b/crates/core/src/writer/mod.rs @@ -7,7 +7,7 @@ use parquet::errors::ParquetError; use serde_json::Value; use crate::errors::DeltaTableError; -use crate::kernel::{Action, Add}; +use 
crate::kernel::{Action};
 use crate::operations::transaction::commit;
 use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode};
 use crate::DeltaTable;
diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs
index 0d8a10a8a4..a221ed9d28 100644
--- a/crates/core/src/writer/record_batch.rs
+++ b/crates/core/src/writer/record_batch.rs
@@ -28,7 +28,7 @@ use super::utils::{
 };
 use super::{DeltaWriter, DeltaWriterError, WriteMode};
 use crate::errors::DeltaTableError;
-use crate::kernel::{Action, Add, PartitionsExt, Scalar, StructType};
+use crate::kernel::{Action, PartitionsExt, Scalar, StructType};
 use crate::table::builder::DeltaTableBuilder;
 use crate::DeltaTable;
 
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 13ead8ce1a..f6b588cb3a 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -311,8 +311,7 @@ def write_deltalake(
             writer_properties=(
                 writer_properties._to_dict() if writer_properties else None
             ),
-            custom_metadata=custom_metadata,
-            data_arrow_schema=schema
+            custom_metadata=custom_metadata
         )
         if table:
             table.update_incremental()
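After this series, DeltaWriter::flush (and the execution helpers built on it) return Vec<Action> instead of Vec<Add>, so callers that only care about the written files have to pick out the Add variant, as the optimize.rs hunk and the json.rs test above now do. A minimal sketch of that filtering, with crate paths assumed from the deltalake meta-crate:

use deltalake::kernel::{Action, Add};

/// Collect references to the Add actions out of a mixed action list.
fn added_files(actions: &[Action]) -> Vec<&Add> {
    actions
        .iter()
        .filter_map(|action| match action {
            Action::Add(add) => Some(add),
            _ => None,
        })
        .collect()
}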