From fb5edee5f64a283ac3f88c5d81d7a82d99d59cc9 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Fri, 3 May 2024 23:13:27 +0800 Subject: [PATCH 01/12] feat: add EqualityDeleteWriter --- .../base_writer/equality_delete_writer.rs | 200 ++++++++++++++++++ crates/iceberg/src/writer/base_writer/mod.rs | 1 + 2 files changed, 201 insertions(+) create mode 100644 crates/iceberg/src/writer/base_writer/equality_delete_writer.rs diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs new file mode 100644 index 000000000..a388dbd67 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provide `EqualityDeleteWriter`. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, RecordBatch, StructArray}; +use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; +use itertools::Itertools; +use parquet::arrow::ProjectionMask; + +use crate::spec::DataFile; +use crate::writer::file_writer::FileWriter; +use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Builder for `EqualityDeleteWriter`. +#[derive(Clone)] +pub struct EqualityDeleteFileWriterBuilder { + inner: B, +} + +impl EqualityDeleteFileWriterBuilder { + /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`. + pub fn new(inner: B) -> Self { + Self { inner } + } +} + +/// Config for `EqualityDeleteWriter`. +pub struct EqualityDeleteWriterConfig { + equality_ids: Vec, + schema: SchemaRef, + column_id_meta_key: String, +} + +impl EqualityDeleteWriterConfig { + /// Create a new `DataFileWriterConfig` with equality ids. 
+ pub fn new(equality_ids: Vec, schema: Schema, column_id_meta_key: &str) -> Self { + Self { + equality_ids, + schema: schema.into(), + column_id_meta_key: column_id_meta_key.to_string(), + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder { + type R = EqualityDeleteFileWriter; + type C = EqualityDeleteWriterConfig; + + async fn build(self, config: Self::C) -> Result { + let (projector, fields) = FieldProjector::new( + config.schema.fields(), + &config.equality_ids, + &config.column_id_meta_key, + )?; + let delete_schema = Arc::new(arrow_schema::Schema::new(fields)); + Ok(EqualityDeleteFileWriter { + inner_writer: Some(self.inner.clone().build().await?), + projector, + delete_schema: delete_schema, + equality_ids: config.equality_ids, + }) + } +} + +/// A writer write data +pub struct EqualityDeleteFileWriter { + inner_writer: Option, + projector: FieldProjector, + delete_schema: SchemaRef, + equality_ids: Vec, +} + +#[async_trait::async_trait] +impl IcebergWriter for EqualityDeleteFileWriter { + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + let batch = RecordBatch::try_new( + self.delete_schema.clone(), + self.projector.project(batch.columns()), + ) + .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))?; + self.inner_writer.as_mut().unwrap().write(&batch).await + } + + async fn close(&mut self) -> Result> { + let writer = self.inner_writer.take().unwrap(); + Ok(writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(crate::spec::DataContentType::EqualityDeletes); + res.equality_ids(self.equality_ids.iter().map(|id| *id as i32).collect_vec()); + res.build().expect("msg") + }) + .collect_vec()) + } +} + +/// Help to project specific field from `RecordBatch`` according to the column id. 
+pub struct FieldProjector { + index_vec_vec: Vec>, +} + +impl FieldProjector { + /// Init FieldProjector + pub fn new( + batch_fields: &Fields, + column_ids: &[usize], + column_id_meta_key: &str, + ) -> Result<(Self, Fields)> { + let mut index_vec_vec = Vec::with_capacity(column_ids.len()); + let mut fields = Vec::with_capacity(column_ids.len()); + for &id in column_ids { + let mut index_vec = vec![]; + if let Some(field) = Self::fetch_column_index( + batch_fields, + &mut index_vec, + id as i64, + column_id_meta_key, + ) { + fields.push(field.clone()); + index_vec_vec.push(index_vec); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Can't find source column id: {}", id), + )); + } + } + Ok((Self { index_vec_vec }, Fields::from_iter(fields))) + } + + fn fetch_column_index( + fields: &Fields, + index_vec: &mut Vec, + col_id: i64, + column_id_meta_key: &str, + ) -> Option { + for (pos, field) in fields.iter().enumerate() { + let id: i64 = field + .metadata() + .get(column_id_meta_key) + .expect("column_id must be set") + .parse() + .expect("column_id must can be parse as i64"); + if col_id == id { + index_vec.push(pos); + return Some(field.clone()); + } + if let DataType::Struct(inner) = field.data_type() { + let res = Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key); + if !index_vec.is_empty() { + index_vec.push(pos); + return res; + } + } + } + None + } + /// Do projection with batch + pub fn project(&self, batch: &[ArrayRef]) -> Vec { + self.index_vec_vec + .iter() + .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec)) + .collect_vec() + } + + fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> ArrayRef { + let mut rev_iterator = index_vec.iter().rev(); + let mut array = batch[*rev_iterator.next().unwrap()].clone(); + for idx in rev_iterator { + array = array + .as_any() + .downcast_ref::() + .unwrap() + .column(*idx) + .clone(); + } + array + } +} diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37da2ab81..37ab97eb6 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -18,3 +18,4 @@ //! Base writer module contains the basic writer provide by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`. 
pub mod data_file_writer; +pub mod equality_delete_writer; From cc793cbc11f335ae869239ba6851a2ee57874745 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Sat, 4 May 2024 22:31:54 +0800 Subject: [PATCH 02/12] WIP: add test cases --- .../base_writer/equality_delete_writer.rs | 219 +++++++++++++++++- 1 file changed, 217 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index a388dbd67..cae331afb 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch, StructArray}; use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; use itertools::Itertools; -use parquet::arrow::ProjectionMask; use crate::spec::DataFile; use crate::writer::file_writer::FileWriter; @@ -75,7 +74,7 @@ impl IcebergWriterBuilder for EqualityDeleteFileWriterBuil Ok(EqualityDeleteFileWriter { inner_writer: Some(self.inner.clone().build().await?), projector, - delete_schema: delete_schema, + delete_schema, equality_ids: config.equality_ids, }) } @@ -198,3 +197,219 @@ impl FieldProjector { array } } + +#[cfg(test)] +mod test { + use std::{collections::HashMap, sync::Arc}; + + use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray}; + use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties}; + use tempfile::TempDir; + + use crate::{ + io::FileIOBuilder, + spec::DataFileFormat, + writer::{ + base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder, + file_writer::{ + location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, + ParquetWriterBuilder, + }, + IcebergWriter, IcebergWriterBuilder, + }, + }; + + use super::EqualityDeleteWriterConfig; + + #[tokio::test] + async fn test_equality_delete_writer() -> Result<(), anyhow::Error> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // prepare data + // Int, Struct(Int), String, List(Int), Struct(Struct(Int)) + let schema = { + let fields = vec![ + arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + arrow_schema::Field::new( + "col1", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + ), + arrow_schema::Field::new( + "col3", + arrow_schema::DataType::List(Arc::new( + arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + 
arrow_schema::Field::new( + "col4", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "8".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), + ]; + arrow_schema::Schema::new(fields) + }; + let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + let col1 = Arc::new(StructArray::new( + vec![ + arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + ] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + )); + let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![ + "test"; + 1024 + ])) as ArrayRef; + let col3 = Arc::new({ + let list_parts = arrow_array::ListArray::from_iter_primitive::(vec![ + Some( + vec![Some(1),] + ); + 1024 + ]) + .into_parts(); + arrow_array::ListArray::new( + Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )]))), + list_parts.1, + list_parts.2, + list_parts.3, + ) + }) as ArrayRef; + let col4 = Arc::new(StructArray::new( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "8".to_string(), + )]))] + .into(), + vec![Arc::new(StructArray::new( + vec![ + arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + ] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + ))], + None, + )); + let to_write = + RecordBatch::try_new(Arc::new(schema.clone()), vec![col0, col1, col2, col3, col4]) + .unwrap(); + + // prepare writer + let pb = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + to_write.schema(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let equality_ids = vec![1, 3]; + let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb) + .build(EqualityDeleteWriterConfig::new( + equality_ids, + schema.clone(), + PARQUET_FIELD_ID_META_KEY, + )) + .await?; + // write + equality_delete_writer.write(to_write.clone()).await?; + let res = equality_delete_writer.close().await?; + assert_eq!(res.len(), 1); + let _data_file = res.into_iter().next().unwrap(); + + // check + // check_parquet_data_file(&file_io, &data_file, &to_write).await; + Ok(()) + } +} From 974ae0cef8dcbd0b18c211b590220a627feb0cca Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Mon, 13 May 2024 14:02:06 +0800 Subject: [PATCH 03/12] fix: move delete schema out of writer --- .../base_writer/equality_delete_writer.rs | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs 
b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index cae331afb..622a90d6d 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -17,13 +17,11 @@ //! This module provide `EqualityDeleteWriter`. -use std::sync::Arc; - use arrow_array::{ArrayRef, RecordBatch, StructArray}; use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; use itertools::Itertools; -use crate::spec::DataFile; +use crate::spec::{DataFile, Struct}; use crate::writer::file_writer::FileWriter; use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; @@ -44,17 +42,24 @@ impl EqualityDeleteFileWriterBuilder { /// Config for `EqualityDeleteWriter`. pub struct EqualityDeleteWriterConfig { equality_ids: Vec, + projector: FieldProjector, schema: SchemaRef, - column_id_meta_key: String, + partition_value: Struct, } impl EqualityDeleteWriterConfig { /// Create a new `DataFileWriterConfig` with equality ids. - pub fn new(equality_ids: Vec, schema: Schema, column_id_meta_key: &str) -> Self { + pub fn new( + equality_ids: Vec, + projector: FieldProjector, + schema: Schema, + partition_value: Option, + ) -> Self { Self { equality_ids, + projector, schema: schema.into(), - column_id_meta_key: column_id_meta_key.to_string(), + partition_value: partition_value.unwrap_or(Struct::empty()), } } } @@ -65,17 +70,12 @@ impl IcebergWriterBuilder for EqualityDeleteFileWriterBuil type C = EqualityDeleteWriterConfig; async fn build(self, config: Self::C) -> Result { - let (projector, fields) = FieldProjector::new( - config.schema.fields(), - &config.equality_ids, - &config.column_id_meta_key, - )?; - let delete_schema = Arc::new(arrow_schema::Schema::new(fields)); Ok(EqualityDeleteFileWriter { inner_writer: Some(self.inner.clone().build().await?), - projector, - delete_schema, + projector: config.projector, + delete_schema_ref: config.schema, equality_ids: config.equality_ids, + partition_value: config.partition_value, }) } } @@ -84,18 +84,25 @@ impl IcebergWriterBuilder for EqualityDeleteFileWriterBuil pub struct EqualityDeleteFileWriter { inner_writer: Option, projector: FieldProjector, - delete_schema: SchemaRef, + delete_schema_ref: SchemaRef, equality_ids: Vec, + partition_value: Struct, +} + +impl EqualityDeleteFileWriter { + fn project_record_batch_columns(&self, batch: RecordBatch) -> Result { + RecordBatch::try_new( + self.delete_schema_ref.clone(), + self.projector.project(batch.columns()), + ) + .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) + } } #[async_trait::async_trait] impl IcebergWriter for EqualityDeleteFileWriter { async fn write(&mut self, batch: RecordBatch) -> Result<()> { - let batch = RecordBatch::try_new( - self.delete_schema.clone(), - self.projector.project(batch.columns()), - ) - .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))?; + let batch = self.project_record_batch_columns(batch)?; self.inner_writer.as_mut().unwrap().write(&batch).await } @@ -108,6 +115,7 @@ impl IcebergWriter for EqualityDeleteFileWriter { .map(|mut res| { res.content(crate::spec::DataContentType::EqualityDeletes); res.equality_ids(self.equality_ids.iter().map(|id| *id as i32).collect_vec()); + res.partition(self.partition_value.clone()); res.build().expect("msg") }) .collect_vec()) From ca351610ce68fd2f43ead537506c5cc6f8938256 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Mon, 13 May 2024 14:02:19 +0800 Subject: 
[PATCH 04/12] test: add test case for equality delete writer --- .../base_writer/equality_delete_writer.rs | 35 +++++--- crates/iceberg/src/writer/mod.rs | 81 +++++++++++++++++++ 2 files changed, 106 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 622a90d6d..696e5303c 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -218,11 +218,14 @@ mod test { io::FileIOBuilder, spec::DataFileFormat, writer::{ - base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder, + base_writer::equality_delete_writer::{ + EqualityDeleteFileWriterBuilder, FieldProjector, + }, file_writer::{ location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, ParquetWriterBuilder, }, + tests::check_parquet_data_file_with_equality_delete_write, IcebergWriter, IcebergWriterBuilder, }, }; @@ -390,34 +393,46 @@ mod test { ))], None, )); - let to_write = - RecordBatch::try_new(Arc::new(schema.clone()), vec![col0, col1, col2, col3, col4]) - .unwrap(); + let columns = vec![col0, col1, col2, col3, col4]; + + let equality_ids = vec![1, 3]; + let (projector, fields) = + FieldProjector::new(schema.fields(), &equality_ids, PARQUET_FIELD_ID_META_KEY)?; + let delete_schema = arrow_schema::Schema::new(fields); + let delete_schema_ref = Arc::new(delete_schema.clone()); // prepare writer + let to_write = RecordBatch::try_new(Arc::new(schema.clone()), columns).unwrap(); let pb = ParquetWriterBuilder::new( WriterProperties::builder().build(), - to_write.schema(), + delete_schema_ref.clone(), file_io.clone(), location_gen, file_name_gen, ); - let equality_ids = vec![1, 3]; + let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb) .build(EqualityDeleteWriterConfig::new( equality_ids, - schema.clone(), - PARQUET_FIELD_ID_META_KEY, + projector, + delete_schema.clone(), + None, )) .await?; // write equality_delete_writer.write(to_write.clone()).await?; let res = equality_delete_writer.close().await?; assert_eq!(res.len(), 1); - let _data_file = res.into_iter().next().unwrap(); + let data_file = res.into_iter().next().unwrap(); // check - // check_parquet_data_file(&file_io, &data_file, &to_write).await; + let to_write_projected = equality_delete_writer.project_record_batch_columns(to_write)?; + check_parquet_data_file_with_equality_delete_write( + &file_io, + &data_file, + &to_write_projected, + ) + .await; Ok(()) } } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 5f3ae5581..0aea6f36d 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -92,6 +92,7 @@ pub trait CurrentFileStatus { #[cfg(test)] mod tests { + use arrow_array::RecordBatch; use arrow_schema::Schema; use arrow_select::concat::concat_batches; @@ -185,4 +186,84 @@ mod tests { assert_eq!(v, expect); }); } + + pub(crate) async fn check_parquet_data_file_with_equality_delete_write( + file_io: &FileIO, + data_file: &DataFile, + batch: &RecordBatch, + ) { + assert_eq!(data_file.file_format, DataFileFormat::Parquet); + + // read the written file + let mut input_file = file_io + .new_input(data_file.file_path.clone()) + .unwrap() + .reader() + .await + .unwrap(); + let mut res = vec![]; + let file_size = input_file.read_to_end(&mut res).await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); + let 
metadata = reader_builder.metadata().clone(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(*batch, res); + + // check metadata + let expect_column_num = batch.num_columns(); + + assert_eq!( + data_file.record_count, + metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64 + ); + + assert_eq!(data_file.file_size_in_bytes, file_size as u64); + + assert_eq!(data_file.column_sizes.len(), expect_column_num); + + for (index, id) in data_file.column_sizes().keys().sorted().enumerate() { + metadata + .row_groups() + .iter() + .map(|group| group.columns()) + .for_each(|column| { + assert_eq!( + *data_file.column_sizes.get(id).unwrap() as i64, + column.get(index).unwrap().compressed_size() + ); + }); + } + + assert_eq!(data_file.value_counts.len(), expect_column_num); + data_file.value_counts.iter().for_each(|(_, &v)| { + let expect = metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64; + assert_eq!(v, expect); + }); + + for (index, id) in data_file.null_value_counts().keys().enumerate() { + let expect = batch.column(index).null_count() as u64; + assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect); + } + + assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups()); + data_file + .split_offsets + .iter() + .enumerate() + .for_each(|(i, &v)| { + let expect = metadata.row_groups()[i].file_offset().unwrap(); + assert_eq!(v, expect); + }); + } } From 76aba61805a2e5caba1291b999cdf44980f95f5c Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 22 May 2024 15:53:37 +0800 Subject: [PATCH 05/12] fix: refactor projector --- .../base_writer/equality_delete_writer.rs | 152 +++++++++++++++--- crates/iceberg/src/writer/mod.rs | 80 --------- 2 files changed, 129 insertions(+), 103 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 696e5303c..01d8b770d 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -138,7 +138,7 @@ impl FieldProjector { let mut fields = Vec::with_capacity(column_ids.len()); for &id in column_ids { let mut index_vec = vec![]; - if let Some(field) = Self::fetch_column_index( + if let Ok(field) = Self::fetch_column_index( batch_fields, &mut index_vec, id as i64, @@ -149,7 +149,10 @@ impl FieldProjector { } else { return Err(Error::new( ErrorKind::DataInvalid, - format!("Can't find source column id: {}", id), + format!( + "Can't find source column id or column data type invalid: {}", + id + ), )); } } @@ -161,28 +164,45 @@ impl FieldProjector { index_vec: &mut Vec, col_id: i64, column_id_meta_key: &str, - ) -> Option { + ) -> Result { for (pos, field) in fields.iter().enumerate() { - let id: i64 = field - .metadata() - .get(column_id_meta_key) - .expect("column_id must be set") - .parse() - .expect("column_id must can be parse as i64"); - if col_id == id { - index_vec.push(pos); - return Some(field.clone()); - } - if let DataType::Struct(inner) = field.data_type() { - let res = Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key); - if !index_vec.is_empty() { - index_vec.push(pos); - return res; + match field.data_type() { + DataType::Float16 | DataType::Float32 | DataType::Float64 => { + return Err(Error::new( + 
ErrorKind::DataInvalid, + "Delete column data type cannot be float or double", + )); + } + _ => { + let id = field + .metadata() + .get(column_id_meta_key) + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "column_id must be set"))? + .parse::() + .map_err(|_| { + Error::new(ErrorKind::DataInvalid, "column_id must be parsable as i64") + })?; + if col_id == id { + index_vec.push(pos); + return Ok(field.clone()); + } + if let DataType::Struct(inner) = field.data_type() { + let res = + Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key)?; + if !index_vec.is_empty() { + index_vec.push(pos); + } + return Ok(res); + } } } } - None + Err(Error::new( + ErrorKind::DataInvalid, + "Column ID not found in fields", + )) } + /// Do projection with batch pub fn project(&self, batch: &[ArrayRef]) -> Vec { self.index_vec_vec @@ -208,15 +228,22 @@ impl FieldProjector { #[cfg(test)] mod test { + use arrow_select::concat::concat_batches; + use bytes::Bytes; + use futures::AsyncReadExt; + use itertools::Itertools; use std::{collections::HashMap, sync::Arc}; use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray}; - use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties}; + use parquet::{ + arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, PARQUET_FIELD_ID_META_KEY}, + file::properties::WriterProperties, + }; use tempfile::TempDir; use crate::{ - io::FileIOBuilder, - spec::DataFileFormat, + io::{FileIO, FileIOBuilder}, + spec::{DataFile, DataFileFormat}, writer::{ base_writer::equality_delete_writer::{ EqualityDeleteFileWriterBuilder, FieldProjector, @@ -225,13 +252,92 @@ mod test { location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, ParquetWriterBuilder, }, - tests::check_parquet_data_file_with_equality_delete_write, IcebergWriter, IcebergWriterBuilder, }, }; use super::EqualityDeleteWriterConfig; + pub(crate) async fn check_parquet_data_file_with_equality_delete_write( + file_io: &FileIO, + data_file: &DataFile, + batch: &RecordBatch, + ) { + assert_eq!(data_file.file_format, DataFileFormat::Parquet); + + // read the written file + let mut input_file = file_io + .new_input(data_file.file_path.clone()) + .unwrap() + .reader() + .await + .unwrap(); + let mut res = vec![]; + let file_size = input_file.read_to_end(&mut res).await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); + let metadata = reader_builder.metadata().clone(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(*batch, res); + + // check metadata + let expect_column_num = batch.num_columns(); + + assert_eq!( + data_file.record_count, + metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64 + ); + + assert_eq!(data_file.file_size_in_bytes, file_size as u64); + + assert_eq!(data_file.column_sizes.len(), expect_column_num); + + for (index, id) in data_file.column_sizes().keys().sorted().enumerate() { + metadata + .row_groups() + .iter() + .map(|group| group.columns()) + .for_each(|column| { + assert_eq!( + *data_file.column_sizes.get(id).unwrap() as i64, + column.get(index).unwrap().compressed_size() + ); + }); + } + + assert_eq!(data_file.value_counts.len(), expect_column_num); + data_file.value_counts.iter().for_each(|(_, &v)| { + let expect = metadata + .row_groups() + .iter() + .map(|group| 
group.num_rows()) + .sum::() as u64; + assert_eq!(v, expect); + }); + + for (index, id) in data_file.null_value_counts().keys().enumerate() { + let expect = batch.column(index).null_count() as u64; + assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect); + } + + assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups()); + data_file + .split_offsets + .iter() + .enumerate() + .for_each(|(i, &v)| { + let expect = metadata.row_groups()[i].file_offset().unwrap(); + assert_eq!(v, expect); + }); + } + #[tokio::test] async fn test_equality_delete_writer() -> Result<(), anyhow::Error> { let temp_dir = TempDir::new().unwrap(); diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 0aea6f36d..48c01d444 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -186,84 +186,4 @@ mod tests { assert_eq!(v, expect); }); } - - pub(crate) async fn check_parquet_data_file_with_equality_delete_write( - file_io: &FileIO, - data_file: &DataFile, - batch: &RecordBatch, - ) { - assert_eq!(data_file.file_format, DataFileFormat::Parquet); - - // read the written file - let mut input_file = file_io - .new_input(data_file.file_path.clone()) - .unwrap() - .reader() - .await - .unwrap(); - let mut res = vec![]; - let file_size = input_file.read_to_end(&mut res).await.unwrap(); - let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); - let metadata = reader_builder.metadata().clone(); - - // check data - let reader = reader_builder.build().unwrap(); - let batches = reader.map(|batch| batch.unwrap()).collect::>(); - let res = concat_batches(&batch.schema(), &batches).unwrap(); - assert_eq!(*batch, res); - - // check metadata - let expect_column_num = batch.num_columns(); - - assert_eq!( - data_file.record_count, - metadata - .row_groups() - .iter() - .map(|group| group.num_rows()) - .sum::() as u64 - ); - - assert_eq!(data_file.file_size_in_bytes, file_size as u64); - - assert_eq!(data_file.column_sizes.len(), expect_column_num); - - for (index, id) in data_file.column_sizes().keys().sorted().enumerate() { - metadata - .row_groups() - .iter() - .map(|group| group.columns()) - .for_each(|column| { - assert_eq!( - *data_file.column_sizes.get(id).unwrap() as i64, - column.get(index).unwrap().compressed_size() - ); - }); - } - - assert_eq!(data_file.value_counts.len(), expect_column_num); - data_file.value_counts.iter().for_each(|(_, &v)| { - let expect = metadata - .row_groups() - .iter() - .map(|group| group.num_rows()) - .sum::() as u64; - assert_eq!(v, expect); - }); - - for (index, id) in data_file.null_value_counts().keys().enumerate() { - let expect = batch.column(index).null_count() as u64; - assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect); - } - - assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups()); - data_file - .split_offsets - .iter() - .enumerate() - .for_each(|(i, &v)| { - let expect = metadata.row_groups()[i].file_offset().unwrap(); - assert_eq!(v, expect); - }); - } } From 66d4ddf754adb65c1556f27bc6229f5f0f1756c9 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 22 May 2024 16:35:06 +0800 Subject: [PATCH 06/12] fix: fix projector --- .../src/writer/base_writer/equality_delete_writer.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 01d8b770d..ee425c1c0 100644 --- 
a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -174,7 +174,7 @@ impl FieldProjector { )); } _ => { - let id = field + let id: i64 = field .metadata() .get(column_id_meta_key) .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "column_id must be set"))? @@ -188,21 +188,20 @@ impl FieldProjector { } if let DataType::Struct(inner) = field.data_type() { let res = - Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key)?; + Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key); if !index_vec.is_empty() { index_vec.push(pos); + return res; } - return Ok(res); } } } } Err(Error::new( ErrorKind::DataInvalid, - "Column ID not found in fields", + "Column id not found in fields", )) } - /// Do projection with batch pub fn project(&self, batch: &[ArrayRef]) -> Vec { self.index_vec_vec From c424edc41bb4e3891163097447892e4e7ef59812 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 22 May 2024 16:37:19 +0800 Subject: [PATCH 07/12] fix: add result --- .../writer/base_writer/equality_delete_writer.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index ee425c1c0..74e00b0d8 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -93,7 +93,7 @@ impl EqualityDeleteFileWriter { fn project_record_batch_columns(&self, batch: RecordBatch) -> Result { RecordBatch::try_new( self.delete_schema_ref.clone(), - self.projector.project(batch.columns()), + self.projector.project(batch.columns())?, ) .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}"))) } @@ -203,25 +203,28 @@ impl FieldProjector { )) } /// Do projection with batch - pub fn project(&self, batch: &[ArrayRef]) -> Vec { + pub fn project(&self, batch: &[ArrayRef]) -> Result> { self.index_vec_vec .iter() .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec)) - .collect_vec() + .collect::>>() } - fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> ArrayRef { + fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> Result { let mut rev_iterator = index_vec.iter().rev(); let mut array = batch[*rev_iterator.next().unwrap()].clone(); for idx in rev_iterator { array = array .as_any() .downcast_ref::() - .unwrap() + .ok_or(Error::new( + ErrorKind::Unexpected, + "Cannot convert Array to StructArray", + ))? 
.column(*idx) .clone(); } - array + Ok(array) } } From e449a4ff431d6797aef3f960c17f92ddfaacb3d4 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 29 May 2024 13:26:28 +0800 Subject: [PATCH 08/12] test: add float and double column test for equality delete writer --- .../base_writer/equality_delete_writer.rs | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 74e00b0d8..65aa31ab7 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -230,6 +230,7 @@ impl FieldProjector { #[cfg(test)] mod test { + use anyhow::Ok; use arrow_select::concat::concat_batches; use bytes::Bytes; use futures::AsyncReadExt; @@ -260,7 +261,7 @@ mod test { use super::EqualityDeleteWriterConfig; - pub(crate) async fn check_parquet_data_file_with_equality_delete_write( + async fn check_parquet_data_file_with_equality_delete_write( file_io: &FileIO, data_file: &DataFile, batch: &RecordBatch, @@ -543,4 +544,42 @@ mod test { .await; Ok(()) } + + #[tokio::test] + async fn test_equality_delete_float_or_double_column() -> Result<(), anyhow::Error> { + // Float32, Float64 + let schema = { + let fields = vec![ + arrow_schema::Field::new("col0", arrow_schema::DataType::Float32, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + arrow_schema::Field::new("col1", arrow_schema::DataType::Float64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ]; + arrow_schema::Schema::new(fields) + }; + + let equality_id_float = vec![0]; + let result_float = FieldProjector::new( + schema.fields(), + &equality_id_float, + PARQUET_FIELD_ID_META_KEY, + ); + assert!(result_float.is_err()); + + let equality_ids_double = vec![1]; + let result_double = FieldProjector::new( + schema.fields(), + &equality_ids_double, + PARQUET_FIELD_ID_META_KEY, + ); + assert!(result_double.is_err()); + + Ok(()) + } } From faf24ba1d58ff43dbd7fe1666ec6c8465fdcd706 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 29 May 2024 14:13:04 +0800 Subject: [PATCH 09/12] fmt --- .../iceberg/src/writer/base_writer/equality_delete_writer.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 65aa31ab7..6a7936e06 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -230,7 +230,6 @@ impl FieldProjector { #[cfg(test)] mod test { - use anyhow::Ok; use arrow_select::concat::concat_batches; use bytes::Bytes; use futures::AsyncReadExt; @@ -249,7 +248,7 @@ mod test { spec::{DataFile, DataFileFormat}, writer::{ base_writer::equality_delete_writer::{ - EqualityDeleteFileWriterBuilder, FieldProjector, + EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, FieldProjector, }, file_writer::{ location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, @@ -259,8 +258,6 @@ mod test { }, }; - use super::EqualityDeleteWriterConfig; - async fn check_parquet_data_file_with_equality_delete_write( file_io: &FileIO, data_file: &DataFile, From a64660f0f8f0a15fde6936ae1a60c44deab59d69 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 29 May 2024 14:24:18 
+0800 Subject: [PATCH 10/12] fix: compatibility with #364 --- .../base_writer/equality_delete_writer.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 6a7936e06..b07474bb5 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -232,7 +232,6 @@ impl FieldProjector { mod test { use arrow_select::concat::concat_batches; use bytes::Bytes; - use futures::AsyncReadExt; use itertools::Itertools; use std::{collections::HashMap, sync::Arc}; @@ -266,15 +265,11 @@ mod test { assert_eq!(data_file.file_format, DataFileFormat::Parquet); // read the written file - let mut input_file = file_io - .new_input(data_file.file_path.clone()) - .unwrap() - .reader() - .await - .unwrap(); - let mut res = vec![]; - let file_size = input_file.read_to_end(&mut res).await.unwrap(); - let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + // read the written file + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); let metadata = reader_builder.metadata().clone(); // check data @@ -295,7 +290,7 @@ mod test { .sum::() as u64 ); - assert_eq!(data_file.file_size_in_bytes, file_size as u64); + assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64); assert_eq!(data_file.column_sizes.len(), expect_column_num); From 8a28002d80f5dc8e8e9ecfc65a1620d8f5e22b14 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 29 May 2024 15:01:09 +0800 Subject: [PATCH 11/12] fix: remove unwrap --- .../base_writer/equality_delete_writer.rs | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index b07474bb5..ba198821a 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -103,22 +103,35 @@ impl EqualityDeleteFileWriter { impl IcebergWriter for EqualityDeleteFileWriter { async fn write(&mut self, batch: RecordBatch) -> Result<()> { let batch = self.project_record_batch_columns(batch)?; - self.inner_writer.as_mut().unwrap().write(&batch).await + if let Some(writer) = self.inner_writer.as_mut() { + writer.write(&batch).await + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Equality delete inner writer does not exist", + )) + } } async fn close(&mut self) -> Result> { - let writer = self.inner_writer.take().unwrap(); - Ok(writer - .close() - .await? - .into_iter() - .map(|mut res| { - res.content(crate::spec::DataContentType::EqualityDeletes); - res.equality_ids(self.equality_ids.iter().map(|id| *id as i32).collect_vec()); - res.partition(self.partition_value.clone()); - res.build().expect("msg") - }) - .collect_vec()) + if let Some(writer) = self.inner_writer.take() { + Ok(writer + .close() + .await? 
+ .into_iter() + .map(|mut res| { + res.content(crate::spec::DataContentType::EqualityDeletes); + res.equality_ids(self.equality_ids.iter().map(|id| *id as i32).collect_vec()); + res.partition(self.partition_value.clone()); + res.build().expect("msg") + }) + .collect_vec()) + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Equality delete inner writer does not exist", + )) + } } } @@ -231,7 +244,6 @@ impl FieldProjector { #[cfg(test)] mod test { use arrow_select::concat::concat_batches; - use bytes::Bytes; use itertools::Itertools; use std::{collections::HashMap, sync::Arc}; From d14019435cb3439bf3c70163dbf03025ad0b5d33 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Wed, 5 Jun 2024 13:38:02 +0800 Subject: [PATCH 12/12] fix: minor --- crates/iceberg/src/writer/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 48c01d444..5f3ae5581 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -92,7 +92,6 @@ pub trait CurrentFileStatus { #[cfg(test)] mod tests { - use arrow_array::RecordBatch; use arrow_schema::Schema; use arrow_select::concat::concat_batches;
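
Editor's note: the sketch below is not part of the patch series. It condenses the `test_equality_delete_writer` test introduced above into a minimal end-to-end usage example of the API as it stands after the final patch, using the same imports as that test module (TempDir, FileIOBuilder, MockLocationGenerator, DefaultFileNameGenerator, ParquetWriterBuilder, WriterProperties). It assumes an Arrow `schema` whose fields carry `PARQUET_FIELD_ID_META_KEY` metadata and a `to_write` RecordBatch built against it, exactly as in the test; treat it as a reading aid rather than authoritative documentation.

// Editor's sketch, condensed from the series' own test; runs inside a test body
// that returns Result<(), anyhow::Error>, as the original does.
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let location_gen = MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
    DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

// Resolve the equality-id columns (top-level or nested struct fields) into a
// projector plus the projected fields; those fields become the delete file schema.
// FieldProjector::new rejects float/double columns and missing field-id metadata.
let equality_ids = vec![1, 3];
let (projector, fields) =
    FieldProjector::new(schema.fields(), &equality_ids, PARQUET_FIELD_ID_META_KEY)?;
let delete_schema = arrow_schema::Schema::new(fields);
let delete_schema_ref = Arc::new(delete_schema.clone());

// The inner Parquet writer is built against the *projected* schema, not the table
// schema, because only the equality-id columns are written to the delete file.
let parquet_writer_builder = ParquetWriterBuilder::new(
    WriterProperties::builder().build(),
    delete_schema_ref.clone(),
    file_io.clone(),
    location_gen,
    file_name_gen,
);

// Wrap it in the equality-delete writer; a partition value of None falls back to
// Struct::empty().
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(parquet_writer_builder)
    .build(EqualityDeleteWriterConfig::new(
        equality_ids,
        projector,
        delete_schema,
        None,
    ))
    .await?;

// Each incoming batch is projected down to the equality-id columns before being
// handed to the inner writer; close() returns DataFile entries whose content type
// is EqualityDeletes and whose equality_ids record the chosen column ids.
equality_delete_writer.write(to_write).await?;
let delete_files = equality_delete_writer.close().await?;
assert_eq!(delete_files.len(), 1);

The design the series settles on (from PATCH 03 onward) is that the caller resolves the projector and the projected schema up front and passes them in through `EqualityDeleteWriterConfig`, so the writer itself remains a thin wrapper that projects each batch and stamps the resulting `DataFile` entries as equality deletes.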