-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Reading and Writing Extension FileTypes #8667
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,11 @@ | |
//! File type abstraction | ||
|
||
use crate::error::{DataFusionError, Result}; | ||
use crate::parsers::CompressionTypeVariant; | ||
|
||
use core::fmt; | ||
use std::fmt::Display; | ||
use std::hash::Hasher; | ||
use std::str::FromStr; | ||
|
||
/// The default file extension of arrow files | ||
|
@@ -41,7 +43,8 @@ pub trait GetExt { | |
} | ||
|
||
/// Readable file type | ||
#[derive(Debug, Clone, PartialEq, Eq, Hash)] | ||
#[allow(clippy::derived_hash_with_manual_eq)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the manual and auto traits should be compatible, so silencing this is OK. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could probably also manually implement |
||
#[derive(Debug, Clone, Hash)] | ||
pub enum FileType { | ||
/// Apache Arrow file | ||
ARROW, | ||
|
@@ -54,8 +57,65 @@ pub enum FileType { | |
CSV, | ||
/// JSON file | ||
JSON, | ||
/// FileType Implemented Outside of DataFusion | ||
Extension(Box<dyn ExtensionFileType>), | ||
} | ||
|
||
/// A trait to enable externally implementing the functionality of a [FileType]. | ||
pub trait ExtensionFileType: | ||
std::fmt::Debug + ExtensionFileTypeClone + Send + Sync | ||
{ | ||
/// Returns the default file extension for this type, e.g. CSV would return ".csv".to_owned() | ||
/// The default_extension is also used to uniquely identify a specific FileType::Extension variant, | ||
/// so ensure this String is unique from any built in FileType and any other ExtensionFileTypes | ||
/// defined. | ||
fn default_extension(&self) -> String; | ||
|
||
/// Returns the file extension when it is compressed with a given [CompressionTypeVariant] | ||
fn extension_with_compression( | ||
&self, | ||
compression: CompressionTypeVariant, | ||
) -> Result<String>; | ||
} | ||
|
||
pub trait ExtensionFileTypeClone { | ||
fn clone_box(&self) -> Box<dyn ExtensionFileType>; | ||
} | ||
|
||
impl Clone for Box<dyn ExtensionFileType> { | ||
fn clone(&self) -> Box<dyn ExtensionFileType> { | ||
self.clone_box() | ||
} | ||
} | ||
|
||
impl std::hash::Hash for Box<dyn ExtensionFileType> { | ||
fn hash<H>(&self, state: &mut H) | ||
where | ||
H: Hasher, | ||
{ | ||
self.default_extension().hash(state) | ||
} | ||
} | ||
|
||
impl PartialEq for FileType { | ||
fn eq(&self, other: &Self) -> bool { | ||
match (self, other) { | ||
(FileType::ARROW, FileType::ARROW) => true, | ||
(FileType::AVRO, FileType::AVRO) => true, | ||
#[cfg(feature = "parquet")] | ||
(FileType::PARQUET, FileType::PARQUET) => true, | ||
(FileType::CSV, FileType::CSV) => true, | ||
(FileType::JSON, FileType::JSON) => true, | ||
(FileType::Extension(ext_self), FileType::Extension(ext_other)) => { | ||
ext_self.default_extension() == ext_other.default_extension() | ||
} | ||
_ => false, | ||
} | ||
} | ||
} | ||
|
||
impl Eq for FileType {} | ||
|
||
impl GetExt for FileType { | ||
fn get_ext(&self) -> String { | ||
match self { | ||
|
@@ -65,6 +125,7 @@ impl GetExt for FileType { | |
FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(), | ||
FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(), | ||
FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(), | ||
FileType::Extension(ext) => ext.default_extension(), | ||
} | ||
} | ||
} | ||
|
@@ -78,6 +139,7 @@ impl Display for FileType { | |
FileType::PARQUET => "parquet", | ||
FileType::AVRO => "avro", | ||
FileType::ARROW => "arrow", | ||
FileType::Extension(ext) => return ext.fmt(f), | ||
}; | ||
write!(f, "{}", out) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -154,6 +154,10 @@ pub enum FileTypeWriterOptions { | |
JSON(JsonWriterOptions), | ||
Avro(AvroWriterOptions), | ||
Arrow(ArrowWriterOptions), | ||
/// For extension [FileType]s, FileTypeWriterOptions simply stores | ||
/// any passed StatementOptions to be handled later by any custom | ||
/// physical plans (Such as a FileFormat::create_writer_physical_plan) | ||
Extension(Option<StatementOptions>), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FileTypeWriterOptions does not handle any parsing or validation for externally defined FileTypes. It simply acts as a container for the options passed in. |
||
} | ||
|
||
impl FileTypeWriterOptions { | ||
|
@@ -184,14 +188,17 @@ impl FileTypeWriterOptions { | |
FileType::ARROW => { | ||
FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) | ||
} | ||
FileType::Extension(_) => { | ||
FileTypeWriterOptions::Extension(Some(statement_options.clone())) | ||
} | ||
}; | ||
|
||
Ok(file_type_write_options) | ||
} | ||
|
||
/// Constructs a FileTypeWriterOptions from session defaults only. | ||
pub fn build_default( | ||
file_type: &FileType, | ||
file_type: &mut FileType, | ||
config_defaults: &ConfigOptions, | ||
) -> Result<Self> { | ||
let empty_statement = StatementOptions::new(vec![]); | ||
|
@@ -214,6 +221,7 @@ impl FileTypeWriterOptions { | |
FileType::ARROW => { | ||
FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) | ||
} | ||
FileType::Extension(_) => FileTypeWriterOptions::Extension(None), | ||
}; | ||
|
||
Ok(file_type_write_options) | ||
|
@@ -290,6 +298,7 @@ impl Display for FileTypeWriterOptions { | |
FileTypeWriterOptions::JSON(_) => "JsonWriterOptions", | ||
#[cfg(feature = "parquet")] | ||
FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions", | ||
FileTypeWriterOptions::Extension(_) => "ExensionWriterOptions", | ||
}; | ||
write!(f, "{}", name) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,6 +150,9 @@ impl ListingTableConfig { | |
), | ||
#[cfg(feature = "parquet")] | ||
FileType::PARQUET => Arc::new(ParquetFormat::default()), | ||
FileType::Extension(_) => { | ||
unreachable!("FileType::from_str cannot return Extension variant!") | ||
} | ||
}; | ||
|
||
Ok((file_format, ext)) | ||
|
@@ -768,7 +771,7 @@ impl TableProvider for ListingTable { | |
let file_type_writer_options = match &self.options().file_type_write_options { | ||
Some(opt) => opt.clone(), | ||
None => FileTypeWriterOptions::build_default( | ||
&file_format.file_type(), | ||
&mut file_format.file_type(), | ||
state.config_options(), | ||
)?, | ||
}; | ||
|
@@ -1716,6 +1719,9 @@ mod tests { | |
) | ||
.await?; | ||
} | ||
FileType::Extension(_) => { | ||
panic!("Extension file type not implemented in write path.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we instead return an NotImplement exception? |
||
} | ||
} | ||
|
||
// Create and register the source table with the provided schema and inserted data | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -605,6 +605,7 @@ impl DefaultPhysicalPlanner { | |
FileType::JSON => Arc::new(JsonFormat::default()), | ||
FileType::AVRO => Arc::new(AvroFormat {} ), | ||
FileType::ARROW => Arc::new(ArrowFormat {}), | ||
FileType::Extension(_ext) => return not_impl_err!("Extension FileTypes not supported in Copy To statements.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The built in Copy plans can't support external types (since we can't initialize them in DataFusion), but it should be not too difficult to build a custom COPY PhysicalPlan that uses a custom FileFormat, but otherwise is the same and relies on the build in Copy logical plan. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we could have the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried this (as well as combining ExtensionFileType trait into FileFormat), but that would require adding lots of dependencies between the DataFusion crates. FileType is in common and used in most of DataFusion while FileFormat is only in core. This could work but I think the consequence would be DataFusion becomes one big crate. I think the only way to keep the independent crates is to have at least two File related abstractions. One simple one for logical planning in Common and one heavy duty one for Execution plans in core. Even linking them is a challenge because then common depends on core 🤔 |
||
}; | ||
|
||
sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -891,6 +891,9 @@ impl TryFrom<&FileTypeWriterOptions> for protobuf::FileTypeWriterOptions { | |
FileTypeWriterOptions::Arrow(ArrowWriterOptions {}) => { | ||
return not_impl_err!("Arrow file sink protobuf serialization") | ||
} | ||
FileTypeWriterOptions::Extension(_) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be able to support serializing this since it is ultimately just a Vec<(String, String)> There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could also punt it into the |
||
return not_impl_err!("Extension file sink protobuf serialization") | ||
} | ||
}; | ||
Ok(Self { | ||
file_type: Some(file_type), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the new stack size needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure. Locally I did not need this. To get CI tests to pass I needed to boost stack size.