Skip to content

Commit

Permalink
feat: support lower_bound&&upper_bound for parquet writer (apache#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME authored and shaeqahmed committed Dec 9, 2024
1 parent 3ccddac commit afedb48
Show file tree
Hide file tree
Showing 6 changed files with 968 additions and 458 deletions.
24 changes: 15 additions & 9 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ use rust_decimal::prelude::ToPrimitive;
use std::collections::HashMap;
use std::sync::Arc;

/// When iceberg map type convert to Arrow map type, the default map field name is "key_value".
pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value";

/// A post order arrow schema visitor.
///
/// For order of methods called, please refer to [`visit_schema`].
Expand Down Expand Up @@ -500,9 +503,10 @@ impl SchemaVisitor for ToArrowSchemaConverter {
_ => unreachable!(),
};
let field = Field::new(
"entries",
DEFAULT_MAP_FIELD_NAME,
DataType::Struct(vec![key_field, value_field].into()),
map.value_field.required,
// Map field is always not nullable
false,
);

Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
Expand Down Expand Up @@ -562,7 +566,7 @@ impl SchemaVisitor for ToArrowSchemaConverter {
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
}
crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Time32(TimeUnit::Microsecond),
DataType::Time64(TimeUnit::Microsecond),
)),
crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Timestamp(TimeUnit::Microsecond, None),
Expand Down Expand Up @@ -659,10 +663,9 @@ mod tests {
let r#struct = DataType::Struct(fields);
let map = DataType::Map(
Arc::new(
Field::new("entries", r#struct, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"19".to_string(),
)])),
Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false).with_metadata(HashMap::from([
(PARQUET_FIELD_ID_META_KEY.to_string(), "19".to_string()),
])),
),
false,
);
Expand Down Expand Up @@ -1024,7 +1027,10 @@ mod tests {
]);

let r#struct = DataType::Struct(fields);
let map = DataType::Map(Arc::new(Field::new("entries", r#struct, false)), false);
let map = DataType::Map(
Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
false,
);

let fields = Fields::from(vec![
Field::new("aa", DataType::Int32, false).with_metadata(HashMap::from([(
Expand Down Expand Up @@ -1088,7 +1094,7 @@ mod tests {
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)])),
Field::new("i", DataType::Time32(TimeUnit::Microsecond), false).with_metadata(
Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "9".to_string())]),
),
Field::new(
Expand Down
17 changes: 17 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,13 @@ pub struct ListType {
pub element_field: NestedFieldRef,
}

impl ListType {
/// Construct a list type with the given element field.
pub fn new(element_field: NestedFieldRef) -> Self {
Self { element_field }
}
}

/// Module for type serialization/deserialization.
pub(super) mod _serde {
use crate::spec::datatypes::Type::Map;
Expand Down Expand Up @@ -788,6 +795,16 @@ pub struct MapType {
pub value_field: NestedFieldRef,
}

impl MapType {
/// Construct a map type with the given key and value fields.
pub fn new(key_field: NestedFieldRef, value_field: NestedFieldRef) -> Self {
Self {
key_field,
value_field,
}
}
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,7 @@ mod tests {
(schema, record)
}

fn table_schema_nested() -> Schema {
pub fn table_schema_nested() -> Schema {
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
Expand Down
187 changes: 15 additions & 172 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,13 @@ impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {

#[cfg(test)]
mod test {
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
use crate::{
spec::{DataContentType, Schema, Struct},
Result,
};
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

use crate::{
Expand All @@ -123,195 +126,35 @@ mod test {
location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
ParquetWriterBuilder,
},
tests::check_parquet_data_file,
IcebergWriter, IcebergWriterBuilder,
},
};

#[tokio::test]
async fn test_data_file_writer() -> Result<(), anyhow::Error> {
async fn test_parquet_writer() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let location_gen =
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

// prepare data
// Int, Struct(Int), String, List(Int), Struct(Struct(Int))
let schema = {
let fields = vec![
arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"0".to_string(),
)])),
arrow_schema::Field::new(
"col1",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
),
arrow_schema::Field::new(
"col3",
arrow_schema::DataType::List(Arc::new(
arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
)),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new(
"col4",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let col1 = Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
));
let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
"test";
1024
])) as ArrayRef;
let col3 = Arc::new({
let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(
vec![Some(1),]
);
1024
])
.into_parts();
arrow_array::ListArray::new(
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)]))),
list_parts.1,
list_parts.2,
list_parts.3,
)
}) as ArrayRef;
let col4 = Arc::new(StructArray::new(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
vec![Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
))],
None,
));
let to_write =
RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();

// prepare writer
let pb = ParquetWriterBuilder::new(
let pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
to_write.schema(),
Arc::new(Schema::builder().build().unwrap()),
file_io.clone(),
location_gen,
file_name_gen,
);
let mut data_file_writer = DataFileWriterBuilder::new(pb)
let mut data_file_writer = DataFileWriterBuilder::new(pw)
.build(DataFileWriterConfig::new(None))
.await?;

// write
data_file_writer.write(to_write.clone()).await?;
let res = data_file_writer.close().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();

// check
check_parquet_data_file(&file_io, &data_file, &to_write).await;
let data_file = data_file_writer.close().await.unwrap();
assert_eq!(data_file.len(), 1);
assert_eq!(data_file[0].file_format, DataFileFormat::Parquet);
assert_eq!(data_file[0].content, DataContentType::Data);
assert_eq!(data_file[0].partition, Struct::empty());

Ok(())
}
Expand Down
Loading

0 comments on commit afedb48

Please sign in to comment.