diff --git a/rust/src/action/parquet2_read/map.rs b/rust/src/action/parquet2_read/map.rs
index ed730e383b..f21464d889 100644
--- a/rust/src/action/parquet2_read/map.rs
+++ b/rust/src/action/parquet2_read/map.rs
@@ -24,7 +24,7 @@ where
     ActType: ActionVariant,
     SetMapFn: Fn(&mut ActType, (Vec<String>, Vec<Option<String>>)),
 {
-    debug_assert!(field[0] == "key_value");
+    debug_assert!(field[0] == "entries");
     #[cfg(debug_assertions)]
     {
         use parquet2::schema::types::PhysicalType;
diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs
index 7f4fa7a154..e557bc0c9d 100644
--- a/rust/src/delta_arrow.rs
+++ b/rust/src/delta_arrow.rs
@@ -167,7 +167,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
                         ]
                         .into(),
                     ),
-                    true,
+                    false,
                 )),
                 false,
             )),
@@ -305,7 +305,7 @@ macro_rules! arrow_map {
             stringify!($fieldname),
             ArrowDataType::Map(
                 Arc::new(ArrowField::new(
-                    "key_value",
+                    "entries",
                     ArrowDataType::Struct(
                         vec![
                             ArrowField::new("key", ArrowDataType::Utf8, false),
@@ -325,7 +325,7 @@ macro_rules! arrow_map {
             stringify!($fieldname),
             ArrowDataType::Map(
                 Arc::new(ArrowField::new(
-                    "key_value",
+                    "entries",
                     ArrowDataType::Struct(
                         vec![
                             ArrowField::new("key", ArrowDataType::Utf8, false),
@@ -840,7 +840,7 @@ mod tests {
     fn test_delta_from_arrow_map_type() {
         let arrow_map = ArrowDataType::Map(
             Arc::new(ArrowField::new(
-                "key_value",
+                "entries",
                 ArrowDataType::Struct(
                     vec![
                         ArrowField::new("key", ArrowDataType::Int8, false),
@@ -848,7 +848,7 @@ mod tests {
                     ]
                     .into(),
                 ),
-                true,
+                false,
             )),
             false,
         );
diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs
index beab2561f4..aca3e3c915 100644
--- a/rust/tests/integration_datafusion.rs
+++ b/rust/tests/integration_datafusion.rs
@@ -45,6 +45,8 @@ use std::error::Error;
 mod common;
 
 mod local {
+    use deltalake::{writer::JsonWriter, SchemaTypeMap};
+
     use super::*;
     #[tokio::test]
     #[serial]
@@ -933,6 +935,55 @@
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_issue_1619_parquet_panic_using_map_type() -> Result<(), Box<dyn Error>> {
+        let _ = tokio::fs::remove_dir_all("./tests/data/issue-1619").await;
+        let fields: Vec<SchemaField> = vec![SchemaField::new(
+            "metadata".to_string(),
+            SchemaDataType::map(SchemaTypeMap::new(
+                Box::new(SchemaDataType::primitive("string".to_string())),
+                Box::new(SchemaDataType::primitive("string".to_string())),
+                true,
+            )),
+            true,
+            HashMap::new(),
+        )];
+        let schema = deltalake::Schema::new(fields);
+        let table = deltalake::DeltaTableBuilder::from_uri("./tests/data/issue-1619").build()?;
+        let _ = DeltaOps::from(table)
+            .create()
+            .with_columns(schema.get_fields().to_owned())
+            .await?;
+
+        let mut table = deltalake::open_table("./tests/data/issue-1619").await?;
+
+        let mut writer = JsonWriter::for_table(&table).unwrap();
+        let _ = writer
+            .write(vec![
+                serde_json::json!({"metadata": {"hello": "world", "something": null}}),
+            ])
+            .await
+            .unwrap();
+        writer.flush_and_commit(&mut table).await.unwrap();
+
+        let ctx = SessionContext::new();
+        ctx.register_table("t", Arc::new(table))?;
+
+        let batches = ctx.sql(r#"SELECT * FROM t"#).await?.collect().await?;
+
+        let expected = vec![
+            "+-----------------------------+",
+            "| metadata                    |",
+            "+-----------------------------+",
+            "| {hello: world, something: } |", // unclear why it doesn't say `null` for something...
+            "+-----------------------------+",
+        ];
+
+        assert_batches_sorted_eq!(&expected, &batches);
+
+        Ok(())
+    }
 }
 
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
@@ -1027,6 +1078,8 @@ async fn simple_query(context: &IntegrationContext) -> TestResult {
 }
 
 mod date_partitions {
+    use deltalake::SchemaField;
+
     use super::*;
 
     async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {