diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
new file mode 100644
index 000000000..ba198821a
--- /dev/null
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -0,0 +1,589 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides `EqualityDeleteWriter`.
+
+use arrow_array::{ArrayRef, RecordBatch, StructArray};
+use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef};
+use itertools::Itertools;
+
+use crate::spec::{DataFile, Struct};
+use crate::writer::file_writer::FileWriter;
+use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
+
+/// Builder for `EqualityDeleteWriter`.
+#[derive(Clone)]
+pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
+    inner: B,
+}
+
+impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
+    /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
+    pub fn new(inner: B) -> Self {
+        Self { inner }
+    }
+}
+
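+// Background (informal): an equality delete file stores rows containing only
+// the columns listed in `equality_ids`; a data row is considered deleted when
+// it matches one of those rows on all of these columns. For example, a delete
+// file written with `equality_ids = [1]` (say, a hypothetical `id` column)
+// holding the single row `{ id: 3 }` marks every data row with `id == 3` as
+// deleted.
+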
+/// Config for `EqualityDeleteWriter`.
+pub struct EqualityDeleteWriterConfig {
+    equality_ids: Vec<usize>,
+    projector: FieldProjector,
+    schema: SchemaRef,
+    partition_value: Struct,
+}
+
+impl EqualityDeleteWriterConfig {
+    /// Create a new `EqualityDeleteWriterConfig` with equality ids.
+    pub fn new(
+        equality_ids: Vec<usize>,
+        projector: FieldProjector,
+        schema: Schema,
+        partition_value: Option<Struct>,
+    ) -> Self {
+        Self {
+            equality_ids,
+            projector,
+            schema: schema.into(),
+            partition_value: partition_value.unwrap_or(Struct::empty()),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
+    type R = EqualityDeleteFileWriter<B>;
+    type C = EqualityDeleteWriterConfig;
+
+    async fn build(self, config: Self::C) -> Result<Self::R> {
+        Ok(EqualityDeleteFileWriter {
+            inner_writer: Some(self.inner.clone().build().await?),
+            projector: config.projector,
+            delete_schema_ref: config.schema,
+            equality_ids: config.equality_ids,
+            partition_value: config.partition_value,
+        })
+    }
+}
+
+/// A writer that writes equality delete files.
+pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
+    inner_writer: Option<B::R>,
+    projector: FieldProjector,
+    delete_schema_ref: SchemaRef,
+    equality_ids: Vec<usize>,
+    partition_value: Struct,
+}
+
+impl<B: FileWriterBuilder> EqualityDeleteFileWriter<B> {
+    fn project_record_batch_columns(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        RecordBatch::try_new(
+            self.delete_schema_ref.clone(),
+            self.projector.project(batch.columns())?,
+        )
+        .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))
+    }
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
+    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+        let batch = self.project_record_batch_columns(batch)?;
+        if let Some(writer) = self.inner_writer.as_mut() {
+            writer.write(&batch).await
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                "Equality delete inner writer does not exist",
+            ))
+        }
+    }
+
+    async fn close(&mut self) -> Result<Vec<DataFile>> {
+        if let Some(writer) = self.inner_writer.take() {
+            Ok(writer
+                .close()
+                .await?
+                .into_iter()
+                .map(|mut res| {
+                    res.content(crate::spec::DataContentType::EqualityDeletes);
+                    res.equality_ids(self.equality_ids.iter().map(|id| *id as i32).collect_vec());
+                    res.partition(self.partition_value.clone());
+                    res.build().expect("Failed to build equality delete file")
+                })
+                .collect_vec())
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                "Equality delete inner writer does not exist",
+            ))
+        }
+    }
+}
+
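+// A rough usage sketch (names such as `schema`, `equality_ids`,
+// `parquet_writer_builder` and `batch` are placeholders, not part of this
+// module); the test below is the complete, runnable version:
+//
+//     let (projector, fields) =
+//         FieldProjector::new(schema.fields(), &equality_ids, PARQUET_FIELD_ID_META_KEY)?;
+//     let config = EqualityDeleteWriterConfig::new(
+//         equality_ids,
+//         projector,
+//         arrow_schema::Schema::new(fields),
+//         None,
+//     );
+//     let mut writer = EqualityDeleteFileWriterBuilder::new(parquet_writer_builder)
+//         .build(config)
+//         .await?;
+//     writer.write(batch).await?;
+//     let delete_files: Vec<DataFile> = writer.close().await?;
+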
+/// Helps to project specific fields from a `RecordBatch` according to their column ids.
+pub struct FieldProjector {
+    index_vec_vec: Vec<Vec<usize>>,
+}
+
+impl FieldProjector {
+    /// Create a new `FieldProjector` for the given column ids.
+    pub fn new(
+        batch_fields: &Fields,
+        column_ids: &[usize],
+        column_id_meta_key: &str,
+    ) -> Result<(Self, Fields)> {
+        let mut index_vec_vec = Vec::with_capacity(column_ids.len());
+        let mut fields = Vec::with_capacity(column_ids.len());
+        for &id in column_ids {
+            let mut index_vec = vec![];
+            if let Ok(field) = Self::fetch_column_index(
+                batch_fields,
+                &mut index_vec,
+                id as i64,
+                column_id_meta_key,
+            ) {
+                fields.push(field.clone());
+                index_vec_vec.push(index_vec);
+            } else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Can't find source column id or column data type invalid: {}",
+                        id
+                    ),
+                ));
+            }
+        }
+        Ok((Self { index_vec_vec }, Fields::from_iter(fields)))
+    }
+
+    fn fetch_column_index(
+        fields: &Fields,
+        index_vec: &mut Vec<usize>,
+        col_id: i64,
+        column_id_meta_key: &str,
+    ) -> Result<FieldRef> {
+        for (pos, field) in fields.iter().enumerate() {
+            match field.data_type() {
+                DataType::Float16 | DataType::Float32 | DataType::Float64 => {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Delete column data type cannot be float or double",
+                    ));
+                }
+                _ => {
+                    let id: i64 = field
+                        .metadata()
+                        .get(column_id_meta_key)
+                        .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "column_id must be set"))?
+                        .parse::<i64>()
+                        .map_err(|_| {
+                            Error::new(ErrorKind::DataInvalid, "column_id must be parsable as i64")
+                        })?;
+                    if col_id == id {
+                        index_vec.push(pos);
+                        return Ok(field.clone());
+                    }
+                    if let DataType::Struct(inner) = field.data_type() {
+                        let res =
+                            Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key);
+                        if !index_vec.is_empty() {
+                            index_vec.push(pos);
+                            return res;
+                        }
+                    }
+                }
+            }
+        }
+        Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Column id not found in fields",
+        ))
+    }
+
+    /// Project the configured columns out of the batch's column arrays.
+    pub fn project(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+        self.index_vec_vec
+            .iter()
+            .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec))
+            .collect::<Result<Vec<_>>>()
+    }
+
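+    // How the stored indexes are used (informal example): given an Arrow schema
+    // `[a: Int64 (field id 1), b: Struct { c: Int64 (field id 3) }]`, where `b`
+    // itself carries field id 2, looking up id 3 records `index_vec = [0, 1]`
+    // (leaf position first, outermost position last). `get_column_by_index_vec`
+    // then walks it in reverse: start from batch column 1 (`b`) and take its
+    // child column 0 (`c`).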
+    fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> Result<ArrayRef> {
+        let mut rev_iterator = index_vec.iter().rev();
+        let mut array = batch[*rev_iterator.next().unwrap()].clone();
+        for idx in rev_iterator {
+            array = array
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot convert Array to StructArray",
+                ))?
+                .column(*idx)
+                .clone();
+        }
+        Ok(array)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_select::concat::concat_batches;
+    use itertools::Itertools;
+    use std::{collections::HashMap, sync::Arc};
+
+    use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
+    use parquet::{
+        arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, PARQUET_FIELD_ID_META_KEY},
+        file::properties::WriterProperties,
+    };
+    use tempfile::TempDir;
+
+    use crate::{
+        io::{FileIO, FileIOBuilder},
+        spec::{DataFile, DataFileFormat},
+        writer::{
+            base_writer::equality_delete_writer::{
+                EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, FieldProjector,
+            },
+            file_writer::{
+                location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
+                ParquetWriterBuilder,
+            },
+            IcebergWriter, IcebergWriterBuilder,
+        },
+    };
+
+    async fn check_parquet_data_file_with_equality_delete_write(
+        file_io: &FileIO,
+        data_file: &DataFile,
+        batch: &RecordBatch,
+    ) {
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+
+        // read the written file
+        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
+        let input_content = input_file.read().await.unwrap();
+        let reader_builder =
+            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
+        let metadata = reader_builder.metadata().clone();
+
+        // check data
+        let reader = reader_builder.build().unwrap();
+        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
+        let res = concat_batches(&batch.schema(), &batches).unwrap();
+        assert_eq!(*batch, res);
+
+        // check metadata
+        let expect_column_num = batch.num_columns();
+
+        assert_eq!(
+            data_file.record_count,
+            metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64
+        );
+
+        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);
+
+        assert_eq!(data_file.column_sizes.len(), expect_column_num);
+
+        for (index, id) in data_file.column_sizes().keys().sorted().enumerate() {
+            metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.columns())
+                .for_each(|column| {
+                    assert_eq!(
+                        *data_file.column_sizes.get(id).unwrap() as i64,
+                        column.get(index).unwrap().compressed_size()
+                    );
+                });
+        }
+
+        assert_eq!(data_file.value_counts.len(), expect_column_num);
+        data_file.value_counts.iter().for_each(|(_, &v)| {
+            let expect = metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64;
+            assert_eq!(v, expect);
+        });
+
+        for (index, id) in data_file.null_value_counts().keys().enumerate() {
+            let expect = batch.column(index).null_count() as u64;
+            assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
+        }
+
+        assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
+        data_file
+            .split_offsets
+            .iter()
+            .enumerate()
+            .for_each(|(i, &v)| {
+                let expect = metadata.row_groups()[i].file_offset().unwrap();
+                assert_eq!(v, expect);
+            });
+    }
+
+    #[tokio::test]
+    async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // prepare data
+        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "0".to_string(),
+                    )])),
+                arrow_schema::Field::new(
+                    "col1",
+                    arrow_schema::DataType::Struct(
+                        vec![arrow_schema::Field::new(
+                            "sub_col",
+                            arrow_schema::DataType::Int64,
+                            true,
+                        )
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "5".to_string(),
+                        )]))]
+                        .into(),
+                    ),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "1".to_string(),
+                )])),
+                arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+                ),
+                arrow_schema::Field::new(
+                    "col3",
+                    arrow_schema::DataType::List(Arc::new(
+                        arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
+                            .with_metadata(HashMap::from([(
+                                PARQUET_FIELD_ID_META_KEY.to_string(),
+                                "6".to_string(),
+                            )])),
+                    )),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "3".to_string(),
+                )])),
+                arrow_schema::Field::new(
+                    "col4",
+                    arrow_schema::DataType::Struct(
+                        vec![arrow_schema::Field::new(
+                            "sub_col",
+                            arrow_schema::DataType::Struct(
+                                vec![arrow_schema::Field::new(
+                                    "sub_sub_col",
+                                    arrow_schema::DataType::Int64,
+                                    true,
+                                )
+                                .with_metadata(HashMap::from([(
+                                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                                    "7".to_string(),
+                                )]))]
+                                .into(),
+                            ),
+                            true,
+                        )
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "8".to_string(),
+                        )]))]
+                        .into(),
+                    ),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "4".to_string(),
+                )])),
+            ];
+            arrow_schema::Schema::new(fields)
+        };
+        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+        let col1 = Arc::new(StructArray::new(
+            vec![
+                arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "5".to_string(),
+                    )])),
+            ]
+            .into(),
+            vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+            None,
+        ));
+        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
+            "test";
+            1024
+        ])) as ArrayRef;
+        let col3 = Arc::new({
+            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+                Some(
+                    vec![Some(1),]
+                );
+                1024
+            ])
+            .into_parts();
+            arrow_array::ListArray::new(
+                Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "6".to_string(),
+                )]))),
+                list_parts.1,
+                list_parts.2,
+                list_parts.3,
+            )
+        }) as ArrayRef;
+        let col4 = Arc::new(StructArray::new(
+            vec![arrow_schema::Field::new(
+                "sub_col",
+                arrow_schema::DataType::Struct(
+                    vec![arrow_schema::Field::new(
+                        "sub_sub_col",
+                        arrow_schema::DataType::Int64,
+                        true,
+                    )
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "7".to_string(),
+                    )]))]
+                    .into(),
+                ),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "8".to_string(),
+            )]))]
+            .into(),
+            vec![Arc::new(StructArray::new(
+                vec![
+                    arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "7".to_string(),
+                        )])),
+                ]
+                .into(),
+                vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+                None,
+            ))],
+            None,
+        ));
+        let columns = vec![col0, col1, col2, col3, col4];
+
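+        // Field ids 1 and 3 refer to `col1` (the struct column) and `col3` (the
+        // list column) in the schema above, so the projected delete schema keeps
+        // only those two top-level columns.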
+        let equality_ids = vec![1, 3];
+        let (projector, fields) =
+            FieldProjector::new(schema.fields(), &equality_ids, PARQUET_FIELD_ID_META_KEY)?;
+        let delete_schema = arrow_schema::Schema::new(fields);
+        let delete_schema_ref = Arc::new(delete_schema.clone());
+
+        // prepare writer
+        let to_write = RecordBatch::try_new(Arc::new(schema.clone()), columns).unwrap();
+        let pb = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            delete_schema_ref.clone(),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+
+        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
+            .build(EqualityDeleteWriterConfig::new(
+                equality_ids,
+                projector,
+                delete_schema.clone(),
+                None,
+            ))
+            .await?;
+        // write
+        equality_delete_writer.write(to_write.clone()).await?;
+        let res = equality_delete_writer.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res.into_iter().next().unwrap();
+
+        // check
+        let to_write_projected = equality_delete_writer.project_record_batch_columns(to_write)?;
+        check_parquet_data_file_with_equality_delete_write(
+            &file_io,
+            &data_file,
+            &to_write_projected,
+        )
+        .await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_equality_delete_float_or_double_column() -> Result<(), anyhow::Error> {
+        // Float32, Float64
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col0", arrow_schema::DataType::Float32, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "0".to_string(),
+                    )])),
+                arrow_schema::Field::new("col1", arrow_schema::DataType::Float64, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "1".to_string(),
+                    )])),
+            ];
+            arrow_schema::Schema::new(fields)
+        };
+
+        let equality_id_float = vec![0];
+        let result_float = FieldProjector::new(
+            schema.fields(),
+            &equality_id_float,
+            PARQUET_FIELD_ID_META_KEY,
+        );
+        assert!(result_float.is_err());
+
+        let equality_ids_double = vec![1];
+        let result_double = FieldProjector::new(
+            schema.fields(),
+            &equality_ids_double,
+            PARQUET_FIELD_ID_META_KEY,
+        );
+        assert!(result_double.is_err());
+
+        Ok(())
+    }
+}
diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs
index 37da2ab81..37ab97eb6 100644
--- a/crates/iceberg/src/writer/base_writer/mod.rs
+++ b/crates/iceberg/src/writer/base_writer/mod.rs
@@ -18,3 +18,4 @@
 //! Base writer module contains the basic writer provide by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.
 
 pub mod data_file_writer;
+pub mod equality_delete_writer;