From 7b33bc8c54bb59ddfdf3042dc00503fe11722a63 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 23 Apr 2024 14:30:45 -0700 Subject: [PATCH 1/6] chore: make explicit what ParquetWriterOptions are created from a subset of TableParquetOptions --- datafusion/common/src/file_options/mod.rs | 4 + .../common/src/file_options/parquet_writer.rs | 87 ++++++++++++------- 2 files changed, 60 insertions(+), 31 deletions(-) diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index eb1ce1b364fd..a760619a7ba8 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -124,6 +124,10 @@ mod tests { 123 ); + // properties which remain as default on WriterProperties + assert_eq!(properties.key_value_metadata(), None); + assert_eq!(properties.sorting_columns(), None); + Ok(()) } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 28e73ba48f53..f651ff932a5a 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -17,7 +17,10 @@ //! 
Options related to how parquet files should be written -use crate::{config::TableParquetOptions, DataFusionError, Result}; +use crate::{ + config::{ParquetOptions, TableParquetOptions}, + DataFusionError, Result, +}; use parquet::{ basic::{BrotliLevel, GzipLevel, ZstdLevel}, @@ -47,53 +50,73 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { type Error = DataFusionError; fn try_from(parquet_options: &TableParquetOptions) -> Result { - let parquet_session_options = &parquet_options.global; + let ParquetOptions { + data_pagesize_limit, + write_batch_size, + writer_version, + dictionary_page_size_limit, + max_row_group_size, + created_by, + column_index_truncate_length, + data_page_row_count_limit, + bloom_filter_enabled, + encoding, + dictionary_enabled, + compression, + statistics_enabled, + max_statistics_size, + bloom_filter_fpp, + bloom_filter_ndv, + // below is not part of ParquetWriterOptions + enable_page_index: _, + pruning: _, + skip_metadata: _, + metadata_size_hint: _, + pushdown_filters: _, + reorder_filters: _, + allow_single_file_parallelism: _, + maximum_parallel_row_group_writers: _, + maximum_buffered_record_batches_per_stream: _, + } = &parquet_options.global; + let mut builder = WriterProperties::builder() - .set_data_page_size_limit(parquet_session_options.data_pagesize_limit) - .set_write_batch_size(parquet_session_options.write_batch_size) - .set_writer_version(parse_version_string( - &parquet_session_options.writer_version, - )?) 
- .set_dictionary_page_size_limit( - parquet_session_options.dictionary_page_size_limit, - ) - .set_max_row_group_size(parquet_session_options.max_row_group_size) - .set_created_by(parquet_session_options.created_by.clone()) - .set_column_index_truncate_length( - parquet_session_options.column_index_truncate_length, - ) - .set_data_page_row_count_limit( - parquet_session_options.data_page_row_count_limit, - ) - .set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled); - - if let Some(encoding) = &parquet_session_options.encoding { + .set_data_page_size_limit(*data_pagesize_limit) + .set_write_batch_size(*write_batch_size) + .set_writer_version(parse_version_string(writer_version.as_str())?) + .set_dictionary_page_size_limit(*dictionary_page_size_limit) + .set_max_row_group_size(*max_row_group_size) + .set_created_by(created_by.clone()) + .set_column_index_truncate_length(*column_index_truncate_length) + .set_data_page_row_count_limit(*data_page_row_count_limit) + .set_bloom_filter_enabled(*bloom_filter_enabled); + + if let Some(encoding) = &encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } - if let Some(enabled) = parquet_session_options.dictionary_enabled { - builder = builder.set_dictionary_enabled(enabled); + if let Some(enabled) = dictionary_enabled { + builder = builder.set_dictionary_enabled(*enabled); } - if let Some(compression) = &parquet_session_options.compression { + if let Some(compression) = &compression { builder = builder.set_compression(parse_compression_string(compression)?); } - if let Some(statistics) = &parquet_session_options.statistics_enabled { + if let Some(statistics) = &statistics_enabled { builder = builder.set_statistics_enabled(parse_statistics_string(statistics)?); } - if let Some(size) = parquet_session_options.max_statistics_size { - builder = builder.set_max_statistics_size(size); + if let Some(size) = max_statistics_size { + builder = builder.set_max_statistics_size(*size); } - if let 
Some(fpp) = parquet_session_options.bloom_filter_fpp { - builder = builder.set_bloom_filter_fpp(fpp); + if let Some(fpp) = bloom_filter_fpp { + builder = builder.set_bloom_filter_fpp(*fpp); } - if let Some(ndv) = parquet_session_options.bloom_filter_ndv { - builder = builder.set_bloom_filter_ndv(ndv); + if let Some(ndv) = bloom_filter_ndv { + builder = builder.set_bloom_filter_ndv(*ndv); } for (column, options) in &parquet_options.column_specific_options { @@ -141,6 +164,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { builder.set_column_max_statistics_size(path, max_statistics_size); } } + + // ParquetWriterOptions will have defaults for the remaining fields (e.g. key_value_metadata & sorting_columns) Ok(ParquetWriterOptions { writer_options: builder.build(), }) From 86bf317385762166f8e6620793aa44d31ea7c482 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 23 Apr 2024 19:31:58 -0700 Subject: [PATCH 2/6] refactor: restore the ability to add kv metadata into the generated file sink --- datafusion/common/src/config.rs | 3 +++ .../common/src/file_options/parquet_writer.rs | 21 +++++++++++++++++-- .../src/datasource/file_format/parquet.rs | 20 ++++++++++++++++-- .../proto/src/physical_plan/from_proto.rs | 1 + 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 30ab9a339b54..a8f813b1952f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1370,6 +1370,9 @@ pub struct TableParquetOptions { pub global: ParquetOptions, /// Column specific options. Default usage is parquet.XX::column. pub column_specific_options: HashMap, + /// Additional metadata to be inserted into the key_value_metadata + /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html). 
+ pub key_value_metadata: HashMap>, } impl ConfigField for TableParquetOptions { diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f651ff932a5a..4958246f54b4 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -24,7 +24,10 @@ use crate::{ use parquet::{ basic::{BrotliLevel, GzipLevel, ZstdLevel}, - file::properties::{EnabledStatistics, WriterProperties, WriterVersion}, + file::{ + metadata::KeyValue, + properties::{EnabledStatistics, WriterProperties, WriterVersion}, + }, schema::types::ColumnPath, }; @@ -79,6 +82,19 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { maximum_buffered_record_batches_per_stream: _, } = &parquet_options.global; + let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() { + Some( + parquet_options + .key_value_metadata + .clone() + .drain() + .map(|(key, value)| KeyValue { key, value }) + .collect::>(), + ) + } else { + None + }; + let mut builder = WriterProperties::builder() .set_data_page_size_limit(*data_pagesize_limit) .set_write_batch_size(*write_batch_size) @@ -88,7 +104,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { .set_created_by(created_by.clone()) .set_column_index_truncate_length(*column_index_truncate_length) .set_data_page_row_count_limit(*data_page_row_count_limit) - .set_bloom_filter_enabled(*bloom_filter_enabled); + .set_bloom_filter_enabled(*bloom_filter_enabled) + .set_key_value_metadata(key_value_metadata); if let Some(encoding) = &encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 66f506f9aa2e..0cf343560bba 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1857,7 +1857,13 @@ mod 
tests { }; let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, - TableParquetOptions::default(), + TableParquetOptions { + key_value_metadata: std::collections::HashMap::from([( + "my-data".to_string(), + Some("stuff".to_string()), + )]), + ..Default::default() + }, )); // create data @@ -1891,7 +1897,10 @@ mod tests { let ( path, FileMetaData { - num_rows, schema, .. + num_rows, + schema, + key_value_metadata, + .. }, ) = written.take(1).next().unwrap(); let path_parts = path.parts().collect::>(); @@ -1907,6 +1916,13 @@ mod tests { "output file metadata should contain col b" ); + let key_value_metadata = key_value_metadata.unwrap(); + let my_metadata = key_value_metadata + .iter() + .filter(|kv| kv.key == "my-data") + .collect::>(); + assert_eq!(my_metadata.len(), 1); + Ok(()) } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 6184332ea581..1d3edb7b6075 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -969,6 +969,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap() .unwrap(), column_specific_options, + key_value_metadata: Default::default(), }) } } From c35b32619b5c76a44bdcb6682474937b9247972c Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 24 Apr 2024 11:53:47 -0700 Subject: [PATCH 3/6] test: demonstrate API contract for metadata TableParquetOptions --- datafusion/common/src/config.rs | 14 ++++++++++ datafusion/sqllogictest/test_files/copy.slt | 29 ++++++++++++++++++++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index a8f813b1952f..424f9ba23767 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1386,6 +1386,20 @@ impl ConfigField for TableParquetOptions { // Determine the key if it's a global or column-specific setting if key.contains("::") {
self.column_specific_options.set(key, value) + } else if key.eq("metadata") { + for maybe_pair in value.split('_') { + let (k, v) = match maybe_pair.split(':').collect::>()[..] { + [k, v] => (k.into(), Some(v.into())), + [k] => (k.into(), None), + _ => { + return Err(DataFusionError::Configuration(format!( + "Invalid metadata provided \"{maybe_pair}\"" + ))) + } + }; + self.key_value_metadata.insert(k, v); + } + Ok(()) } else { self.global.set(key, value) } diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 502dfd4fa6bb..23ae8ca2b849 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -283,11 +283,38 @@ OPTIONS ( 'format.statistics_enabled::col2' none, 'format.max_statistics_size' 123, 'format.bloom_filter_fpp' 0.001, -'format.bloom_filter_ndv' 100 +'format.bloom_filter_ndv' 100, +'format.metadata' 'foo:bar baz' ) ---- 2 +# valid vs invalid metadata + +statement ok +COPY source_table +TO 'test_files/scratch/copy/table_with_metadata/' +STORED AS PARQUET +OPTIONS ( +'format.metadata' '' +) + +statement error +COPY source_table +TO 'test_files/scratch/copy/table_with_metadata/' +STORED AS PARQUET +OPTIONS ( +'format.metadata' 'foo:bar:extra' +) + +statement error +COPY source_table +TO 'test_files/scratch/copy/table_with_metadata/' +STORED AS PARQUET +OPTIONS ( +'format.wrong-metadata-key' 'foo:bar baz' +) + # validate multiple parquet file output with all options set statement ok CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/'; From cd91a77a95e14f69faa178ddb2838783b2520e09 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Apr 2024 09:40:44 -0700 Subject: [PATCH 4/6] chore: update code docs --- datafusion/common/src/config.rs | 6 +++++- datafusion/common/src/file_options/parquet_writer.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git 
a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 424f9ba23767..3f86f6755566 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1364,13 +1364,17 @@ impl TableOptions { /// Options that control how Parquet files are read, including global options /// that apply to all columns and optional column-specific overrides +/// +/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions). +/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API +/// (e.g. sorting_columns). #[derive(Clone, Default, Debug, PartialEq)] pub struct TableParquetOptions { /// Global Parquet options that propagates to all columns. pub global: ParquetOptions, /// Column specific options. Default usage is parquet.XX::column. pub column_specific_options: HashMap, - /// Additional metadata to be inserted into the key_value_metadata + /// Additional file-level metadata to include. Inserted into the key_value_metadata /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html). pub key_value_metadata: HashMap>, } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 4958246f54b4..8ac6bcaa7adf 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -182,7 +182,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { } } - // ParquetWriterOptions will have defaults for the remaining fields (e.g. key_value_metadata & sorting_columns) + // ParquetWriterOptions will have defaults for the remaining fields (e.g. 
sorting_columns) Ok(ParquetWriterOptions { writer_options: builder.build(), }) From 0cf04b52be15e62f2d9a490b73cee49fc9ff2dd9 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Apr 2024 09:51:54 -0700 Subject: [PATCH 5/6] fix: parse on proper delimiter, and improve tests --- datafusion/common/src/config.rs | 2 +- .../src/datasource/file_format/parquet.rs | 12 +++--- datafusion/sqllogictest/test_files/copy.slt | 37 +++++++++++++++++-- 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 3f86f6755566..68d7d6f0db80 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1391,7 +1391,7 @@ impl ConfigField for TableParquetOptions { if key.contains("::") { self.column_specific_options.set(key, value) } else if key.eq("metadata") { - for maybe_pair in value.split('_') { + for maybe_pair in value.split(' ') { let (k, v) = match maybe_pair.split(':').collect::>()[..] { [k, v] => (k.into(), Some(v.into())), [k] => (k.into(), None), diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0cf343560bba..e54cfcac7ff6 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1129,7 +1129,7 @@ mod tests { }; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::ParquetRecordBatchStreamBuilder; - use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex}; + use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex}; use parquet::file::page_index::index::Index; use tokio::fs::File; @@ -1917,11 +1917,11 @@ mod tests { ); let key_value_metadata = key_value_metadata.unwrap(); - let my_metadata = key_value_metadata - .iter() - .filter(|kv| kv.key == "my-data") - .collect::>(); - assert_eq!(my_metadata.len(), 1); + let expected_metadata = vec![KeyValue { + key: 
"my-data".to_string(), + value: Some("stuff".to_string()), + }]; + assert_eq!(key_value_metadata, expected_metadata); Ok(()) } diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 23ae8ca2b849..cffbc74a627b 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -291,6 +291,7 @@ OPTIONS ( # valid vs invalid metadata +# accepts empty map statement ok COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' @@ -299,7 +300,35 @@ OPTIONS ( 'format.metadata' '' ) -statement error +# accepts map with a single entry +statement ok +COPY source_table +TO 'test_files/scratch/copy/table_with_metadata/' +STORED AS PARQUET +OPTIONS ( +'format.metadata' 'key:value' +) + +# accepts map with multiple entries +statement ok +COPY source_table +TO 'test_files/scratch/copy/table_with_metadata/' +STORED AS PARQUET +OPTIONS ( +'format.metadata' 'key1:value1 key2:value2' +) + +# accepts entries which are key-only (no value) +statement ok +COPY source_table +TO 'test_files/scratch/copy/table_with_metadata/' +STORED AS PARQUET +OPTIONS ( +'format.metadata' 'key1 key2:value2 key3' +) + +# errors for invalid key-value pair (extra `:`) +statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata provided "foo:bar:extra" COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET @@ -307,14 +336,16 @@ OPTIONS ( 'format.metadata' 'foo:bar:extra' ) -statement error +# errors for invalid property (not stating `format.metadata`) +statement error DataFusion error: Invalid or Unsupported Configuration: Config value "wrong-metadata-key" not found on ParquetOptions COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET OPTIONS ( -'format.wrong-metadata-key' 'foo:bar baz' +'format.wrong-metadata-key' 'key:value' ) + # validate multiple parquet file output with all options set statement ok CREATE 
EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/'; From 5b06bf31099916c4b0ebae57fc3ecc4817f5ae0d Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Apr 2024 13:27:49 -0700 Subject: [PATCH 6/6] fix: enable any character in the metadata string value, by having any key parsing be a part of the format.metadata::key --- datafusion/common/src/config.rs | 72 +++++++++++++++---- .../src/datasource/file_format/parquet.rs | 25 ++++--- datafusion/sqllogictest/test_files/copy.slt | 34 +++++---- 3 files changed, 95 insertions(+), 36 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 68d7d6f0db80..8539ca0874dd 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1376,6 +1376,18 @@ pub struct TableParquetOptions { pub column_specific_options: HashMap, /// Additional file-level metadata to include. Inserted into the key_value_metadata /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html). + /// + /// Multiple entries are permitted + /// ```sql + /// OPTIONS ( + /// 'format.metadata::key1' '', + /// 'format.metadata::key2' 'value', + /// 'format.metadata::key3' 'value has spaces', + /// 'format.metadata::key4' 'value has special chars :: :', + /// 'format.metadata::key_dupe' 'original will be overwritten', + /// 'format.metadata::key_dupe' 'final' + /// ) + /// ``` pub key_value_metadata: HashMap>, } @@ -1387,23 +1399,25 @@ impl ConfigField for TableParquetOptions { } fn set(&mut self, key: &str, value: &str) -> Result<()> { - // Determine the key if it's a global or column-specific setting - if key.contains("::") { - self.column_specific_options.set(key, value) - } else if key.eq("metadata") { - for maybe_pair in value.split(' ') { - let (k, v) = match maybe_pair.split(':').collect::>()[..] 
{ - [k, v] => (k.into(), Some(v.into())), - [k] => (k.into(), None), + // Determine if the key is a global, metadata, or column-specific setting + if key.starts_with("metadata::") { + let k = + match key.split("::").collect::>()[..] { + [_meta] | [_meta, ""] => return Err(DataFusionError::Configuration( + "Invalid metadata key provided, missing key in metadata::" + .to_string(), + )), + [_meta, k] => k.into(), _ => { return Err(DataFusionError::Configuration(format!( - "Invalid metadata provided \"{maybe_pair}\"" - ))) + "Invalid metadata key provided, found too many '::' in \"{key}\"" + ))) } }; - self.key_value_metadata.insert(k, v); - } + self.key_value_metadata.insert(k, Some(value.into())); Ok(()) + } else if key.contains("::") { + self.column_specific_options.set(key, value) } else { self.global.set(key, value) } @@ -1794,4 +1808,38 @@ mod tests { .iter() .any(|item| item.key == "format.bloom_filter_enabled::col1")) } + + #[cfg(feature = "parquet")] + #[test] + fn parquet_table_options_config_metadata_entry() { + let mut table_config = TableOptions::new(); + table_config.set_file_format(FileType::PARQUET); + table_config.set("format.metadata::key1", "").unwrap(); + table_config.set("format.metadata::key2", "value2").unwrap(); + table_config + .set("format.metadata::key3", "value with spaces ") + .unwrap(); + table_config + .set("format.metadata::key4", "value with special chars :: :") + .unwrap(); + + let parsed_metadata = table_config.parquet.key_value_metadata.clone(); + assert_eq!(parsed_metadata.get("should not exist1"), None); + assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into()))); + assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into()))); + assert_eq!( + parsed_metadata.get("key3"), + Some(&Some("value with spaces ".into())) + ); + assert_eq!( + parsed_metadata.get("key4"), + Some(&Some("value with special chars :: :".into())) + ); + + // duplicate keys are overwritten + table_config.set("format.metadata::key_dupe", 
"A").unwrap(); + table_config.set("format.metadata::key_dupe", "B").unwrap(); + let parsed_metadata = table_config.parquet.key_value_metadata; + assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); + } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index e54cfcac7ff6..7ec7d4540fff 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1858,10 +1858,10 @@ mod tests { let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, TableParquetOptions { - key_value_metadata: std::collections::HashMap::from([( - "my-data".to_string(), - Some("stuff".to_string()), - )]), + key_value_metadata: std::collections::HashMap::from([ + ("my-data".to_string(), Some("stuff".to_string())), + ("my-data-bool-key".to_string(), None), + ]), ..Default::default() }, )); @@ -1916,11 +1916,18 @@ mod tests { "output file metadata should contain col b" ); - let key_value_metadata = key_value_metadata.unwrap(); - let expected_metadata = vec![KeyValue { - key: "my-data".to_string(), - value: Some("stuff".to_string()), - }]; + let mut key_value_metadata = key_value_metadata.unwrap(); + key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key)); + let expected_metadata = vec![ + KeyValue { + key: "my-data".to_string(), + value: Some("stuff".to_string()), + }, + KeyValue { + key: "my-data-bool-key".to_string(), + value: None, + }, + ]; assert_eq!(key_value_metadata, expected_metadata); Ok(()) diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index cffbc74a627b..d695e8514b07 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -284,65 +284,69 @@ OPTIONS ( 'format.max_statistics_size' 123, 'format.bloom_filter_fpp' 0.001, 'format.bloom_filter_ndv' 100, -'format.metadata' 'foo:bar baz' +'format.metadata::key' 'value' ) ---- 2 # 
valid vs invalid metadata -# accepts empty map +# accepts map with a single entry statement ok COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET OPTIONS ( -'format.metadata' '' + 'format.metadata::key' 'value' ) -# accepts map with a single entry +# accepts multiple entries (on different keys) statement ok COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET OPTIONS ( -'format.metadata' 'key:value' + 'format.metadata::key1' '', + 'format.metadata::key2' 'value', + 'format.metadata::key3' 'value with spaces', + 'format.metadata::key4' 'value with special chars :: :' ) -# accepts map with multiple entries +# accepts multiple entries with the same key (will overwrite) statement ok COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET OPTIONS ( -'format.metadata' 'key1:value1 key2:value2' + 'format.metadata::key1' 'value', + 'format.metadata::key1' 'value' ) -# accepts entries which are key-only (no value) -statement ok +# errors if key is missing +statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, missing key in metadata:: COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET OPTIONS ( -'format.metadata' 'key1 key2:value2 key3' + 'format.metadata::' 'value' ) -# errors for invalid key-value pair (extra `:`) -statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata provided "foo:bar:extra" +# errors if key contains internal '::' +statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, found too many '::' in "metadata::key::extra" COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET OPTIONS ( -'format.metadata' 'foo:bar:extra' + 'format.metadata::key::extra' 'value' ) # errors for invalid property (not stating `format.metadata`) -statement error DataFusion error: Invalid or 
Unsupported Configuration: Config value "wrong-metadata-key" not found on ParquetOptions +statement error DataFusion error: Invalid or Unsupported Configuration: Config value "wrong-metadata" not found on ColumnOptions COPY source_table TO 'test_files/scratch/copy/table_with_metadata/' STORED AS PARQUET OPTIONS ( -'format.wrong-metadata-key' 'key:value' + 'format.wrong-metadata::key' 'value' )