Skip to content

Commit

Permalink
fix: change map nullable value to false
Browse files Browse the repository at this point in the history
This value was previously `true`, whereas Arrow defines it as always `false`; see
https://github.com/apache/arrow-rs/blob/master/arrow-schema/src/field.rs#L230.

This is also described in apache/arrow-rs#1697.

This also replaces `key_value` as the struct name with `entries` to remain
consistent with https://github.com/apache/arrow-rs/blob/878217b9e330b4f1ed13e798a214ea11fbeb2bbb/arrow-schema/src/datatype.rs#L247-L250
  • Loading branch information
cmackenzie1 committed Sep 8, 2023
1 parent 30c55d4 commit faae0ce
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 6 deletions.
2 changes: 1 addition & 1 deletion rust/src/action/parquet2_read/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ where
ActType: ActionVariant,
SetMapFn: Fn(&mut ActType, (Vec<String>, Vec<Option<String>>)),
{
debug_assert!(field[0] == "key_value");
debug_assert!(field[0] == "entries");
#[cfg(debug_assertions)]
{
use parquet2::schema::types::PhysicalType;
Expand Down
10 changes: 5 additions & 5 deletions rust/src/delta_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
]
.into(),
),
true,
false,
)),
false,
)),
Expand Down Expand Up @@ -305,7 +305,7 @@ macro_rules! arrow_map {
stringify!($fieldname),
ArrowDataType::Map(
Arc::new(ArrowField::new(
"key_value",
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
Expand All @@ -325,7 +325,7 @@ macro_rules! arrow_map {
stringify!($fieldname),
ArrowDataType::Map(
Arc::new(ArrowField::new(
"key_value",
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
Expand Down Expand Up @@ -840,15 +840,15 @@ mod tests {
fn test_delta_from_arrow_map_type() {
let arrow_map = ArrowDataType::Map(
Arc::new(ArrowField::new(
"key_value",
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::Int8, false),
ArrowField::new("value", ArrowDataType::Binary, true),
]
.into(),
),
true,
false,
)),
false,
);
Expand Down
53 changes: 53 additions & 0 deletions rust/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use std::error::Error;
mod common;

mod local {
use deltalake::{writer::JsonWriter, SchemaTypeMap};

use super::*;
#[tokio::test]
#[serial]
Expand Down Expand Up @@ -933,6 +935,55 @@ mod local {

Ok(())
}

/// Regression test for issue 1619 (a Parquet panic when using a map type,
/// per the test name).
///
/// Creates a table with a single `metadata` column of type
/// `map<string, string>`, writes one JSON record to it through `JsonWriter`,
/// then reads the table back with a DataFusion SQL query and checks the
/// rendered output.
#[tokio::test]
async fn test_issue_1619_parquet_panic_using_map_type() -> Result<()> {
    // Single source of truth for the on-disk table location used below.
    const TABLE_URI: &str = "./tests/data/issue-1619";

    // Best-effort removal of leftovers from a previous run; the result is
    // deliberately ignored because the directory may not exist yet.
    let _ = tokio::fs::remove_dir_all(TABLE_URI).await;

    // NOTE(review): the trailing `true` flags are presumably "value contains
    // null" (map) and "nullable" (field) -- confirm against the schema API.
    let fields: Vec<SchemaField> = vec![SchemaField::new(
        "metadata".to_string(),
        SchemaDataType::map(SchemaTypeMap::new(
            Box::new(SchemaDataType::primitive("string".to_string())),
            Box::new(SchemaDataType::primitive("string".to_string())),
            true,
        )),
        true,
        HashMap::new(),
    )];
    let schema = deltalake::Schema::new(fields);

    // Create the empty table with the map column.
    let table = deltalake::DeltaTableBuilder::from_uri(TABLE_URI).build()?;
    let _ = DeltaOps::from(table)
        .create()
        .with_columns(schema.get_fields().to_owned())
        .await?;

    // Re-open the table from disk and write one record whose map contains
    // both a non-null and a null value.
    let mut table = deltalake::open_table(TABLE_URI).await?;

    let mut writer = JsonWriter::for_table(&table).unwrap();
    let _ = writer
        .write(vec![
            serde_json::json!({"metadata": {"hello": "world", "something": null}}),
        ])
        .await
        .unwrap();
    writer.flush_and_commit(&mut table).await.unwrap();

    // Read the data back through DataFusion.
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;

    let batches = ctx.sql(r#"SELECT * FROM t"#).await?.collect().await?;

    // Every row must be padded to the same width as the border, otherwise the
    // line-by-line comparison fails.
    let expected = vec![
        "+-----------------------------+",
        "| metadata                    |",
        "+-----------------------------+",
        // The null map value renders as an empty string (Arrow's default
        // display formatting for nulls), hence `something: `.
        "| {hello: world, something: } |",
        "+-----------------------------+",
    ];

    assert_batches_sorted_eq!(&expected, &batches);

    Ok(())
}
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
Expand Down Expand Up @@ -1027,6 +1078,8 @@ async fn simple_query(context: &IntegrationContext) -> TestResult {
}

mod date_partitions {
use deltalake::SchemaField;

use super::*;

async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {
Expand Down

0 comments on commit faae0ce

Please sign in to comment.