-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow adding user defined metadata to ParquetSink
#10224
Changes from all commits
7b33bc8
86bf317
c35b326
cd91a77
0cf04b5
5b06bf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1364,12 +1364,31 @@ impl TableOptions { | |
|
||
/// Options that control how Parquet files are read and written, including global options | ||
/// that apply to all columns and optional column-specific overrides | ||
/// | ||
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions). | ||
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API | ||
/// (e.g. sorting_columns). | ||
#[derive(Clone, Default, Debug, PartialEq)] | ||
pub struct TableParquetOptions { | ||
/// Global Parquet options that propagate to all columns. | ||
pub global: ParquetOptions, | ||
/// Column specific options. Default usage is parquet.XX::column. | ||
pub column_specific_options: HashMap<String, ColumnOptions>, | ||
/// Additional file-level metadata to include. Inserted into the key_value_metadata | ||
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html). | ||
/// | ||
/// Multiple entries are permitted: | ||
/// ```sql | ||
/// OPTIONS ( | ||
/// 'format.metadata::key1' '', | ||
/// 'format.metadata::key2' 'value', | ||
/// 'format.metadata::key3' 'value has spaces', | ||
/// 'format.metadata::key4' 'value has special chars :: :', | ||
/// 'format.metadata::key_dupe' 'original will be overwritten', | ||
/// 'format.metadata::key_dupe' 'final' | ||
/// ) | ||
/// ``` | ||
pub key_value_metadata: HashMap<String, Option<String>>, | ||
} | ||
|
||
impl ConfigField for TableParquetOptions { | ||
|
@@ -1380,8 +1399,24 @@ impl ConfigField for TableParquetOptions { | |
} | ||
|
||
fn set(&mut self, key: &str, value: &str) -> Result<()> { | ||
// Determine the key if it's a global or column-specific setting | ||
if key.contains("::") { | ||
// Determine if the key is a global, metadata, or column-specific setting | ||
if key.starts_with("metadata::") { | ||
let k = | ||
match key.split("::").collect::<Vec<_>>()[..] { | ||
[_meta] | [_meta, ""] => return Err(DataFusionError::Configuration( | ||
"Invalid metadata key provided, missing key in metadata::<key>" | ||
.to_string(), | ||
)), | ||
[_meta, k] => k.into(), | ||
_ => { | ||
return Err(DataFusionError::Configuration(format!( | ||
"Invalid metadata key provided, found too many '::' in \"{key}\"" | ||
))) | ||
} | ||
}; | ||
self.key_value_metadata.insert(k, Some(value.into())); | ||
Ok(()) | ||
} else if key.contains("::") { | ||
self.column_specific_options.set(key, value) | ||
} else { | ||
self.global.set(key, value) | ||
|
@@ -1773,4 +1808,38 @@ mod tests { | |
.iter() | ||
.any(|item| item.key == "format.bloom_filter_enabled::col1")) | ||
} | ||
|
||
#[cfg(feature = "parquet")] | ||
#[test] | ||
fn parquet_table_options_config_metadata_entry() { | ||
let mut table_config = TableOptions::new(); | ||
table_config.set_file_format(FileType::PARQUET); | ||
table_config.set("format.metadata::key1", "").unwrap(); | ||
table_config.set("format.metadata::key2", "value2").unwrap(); | ||
table_config | ||
.set("format.metadata::key3", "value with spaces ") | ||
.unwrap(); | ||
table_config | ||
.set("format.metadata::key4", "value with special chars :: :") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nice |
||
.unwrap(); | ||
|
||
let parsed_metadata = table_config.parquet.key_value_metadata.clone(); | ||
assert_eq!(parsed_metadata.get("should not exist1"), None); | ||
assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into()))); | ||
assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into()))); | ||
assert_eq!( | ||
parsed_metadata.get("key3"), | ||
Some(&Some("value with spaces ".into())) | ||
); | ||
assert_eq!( | ||
parsed_metadata.get("key4"), | ||
Some(&Some("value with special chars :: :".into())) | ||
); | ||
|
||
// duplicate keys are overwritten | ||
table_config.set("format.metadata::key_dupe", "A").unwrap(); | ||
table_config.set("format.metadata::key_dupe", "B").unwrap(); | ||
let parsed_metadata = table_config.parquet.key_value_metadata; | ||
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -283,11 +283,73 @@ OPTIONS ( | |
'format.statistics_enabled::col2' none, | ||
'format.max_statistics_size' 123, | ||
'format.bloom_filter_fpp' 0.001, | ||
'format.bloom_filter_ndv' 100 | ||
'format.bloom_filter_ndv' 100, | ||
'format.metadata::key' 'value' | ||
) | ||
---- | ||
2 | ||
|
||
# valid vs invalid metadata | ||
|
||
# accepts map with a single entry | ||
statement ok | ||
COPY source_table | ||
TO 'test_files/scratch/copy/table_with_metadata/' | ||
STORED AS PARQUET | ||
OPTIONS ( | ||
'format.metadata::key' 'value' | ||
) | ||
|
||
# accepts multiple entries (on different keys) | ||
statement ok | ||
COPY source_table | ||
TO 'test_files/scratch/copy/table_with_metadata/' | ||
STORED AS PARQUET | ||
OPTIONS ( | ||
'format.metadata::key1' '', | ||
'format.metadata::key2' 'value', | ||
'format.metadata::key3' 'value with spaces', | ||
'format.metadata::key4' 'value with special chars :: :' | ||
) | ||
|
||
# accepts multiple entries with the same key (will overwrite) | ||
statement ok | ||
COPY source_table | ||
TO 'test_files/scratch/copy/table_with_metadata/' | ||
STORED AS PARQUET | ||
OPTIONS ( | ||
'format.metadata::key1' 'value', | ||
'format.metadata::key1' 'value' | ||
) | ||
Comment on lines
+315
to
+323
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This overwriting is a common feature of all of the config options. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree there is no need to change it in this PR. |
||
|
||
# errors if key is missing | ||
statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, missing key in metadata::<key> | ||
COPY source_table | ||
TO 'test_files/scratch/copy/table_with_metadata/' | ||
STORED AS PARQUET | ||
OPTIONS ( | ||
'format.metadata::' 'value' | ||
) | ||
|
||
# errors if key contains internal '::' | ||
statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, found too many '::' in "metadata::key::extra" | ||
COPY source_table | ||
TO 'test_files/scratch/copy/table_with_metadata/' | ||
STORED AS PARQUET | ||
OPTIONS ( | ||
'format.metadata::key::extra' 'value' | ||
) | ||
|
||
# errors for invalid property (not stating `format.metadata`) | ||
statement error DataFusion error: Invalid or Unsupported Configuration: Config value "wrong-metadata" not found on ColumnOptions | ||
COPY source_table | ||
TO 'test_files/scratch/copy/table_with_metadata/' | ||
STORED AS PARQUET | ||
OPTIONS ( | ||
'format.wrong-metadata::key' 'value' | ||
) | ||
|
||
|
||
# validate multiple parquet file output with all options set | ||
statement ok | ||
CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/'; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍