From 14bc547cfc1e6a26cd879b1cbb1c27022756820a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 17 Apr 2022 17:02:10 -0700 Subject: [PATCH 1/3] IPC read/write nested dict in map --- arrow/src/datatypes/field.rs | 2 +- arrow/src/ipc/reader.rs | 51 ++++++++++++++++++++++++++++++++++++ arrow/src/ipc/writer.rs | 33 +++++++++++++++++++++-- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/arrow/src/datatypes/field.rs b/arrow/src/datatypes/field.rs index c841216a540f..f64fd105d2ba 100644 --- a/arrow/src/datatypes/field.rs +++ b/arrow/src/datatypes/field.rs @@ -131,7 +131,7 @@ impl Field { DataType::List(field) | DataType::LargeList(field) | DataType::FixedSizeList(field, _) - | DataType::Map(field, _) => collected_fields.push(field), + | DataType::Map(field, _) => collected_fields.extend(field.fields()), DataType::Dictionary(_, value_field) => { collected_fields.append(&mut self._fields(value_field.as_ref())) } diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 143fa929da7c..024cf2200845 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -1481,4 +1481,55 @@ mod tests { let output_batch = roundtrip_ipc_stream(&input_batch); assert_eq!(input_batch, output_batch); } + + #[test] + fn test_roundtrip_stream_nested_dict_dict_in_map() { + let values = StringArray::from_iter_values(["a", "b", "c"]); + let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1]); + let dict_array = DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap(); + + let keys_array = Int32Array::from_iter_values([0, 0, 1, 2, 0, 1]); + let keys_field = Field::new("keys", DataType::Int32, false); + let values_field = Field::new_dict( + "values", + DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), + false, + 1, + false, + ); + let entry_struct = StructArray::from(vec![ + (keys_field, make_array(keys_array.data().clone())), + (values_field, make_array(dict_array.data().clone())), + ]); + let map_data_type = DataType::Map( 
Box::new(Field::new( + "entries", + entry_struct.data_type().clone(), + true, + )), + false, + ); + + let entry_offsets = Buffer::from_slice_ref(&[0, 2, 4, 6]); + let map_data = ArrayData::builder(map_data_type) + .len(3) + .add_buffer(entry_offsets) + .add_child_data(entry_struct.data().clone()) + .build() + .unwrap(); + let map_array = MapArray::from(map_data); + + let dict_dict_array = + DictionaryArray::<Int8Type>::try_new(&keys, &map_array).unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "f1", + dict_dict_array.data_type().clone(), + false, + )])); + let input_batch = + RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap(); + let output_batch = roundtrip_ipc_stream(&input_batch); + assert_eq!(input_batch, output_batch); + } } diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index a5b35f364e72..f18f32b47f32 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -26,7 +26,8 @@ use std::io::{BufWriter, Write}; use flatbuffers::FlatBufferBuilder; use crate::array::{ - as_list_array, as_struct_array, as_union_array, make_array, ArrayData, ArrayRef, + as_list_array, as_map_array, as_struct_array, as_union_array, make_array, Array, + ArrayData, ArrayRef, }; use crate::buffer::{Buffer, MutableBuffer}; use crate::datatypes::*; @@ -146,7 +147,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, ) -> Result<()> { - // TODO: Handle other nested types (map, etc) + // TODO: Handle other nested types (LargeList, FixedSizeList) match column.data_type() { DataType::Struct(fields) => { let s = as_struct_array(column); @@ -170,6 +171,34 @@ impl IpcDataGenerator { write_options, )?; } + DataType::Map(field, _) => { + let map_array = as_map_array(column); + + let (keys, values) = match field.data_type() { + DataType::Struct(fields) if fields.len() == 2 => { + (&fields[0], &fields[1]) + } + _ => panic!("Incorrect field data type {:?}", field.data_type()), + }; + + // keys + 
self.encode_dictionaries( + keys, + &map_array.keys(), + encoded_dictionaries, + dictionary_tracker, + write_options, + )?; + + // values + self.encode_dictionaries( + values, + &map_array.values(), + encoded_dictionaries, + dictionary_tracker, + write_options, + )?; + } DataType::Union(fields, _) => { let union = as_union_array(column); for (field, ref column) in fields From e79b0f5428d6add73ed88fef69dd7a23e1c4730a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 19 Apr 2022 10:39:47 -0700 Subject: [PATCH 2/3] For review comments --- arrow/src/ipc/reader.rs | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 024cf2200845..c280aa678e20 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -1483,23 +1483,33 @@ mod tests { } #[test] - fn test_roundtrip_stream_nested_dict_dict_in_map() { - let values = StringArray::from_iter_values(["a", "b", "c"]); - let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1]); - let dict_array = DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap(); - - let keys_array = Int32Array::from_iter_values([0, 0, 1, 2, 0, 1]); - let keys_field = Field::new("keys", DataType::Int32, false); + fn test_roundtrip_stream_nested_dict_of_map_of_dict() { + let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]); + let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]); + let value_dict_array = + DictionaryArray::<Int8Type>::try_new(&value_dict_keys, &values).unwrap(); + + let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]); + let key_dict_array = + DictionaryArray::<Int8Type>::try_new(&key_dict_keys, &values).unwrap(); + + let keys_field = Field::new_dict( + "keys", + DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), + true, + 1, + false, + ); let values_field = Field::new_dict( "values", DataType::Dictionary(Box::new(DataType::Int8), 
Box::new(DataType::Utf8)), - false, 
+ true, 1, false, ); let entry_struct = StructArray::from(vec![ - (keys_field, make_array(keys_array.data().clone())), - (values_field, make_array(dict_array.data().clone())), + (keys_field, make_array(key_dict_array.data().clone())), + (values_field, make_array(value_dict_array.data().clone())), ]); let map_data_type = DataType::Map( Box::new(Field::new( @@ -1519,8 +1529,9 @@ mod tests { .unwrap(); let map_array = MapArray::from(map_data); + let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]); let dict_dict_array = - DictionaryArray::<Int8Type>::try_new(&keys, &map_array).unwrap(); + DictionaryArray::<Int8Type>::try_new(&dict_keys, &map_array).unwrap(); let schema = Arc::new(Schema::new(vec![Field::new( "f1", From 1f444f8dca6d4aa156dc0e3ee74a4183d2386329 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 20 Apr 2022 09:37:33 -0700 Subject: [PATCH 3/3] update comment --- arrow/src/ipc/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index 76ef2d708759..1f73d16d2b74 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -147,7 +147,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, ) -> Result<()> { - // TODO: Handle other nested types (LargeList, FixedSizeList) + // TODO: Handle other nested types (FixedSizeList) match column.data_type() { DataType::Struct(fields) => { let s = as_struct_array(column);