
Allow adding user defined metadata to ParquetSink #10224

Merged: 6 commits, Apr 26, 2024. Showing changes from 5 commits.
21 changes: 21 additions & 0 deletions datafusion/common/src/config.rs
@@ -1364,12 +1364,19 @@ impl TableOptions {

/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable via the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagate to all columns.
pub global: ParquetOptions,
/// Column specific options. Default usage is parquet.XX::column.
pub column_specific_options: HashMap<String, ColumnOptions>,
/// Additional file-level metadata to include. Inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
pub key_value_metadata: HashMap<String, Option<String>>,
}
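For reference, the new field can also be populated directly when building the options in Rust. A minimal sketch, assuming the `datafusion-common` crate from this branch; the metadata keys are illustrative:

use std::collections::HashMap;

use datafusion_common::config::TableParquetOptions;

fn main() {
    // `Some(value)` becomes a key/value entry in the written file's
    // metadata; `None` becomes a key-only entry.
    let mut key_value_metadata = HashMap::new();
    key_value_metadata.insert("owner".to_string(), Some("analytics".to_string()));
    key_value_metadata.insert("draft".to_string(), None);

    let options = TableParquetOptions {
        key_value_metadata,
        ..Default::default()
    };
    assert_eq!(options.key_value_metadata.len(), 2);
}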

impl ConfigField for TableParquetOptions {
@@ -1383,6 +1390,20 @@ impl ConfigField for TableParquetOptions {
// Determine the key if it's a global or column-specific setting
if key.contains("::") {
self.column_specific_options.set(key, value)
} else if key.eq("metadata") {
for maybe_pair in value.split(' ') {
let (k, v) = match maybe_pair.split(':').collect::<Vec<_>>()[..] {
[k, v] => (k.into(), Some(v.into())),
[k] => (k.into(), None),
_ => {
return Err(DataFusionError::Configuration(format!(
"Invalid metadata provided \"{maybe_pair}\""
)))
}
};
self.key_value_metadata.insert(k, v);
}
Ok(())
} else {
self.global.set(key, value)
}
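The `metadata` branch above defines a small grammar for the option string: entries are separated by spaces, each entry is either `key:value` or a bare `key` (stored with a `None` value), and an entry with more than one `:` is rejected. A self-contained sketch of the same rule, with names of my own choosing:

use std::collections::HashMap;

/// Parse a metadata option string: space-separated entries, each either
/// `key:value` or a bare `key`; anything else is an error.
fn parse_metadata(value: &str) -> Result<HashMap<String, Option<String>>, String> {
    let mut out = HashMap::new();
    for maybe_pair in value.split(' ') {
        let (k, v) = match maybe_pair.split(':').collect::<Vec<_>>()[..] {
            [k, v] => (k.to_string(), Some(v.to_string())),
            [k] => (k.to_string(), None),
            _ => return Err(format!("Invalid metadata provided \"{maybe_pair}\"")),
        };
        out.insert(k, v);
    }
    Ok(out)
}

fn main() {
    let parsed = parse_metadata("key1:value1 key2").unwrap();
    assert_eq!(parsed["key1"], Some("value1".to_string()));
    assert_eq!(parsed["key2"], None);
    // More than one `:` in an entry fails, matching the sqllogictest below.
    assert!(parse_metadata("foo:bar:extra").is_err());
}

Note that this form cannot express a value containing spaces; the review thread in copy.slt further down discusses that limitation.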
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -124,6 +124,10 @@ mod tests {
123
);

// properties which remain as default on WriterProperties
assert_eq!(properties.key_value_metadata(), None);
assert_eq!(properties.sorting_columns(), None);

Ok(())
}

104 changes: 73 additions & 31 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -17,11 +17,17 @@

//! Options related to how parquet files should be written

use crate::{config::TableParquetOptions, DataFusionError, Result};
use crate::{
config::{ParquetOptions, TableParquetOptions},
DataFusionError, Result,
};

use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
file::{
metadata::KeyValue,
properties::{EnabledStatistics, WriterProperties, WriterVersion},
},
schema::types::ColumnPath,
};

@@ -47,53 +53,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
-        let parquet_session_options = &parquet_options.global;
-        let mut builder = WriterProperties::builder()
-            .set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
-            .set_write_batch_size(parquet_session_options.write_batch_size)
-            .set_writer_version(parse_version_string(
-                &parquet_session_options.writer_version,
-            )?)
-            .set_dictionary_page_size_limit(
-                parquet_session_options.dictionary_page_size_limit,
-            )
-            .set_max_row_group_size(parquet_session_options.max_row_group_size)
-            .set_created_by(parquet_session_options.created_by.clone())
-            .set_column_index_truncate_length(
-                parquet_session_options.column_index_truncate_length,
-            )
-            .set_data_page_row_count_limit(
-                parquet_session_options.data_page_row_count_limit,
-            )
-            .set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled);
+        let ParquetOptions {
+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            dictionary_page_size_limit,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            data_page_row_count_limit,
+            bloom_filter_enabled,
+            encoding,
+            dictionary_enabled,
+            compression,
+            statistics_enabled,
+            max_statistics_size,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+            // below is not part of ParquetWriterOptions
+            enable_page_index: _,
+            pruning: _,
+            skip_metadata: _,
+            metadata_size_hint: _,
+            pushdown_filters: _,
+            reorder_filters: _,
+            allow_single_file_parallelism: _,
+            maximum_parallel_row_group_writers: _,
+            maximum_buffered_record_batches_per_stream: _,
+        } = &parquet_options.global;
+
+        let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
+            Some(
+                parquet_options
+                    .key_value_metadata
+                    .clone()
+                    .drain()
+                    .map(|(key, value)| KeyValue { key, value })
+                    .collect::<Vec<_>>(),
+            )
+        } else {
+            None
+        };
+
+        let mut builder = WriterProperties::builder()
+            .set_data_page_size_limit(*data_pagesize_limit)
+            .set_write_batch_size(*write_batch_size)
+            .set_writer_version(parse_version_string(writer_version.as_str())?)
+            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
+            .set_max_row_group_size(*max_row_group_size)
+            .set_created_by(created_by.clone())
+            .set_column_index_truncate_length(*column_index_truncate_length)
+            .set_data_page_row_count_limit(*data_page_row_count_limit)
+            .set_bloom_filter_enabled(*bloom_filter_enabled)
+            .set_key_value_metadata(key_value_metadata);

-        if let Some(encoding) = &parquet_session_options.encoding {
+        if let Some(encoding) = &encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

-        if let Some(enabled) = parquet_session_options.dictionary_enabled {
-            builder = builder.set_dictionary_enabled(enabled);
+        if let Some(enabled) = dictionary_enabled {
+            builder = builder.set_dictionary_enabled(*enabled);
        }

-        if let Some(compression) = &parquet_session_options.compression {
+        if let Some(compression) = &compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }

-        if let Some(statistics) = &parquet_session_options.statistics_enabled {
+        if let Some(statistics) = &statistics_enabled {
            builder =
                builder.set_statistics_enabled(parse_statistics_string(statistics)?);
        }

-        if let Some(size) = parquet_session_options.max_statistics_size {
-            builder = builder.set_max_statistics_size(size);
+        if let Some(size) = max_statistics_size {
+            builder = builder.set_max_statistics_size(*size);
        }

-        if let Some(fpp) = parquet_session_options.bloom_filter_fpp {
-            builder = builder.set_bloom_filter_fpp(fpp);
+        if let Some(fpp) = bloom_filter_fpp {
+            builder = builder.set_bloom_filter_fpp(*fpp);
        }

-        if let Some(ndv) = parquet_session_options.bloom_filter_ndv {
-            builder = builder.set_bloom_filter_ndv(ndv);
+        if let Some(ndv) = bloom_filter_ndv {
+            builder = builder.set_bloom_filter_ndv(*ndv);
        }

for (column, options) in &parquet_options.column_specific_options {
@@ -141,6 +181,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
builder.set_column_max_statistics_size(path, max_statistics_size);
}
}

// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
Ok(ParquetWriterOptions {
writer_options: builder.build(),
})
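On the parquet side, the conversion above lands in the builder's `set_key_value_metadata`, which takes an `Option<Vec<KeyValue>>`. A minimal sketch against the `parquet` crate alone:

use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;

fn main() {
    // One entry, shaped exactly like the map conversion above produces it.
    let metadata = vec![KeyValue {
        key: "my-data".to_string(),
        value: Some("stuff".to_string()),
    }];

    let props = WriterProperties::builder()
        .set_key_value_metadata(Some(metadata))
        .build();

    // The builder passes the entries through unchanged.
    assert_eq!(props.key_value_metadata().map(Vec::len), Some(1));
}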
22 changes: 19 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1129,7 +1129,7 @@ mod tests {
};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
-use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
+use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;

@@ -1857,7 +1857,13 @@
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
-            TableParquetOptions::default(),
+            TableParquetOptions {
+                key_value_metadata: std::collections::HashMap::from([(
+                    "my-data".to_string(),
+                    Some("stuff".to_string()),
+                )]),
+                ..Default::default()
+            },
));

// create data
@@ -1891,7 +1897,10 @@
let (
path,
FileMetaData {
-                num_rows, schema, ..
+                num_rows,
+                schema,
+                key_value_metadata,
+                ..
},
) = written.take(1).next().unwrap();
let path_parts = path.parts().collect::<Vec<_>>();
@@ -1907,6 +1916,13 @@
"output file metadata should contain col b"
);

let key_value_metadata = key_value_metadata.unwrap();
let expected_metadata = vec![KeyValue {
key: "my-data".to_string(),
value: Some("stuff".to_string()),
}];
assert_eq!(key_value_metadata, expected_metadata);

Ok(())
}
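The test inspects the `FileMetaData` returned by the sink; outside the test harness, the same entries can be read back from a written file with the `parquet` reader API. A sketch (the path is hypothetical):

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn read_user_metadata(path: &str) -> parquet::errors::Result<()> {
    let file = File::open(path).expect("parquet file should exist");
    let reader = SerializedFileReader::new(file)?;
    // key_value_metadata() is None when the file carries no user metadata.
    if let Some(entries) = reader.metadata().file_metadata().key_value_metadata() {
        for kv in entries {
            println!("{} = {:?}", kv.key, kv.value);
        }
    }
    Ok(())
}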

1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -969,6 +969,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
.unwrap()
.unwrap(),
column_specific_options,
key_value_metadata: Default::default(),
})
}
}
60 changes: 59 additions & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -283,11 +283,69 @@ OPTIONS (
'format.statistics_enabled::col2' none,
'format.max_statistics_size' 123,
'format.bloom_filter_fpp' 0.001,
-    'format.bloom_filter_ndv' 100
+    'format.bloom_filter_ndv' 100,
+    'format.metadata' 'foo:bar baz'
)
----
2

# valid vs invalid metadata

# accepts empty map
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata' ''
)

# accepts map with a single entry
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata' 'key:value'
)

# accepts map with multiple entries
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata' 'key1:value1 key2:value2'
)

[Review thread on the 'format.metadata' option above]

Contributor:
This is quite subtle and I missed it the first time around -- nice eyes @devinjdangelo. @wiedld could you please add documentation to the TableParquetOptions that documents this behavior?

Specifically, I would be interested to know "what if you want to store metadata values that have spaces in them" (key1:my value with spaces)?

Contributor:
BTW it would be fine if the answer is "you will get an error / is not supported yet" -- it might just be good to document that behavior.

I could see wanting to support things like key1:"my awesome value" key2:"my other awesome value" (again not in this PR, but we should at least document how it works I think)

Contributor:
Perhaps we could leverage a syntax like:

(
'format.metadata.key1' 'val1',
'format.metadata.key2' 'val2 with space',
...
)

to support values with spaces

Contributor Author:
😆 You beat me to it.

I started with the internal double quotes approach, also considered escaped spaces; then I realized that these are introducing lexical rules which did not feel SQL appropriate. Landed on the approach suggested by @devinjdangelo; commit will be up shortly.

# accepts entries which are key-only (no value)
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata' 'key1 key2:value2 key3'
)

# errors for invalid key-value pair (extra `:`)
statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata provided "foo:bar:extra"
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata' 'foo:bar:extra'
)

# errors for invalid property (not stating `format.metadata`)
statement error DataFusion error: Invalid or Unsupported Configuration: Config value "wrong-metadata-key" not found on ParquetOptions
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.wrong-metadata-key' 'key:value'
)
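The same option is reachable from the Rust API without going through SQL. A sketch, assuming a DataFusion version (as of this PR) where `DataFrame::write_parquet` accepts an `Option<TableParquetOptions>`; the path and metadata keys are illustrative:

use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS col1, 'Foo' AS col2").await?;

    // Equivalent to COPY ... OPTIONS ('format.metadata' 'foo:bar')
    let mut parquet_options = TableParquetOptions::default();
    parquet_options
        .key_value_metadata
        .insert("foo".to_string(), Some("bar".to_string()));

    df.write_parquet(
        "/tmp/table_with_metadata/",
        DataFrameWriteOptions::new(),
        Some(parquet_options),
    )
    .await?;
    Ok(())
}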


# validate multiple parquet file output with all options set
statement ok
CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/';