feat: support lower_bound&&upper_bound for parquet writer #383

Merged · 6 commits · Jul 2, 2024
Changes from 5 commits
24 changes: 15 additions & 9 deletions crates/iceberg/src/arrow/schema.rs
@@ -35,6 +35,9 @@ use rust_decimal::prelude::ToPrimitive;
use std::collections::HashMap;
use std::sync::Arc;

/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
Comment on lines +38 to +39
Contributor

May I ask what the reason is for changing entries to key_value? 👀

Contributor Author

From https://github.com/apache/iceberg-python/blob/9b9ed534b2022cb9a687f4ed876fadcc2457b31b/pyiceberg/io/pyarrow.py#L2197, we can see that the field name used for map types in the Parquet column name path is key_value.
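
For context, here is a minimal sketch (not part of this diff; the helper name and field types are illustrative) of the Arrow map layout the converter now emits, with the inner struct field named key_value and marked non-nullable:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Fields};

/// Illustrative helper: builds an Arrow map type the way the converter now does.
fn iceberg_style_map(key: DataType, value: DataType) -> DataType {
    let entries = Fields::from(vec![
        Field::new("key", key, false),
        Field::new("value", value, true),
    ]);
    DataType::Map(
        // "key_value" mirrors DEFAULT_MAP_FIELD_NAME, so Parquet column paths
        // look like `my_map.key_value.key`, in line with pyiceberg's writer.
        Arc::new(Field::new("key_value", DataType::Struct(entries), false)),
        /* keys sorted */ false,
    )
}

fn main() {
    println!("{:?}", iceberg_style_map(DataType::Utf8, DataType::Int64));
}
```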


/// A post order arrow schema visitor.
///
/// For order of methods called, please refer to [`visit_schema`].
@@ -500,9 +503,10 @@ impl SchemaVisitor for ToArrowSchemaConverter {
_ => unreachable!(),
};
let field = Field::new(
"entries",
DEFAULT_MAP_FIELD_NAME,
DataType::Struct(vec![key_field, value_field].into()),
map.value_field.required,
// Map field is always not nullable
false,
);

Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
@@ -562,7 +566,7 @@ impl SchemaVisitor for ToArrowSchemaConverter {
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
}
crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Time32(TimeUnit::Microsecond),
DataType::Time64(TimeUnit::Microsecond),
)),
crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Timestamp(TimeUnit::Microsecond, None),
@@ -659,10 +663,9 @@ mod tests {
let r#struct = DataType::Struct(fields);
let map = DataType::Map(
Arc::new(
Field::new("entries", r#struct, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"19".to_string(),
)])),
Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false).with_metadata(HashMap::from([
(PARQUET_FIELD_ID_META_KEY.to_string(), "19".to_string()),
])),
),
false,
);
@@ -1024,7 +1027,10 @@ mod tests {
]);

let r#struct = DataType::Struct(fields);
let map = DataType::Map(Arc::new(Field::new("entries", r#struct, false)), false);
let map = DataType::Map(
Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
false,
);

let fields = Fields::from(vec![
Field::new("aa", DataType::Int32, false).with_metadata(HashMap::from([(
@@ -1088,7 +1094,7 @@ mod tests {
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)])),
Field::new("i", DataType::Time32(TimeUnit::Microsecond), false).with_metadata(
Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "9".to_string())]),
),
Field::new(
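
This file also switches the Arrow mapping for Iceberg's time type from Time32 to Time64. Arrow reserves Time32 for second/millisecond units, while microsecond (and nanosecond) precision requires 64 bits; a quick, purely illustrative sanity check:

```rust
fn main() {
    // Microseconds in one day exceed what an i32 can hold, so Arrow stores
    // microsecond-precision times as Time64 (64-bit), never Time32.
    let micros_per_day: i64 = 24 * 60 * 60 * 1_000_000;
    assert!(micros_per_day > i64::from(i32::MAX));
    println!("{micros_per_day} µs per day vs i32::MAX = {}", i32::MAX);
}
```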
17 changes: 17 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
@@ -667,6 +667,13 @@ pub struct ListType {
pub element_field: NestedFieldRef,
}

impl ListType {
/// Construct a list type with the given element field.
pub fn new(element_field: NestedFieldRef) -> Self {
Self { element_field }
}
}

/// Module for type serialization/deserialization.
pub(super) mod _serde {
use crate::spec::datatypes::Type::Map;
@@ -782,6 +789,16 @@ pub struct MapType {
pub value_field: NestedFieldRef,
}

impl MapType {
/// Construct a map type with the given key and value fields.
pub fn new(key_field: NestedFieldRef, value_field: NestedFieldRef) -> Self {
Self {
key_field,
value_field,
}
}
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
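
A brief usage sketch for the new ListType::new and MapType::new constructors; the field ids and names below are illustrative, and NestedField::required is assumed from the existing spec API:

```rust
use std::sync::Arc;

use iceberg::spec::{ListType, MapType, NestedField, PrimitiveType, Type};

fn example_types() -> (ListType, MapType) {
    // list<long>
    let list = ListType::new(Arc::new(NestedField::required(
        1,
        "element",
        Type::Primitive(PrimitiveType::Long),
    )));
    // map<string, long>
    let map = MapType::new(
        Arc::new(NestedField::required(
            2,
            "key",
            Type::Primitive(PrimitiveType::String),
        )),
        Arc::new(NestedField::required(
            3,
            "value",
            Type::Primitive(PrimitiveType::Long),
        )),
    );
    (list, map)
}
```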
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/schema.rs
@@ -1196,7 +1196,7 @@ mod tests {
(schema, record)
}

fn table_schema_nested() -> Schema {
pub fn table_schema_nested() -> Schema {
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
187 changes: 15 additions & 172 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -108,10 +108,13 @@ impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {

#[cfg(test)]
mod test {
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
use crate::{
spec::{DataContentType, Schema, Struct},
Result,
};
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

use crate::{
@@ -123,195 +126,35 @@ mod test {
location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
ParquetWriterBuilder,
},
tests::check_parquet_data_file,
IcebergWriter, IcebergWriterBuilder,
},
};

#[tokio::test]
async fn test_data_file_writer() -> Result<(), anyhow::Error> {
async fn test_parquet_writer() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let location_gen =
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

// prepare data
// Int, Struct(Int), String, List(Int), Struct(Struct(Int))
let schema = {
let fields = vec![
arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"0".to_string(),
)])),
arrow_schema::Field::new(
"col1",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
),
arrow_schema::Field::new(
"col3",
arrow_schema::DataType::List(Arc::new(
arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
)),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new(
"col4",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let col1 = Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
));
let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
"test";
1024
])) as ArrayRef;
let col3 = Arc::new({
let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(
vec![Some(1),]
);
1024
])
.into_parts();
arrow_array::ListArray::new(
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)]))),
list_parts.1,
list_parts.2,
list_parts.3,
)
}) as ArrayRef;
let col4 = Arc::new(StructArray::new(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
vec![Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
))],
None,
));
let to_write =
RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();

// prepare writer
let pb = ParquetWriterBuilder::new(
let pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
to_write.schema(),
Arc::new(Schema::builder().build().unwrap()),
file_io.clone(),
location_gen,
file_name_gen,
);
let mut data_file_writer = DataFileWriterBuilder::new(pb)
let mut data_file_writer = DataFileWriterBuilder::new(pw)
.build(DataFileWriterConfig::new(None))
.await?;

// write
data_file_writer.write(to_write.clone()).await?;
let res = data_file_writer.close().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();

// check
check_parquet_data_file(&file_io, &data_file, &to_write).await;
let data_file = data_file_writer.close().await.unwrap();
assert_eq!(data_file.len(), 1);
assert_eq!(data_file[0].file_format, DataFileFormat::Parquet);
assert_eq!(data_file[0].content, DataContentType::Data);
assert_eq!(data_file[0].partition, Struct::empty());

Ok(())
}
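
Since the new test only asserts the file format, content type, and partition, here is a simplified, hedged sketch of the bookkeeping idea behind per-column lower/upper bounds. It is not the PR's implementation (the writer may instead derive bounds from Parquet column statistics), and the BoundsTracker type is invented for illustration:

```rust
use std::collections::HashMap;

use arrow_array::Int64Array;

/// Illustrative only: tracks per-field minimum and maximum values
/// as record batches stream through a writer.
#[derive(Default)]
struct BoundsTracker {
    lower: HashMap<i32, i64>, // field id -> smallest value seen
    upper: HashMap<i32, i64>, // field id -> largest value seen
}

impl BoundsTracker {
    fn update(&mut self, field_id: i32, column: &Int64Array) {
        // Skip nulls; fold the remaining values into the running min/max.
        for value in column.iter().flatten() {
            self.lower
                .entry(field_id)
                .and_modify(|lo| *lo = (*lo).min(value))
                .or_insert(value);
            self.upper
                .entry(field_id)
                .and_modify(|hi| *hi = (*hi).max(value))
                .or_insert(value);
        }
    }
}

fn main() {
    let mut bounds = BoundsTracker::default();
    bounds.update(1, &Int64Array::from(vec![Some(3), None, Some(7), Some(1)]));
    assert_eq!(bounds.lower[&1], 1);
    assert_eq!(bounds.upper[&1], 7);
}
```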