-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support COPY TO Externally Defined File Formats, add FileType trait #11060
Conversation
…pache#10879) * adding new function contains * adding substrait test * adding doc * adding doc * Update docs/source/user-guide/sql/scalar_functions.md Co-authored-by: Alex Huang <[email protected]> * adding entry --------- Co-authored-by: Alex Huang <[email protected]>
This looks awesome -- thank you @devinjdangelo -- I plan to review this carefully, but probably not until tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @alamb @thinkharderdev I finally had some time to figure out how to make #8667 work.
There are a number of tasks to be done, preferably as follow on PRs given the size/complexity of the changes in this PR already:
- end-to-end example of implementing a custom
FileFormat
and usingCOPY
with it - Finishing
LogicalExtensionCodec
impls for csv, json, parquet, arrow, and avroFileFormat
- Moving
FileFormat
impls and supporting code out of core and into dedicated crates - (maybe) Moving
ListingTable
to a dedicated crate
/// These file types have special built in behavior for configuration. | ||
/// Use TableOptions::Extensions for configuring other file types. | ||
#[derive(Debug, Clone)] | ||
pub enum ConfigFileType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the last vestige of the old FileType
enum. It can be completely ignored if using a custom format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 perhaps we should add some Trait to unify the handling of options for built in formats and custom formats 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could potentially remove the TableOptions
code and instead have each FileFormatFactory
handle configuration. This is actually mostly the case in this PR already, but TableOptions
is a common helper.
@@ -40,107 +37,10 @@ pub trait GetExt { | |||
fn get_ext(&self) -> String; | |||
} | |||
|
|||
/// Readable file type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the primary change in the PR. Most other changes are just to recover the original behavior for these file types, but with the new trait.
/// The former trait is a superset of the latter trait, which includes execution time | ||
/// relevant methods. [FileType] is only used in logical planning and only implements | ||
/// the subset of methods required during logical planning. | ||
pub struct DefaultFileFormat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This design pattern is copied from how the DefaultTableSource
allows a TableProvider
to be used during logical planning as a TableSource
.
In other words, FileType
is to FileFormat
as TableSource
is to TableProvider
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found the naming confusing -- I would have expected that DefaultFileFormat
implemented FileFormatFactory
or something
Maybe we could call this FileFormatSource
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this to DefaultFileType
.
#[derive(Debug)] | ||
pub struct CsvLogicalExtensionCodec; | ||
|
||
// TODO! This is a placeholder for now and needs to be implemented for real. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note this comment that I have extended LogicalExtensionCodec
and created placeholder impls, but have not actually implemented logic to serde all built in FileFormat
yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will file follow on tickets
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF}; | ||
use datafusion_sql::planner::ContextProvider; | ||
|
||
struct MockCsvType {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This struct could be pulled out of this test as an example of LogicalPlanning a COPY
query with a custom FileType
/// File format options. | ||
pub format_options: FormatOptions, | ||
/// File type trait | ||
pub file_type: Arc<dyn FileType>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If using DataFusion core
for execution plans, this FileType
can be converted to a FileFormatFactory
which will produce a FileFormat
at runtime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason it can't be FileFormatFactory
directly 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would introduce a dependency between datafusion-expr and datafusion-core. The FileType
trait exists to avoid this dependency and so users only using datafusion for logical planning would not have to implement all of the physical execution related FileFormat
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @devinjdangelo -- I had a good look at this PR, and I think it looks really nice
I did not quite get through the entire PR yet, but I wanted to provide some early feeback
I think it very much adheres to DataFusion's design goals of permiting extension at all points and makes it easier to support new file formats
One thing I didn't fully grok at first was whyn there was a need for FileFormatFactory
-- like why not simply use FileFormat
everywhere.
Then I realized that the reason was that the FileFormat
s have state, specifically default options and that the format itself then carries the specific options
pub struct JsonFormatFactory {
/// default options
options: Option<JsonOptions>,
}
Perhaps we can document this rationale somewhere so others are not confused as well.
Suggestion: Add an example
I think something that would be awesome to add (as a follow on PR perhaps) is an example of creating a custom file format and read/write to it.
@@ -58,6 +74,15 @@ pub trait FileFormat: Send + Sync + fmt::Debug { | |||
/// downcast to a specific implementation. | |||
fn as_any(&self) -> &dyn Any; | |||
|
|||
/// Returns the extension for this FileFormat, e.g. "file.csv" -> csv | |||
fn get_ext(&self) -> String; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Is there any reason this has to return an owned String
? Could it return &str
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think &str would be too restrictive, but we could avoid requiring a String allocation with a cow instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's keep it String for now then
fn get_ext_with_compression( | ||
&self, | ||
_file_compression_type: &FileCompressionType, | ||
) -> Result<String>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question about owned String
/// The former trait is a superset of the latter trait, which includes execution time | ||
/// relevant methods. [FileType] is only used in logical planning and only implements | ||
/// the subset of methods required during logical planning. | ||
pub struct DefaultFileFormat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found the naming confusing -- I would have expected that DefaultFileFormat
implemented FileFormatFactory
or something
Maybe we could call this FileFormatSource
?
} | ||
} | ||
|
||
impl FileType for DefaultFileFormat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than this wrapper, I wonder if we could implement FileType
for dyn FileFormatFactory
impl FileType for &dyn FileFormatFactory{
...
}
And avoid having to have this struct 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried briefly, but ran into various errors. I am not sure if it is possible with the traits defined in different crates to make this work without a wrapper. I think that is why DefaultTableSource
exists as well.
@@ -237,6 +248,33 @@ impl SessionState { | |||
function_factory: None, | |||
}; | |||
|
|||
#[cfg(feature = "parquet")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -786,32 +779,9 @@ impl DefaultPhysicalPlanner { | |||
table_partition_cols, | |||
overwrite: false, | |||
}; | |||
let mut table_options = session_state.default_table_options(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is great this code has been removed from the default planner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much @devinjdangelo -- I made it through this PR and I think it looks really nice
I do think it is worth considering how to simplify the traits (possibly unifying the configuration) and there look like there are a few todos, but otherwise this PR looks really nice to me
/// These file types have special built in behavior for configuration. | ||
/// Use TableOptions::Extensions for configuring other file types. | ||
#[derive(Debug, Clone)] | ||
pub enum ConfigFileType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 perhaps we should add some Trait to unify the handling of options for built in formats and custom formats 🤔
match value { | ||
FileType::ARROW => FormatOptions::ARROW, | ||
FileType::AVRO => FormatOptions::AVRO, | ||
#[cfg(feature = "parquet")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is certainly nice to avoid many of these #[cfg(feature = "parquet")]
that were sprinkled throughout the code
datafusion/common/src/config.rs
Outdated
/// # Parameters | ||
/// | ||
/// * `format`: The file format to use (e.g., CSV, Parquet). | ||
pub fn set_file_format(&mut self, format: ConfigFileType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the parameter is called "format" but the struct is ConfigFileType
Maybe we should call this set_config_type
? Or maybe it could just pass in a `Arc~?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the method name for now.
It might make sense for TableOptions
to be broken up and its functionality moved into each implementation of FileFormatFactory
. I have avoided any reference to FileFormat
in datafusion-common, since that would add an indirect dependency on datafusion-core
to datafusion-sql
and other crates.
use bytes::Bytes; | ||
use futures::StreamExt; | ||
|
||
#[test] | ||
fn get_ext_with_compression() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we port these tests instead of just removing them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I moved this functionality to a trait method on each FileFormatFactory
. We could add a test for each format which is similar to this test.
/// File format options. | ||
pub format_options: FormatOptions, | ||
/// File type trait | ||
pub file_type: Arc<dyn FileType>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason it can't be FileFormatFactory
directly 🤔
#[derive(Debug)] | ||
pub struct JsonLogicalExtensionCodec; | ||
|
||
// TODO! This is a placeholder for now and needs to be implemented for real. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have to complete the TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users depending on the ability to serialize COPY plans (e.g. ballista) will need this TODO to be completed before upgrading to any version of datafusion including this change.
I think it would be OK to cut an follow up ticket for this.
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @alamb for the thorough review as always! I have made a few small changes.
datafusion/common/src/config.rs
Outdated
/// # Parameters | ||
/// | ||
/// * `format`: The file format to use (e.g., CSV, Parquet). | ||
pub fn set_file_format(&mut self, format: ConfigFileType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the method name for now.
It might make sense for TableOptions
to be broken up and its functionality moved into each implementation of FileFormatFactory
. I have avoided any reference to FileFormat
in datafusion-common, since that would add an indirect dependency on datafusion-core
to datafusion-sql
and other crates.
use bytes::Bytes; | ||
use futures::StreamExt; | ||
|
||
#[test] | ||
fn get_ext_with_compression() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I moved this functionality to a trait method on each FileFormatFactory
. We could add a test for each format which is similar to this test.
@@ -58,6 +74,15 @@ pub trait FileFormat: Send + Sync + fmt::Debug { | |||
/// downcast to a specific implementation. | |||
fn as_any(&self) -> &dyn Any; | |||
|
|||
/// Returns the extension for this FileFormat, e.g. "file.csv" -> csv | |||
fn get_ext(&self) -> String; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think &str would be too restrictive, but we could avoid requiring a String allocation with a cow instead.
/// The former trait is a superset of the latter trait, which includes execution time | ||
/// relevant methods. [FileType] is only used in logical planning and only implements | ||
/// the subset of methods required during logical planning. | ||
pub struct DefaultFileFormat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this to DefaultFileType
.
} | ||
} | ||
|
||
impl FileType for DefaultFileFormat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried briefly, but ran into various errors. I am not sure if it is possible with the traits defined in different crates to make this work without a wrapper. I think that is why DefaultTableSource
exists as well.
/// File format options. | ||
pub format_options: FormatOptions, | ||
/// File type trait | ||
pub file_type: Arc<dyn FileType>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would introduce a dependency between datafusion-expr and datafusion-core. The FileType
trait exists to avoid this dependency and so users only using datafusion for logical planning would not have to implement all of the physical execution related FileFormat
methods.
#[derive(Debug)] | ||
pub struct JsonLogicalExtensionCodec; | ||
|
||
// TODO! This is a placeholder for now and needs to be implemented for real. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users depending on the ability to serialize COPY plans (e.g. ballista) will need this TODO to be completed before upgrading to any version of datafusion including this change.
I think it would be OK to cut an follow up ticket for this.
/// These file types have special built in behavior for configuration. | ||
/// Use TableOptions::Extensions for configuring other file types. | ||
#[derive(Debug, Clone)] | ||
pub enum ConfigFileType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could potentially remove the TableOptions
code and instead have each FileFormatFactory
handle configuration. This is actually mostly the case in this PR already, but TableOptions
is a common helper.
Thank you, @devinjdangelo, for proposing this extensibility feature. I would like to clarify a few points to ensure I understand correctly:
If these points are correct, I am considering whether we could move these states (extension options, like |
Thank you for taking a look @berkaysynnada
That is correct. Logical planning cannot depend on FileFormatFactory or FileFormat to avoid a dependency on datafusion's execution engine during logical planning.
Yes, |
I wonder if it wuld be possible to store the file format state outside the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets see if we can find some way to avoid the extra level of stateful factories. If not then I think this PR could still be merged
Before we merge I'll file follow on tickets for supporting protobuf encoding
I think this is possible, but it would also make implementing pub trait FileFormat{
async fn create_physical_plan(
&self,
state: &SessionState,
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn ExecutionPlan>>;
...
} And each method would have to parse the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should proceed with this PR, even though some of the traits are less than ideal (due to having to handle state)
Also before merging we should file the follow on tickets you suggest in #11060 (review)
Let's leave it open for another day or two to gather additional comments
Thanks again for pushing this part of the code forward
I merged up to resolve a conflict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed #11150 to track moving the extension codes
I plan to merge this PR when CI is complete
Forward ! Thank you @devinjdangelo and @Weijun-H |
…pache#11060) * wip create and register ext file types with session * Add contains function, and support in datafusion substrait consumer (apache#10879) * adding new function contains * adding substrait test * adding doc * adding doc * Update docs/source/user-guide/sql/scalar_functions.md Co-authored-by: Alex Huang <[email protected]> * adding entry --------- Co-authored-by: Alex Huang <[email protected]> * logical planning updated * compiling * removing filetype enum * compiling * working on tests * fix some tests * test fixes * cli fix * cli fmt * Update datafusion/core/src/datasource/file_format/mod.rs Co-authored-by: Andrew Lamb <[email protected]> * Update datafusion/core/src/execution/session_state.rs Co-authored-by: Andrew Lamb <[email protected]> * review comments * review comments * review comments * typo fix * fmt * fix err log style * fmt --------- Co-authored-by: Lordworms <[email protected]> Co-authored-by: Alex Huang <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
…pache#11060) * wip create and register ext file types with session * Add contains function, and support in datafusion substrait consumer (apache#10879) * adding new function contains * adding substrait test * adding doc * adding doc * Update docs/source/user-guide/sql/scalar_functions.md Co-authored-by: Alex Huang <[email protected]> * adding entry --------- Co-authored-by: Alex Huang <[email protected]> * logical planning updated * compiling * removing filetype enum * compiling * working on tests * fix some tests * test fixes * cli fix * cli fmt * Update datafusion/core/src/datasource/file_format/mod.rs Co-authored-by: Andrew Lamb <[email protected]> * Update datafusion/core/src/execution/session_state.rs Co-authored-by: Andrew Lamb <[email protected]> * review comments * review comments * review comments * typo fix * fmt * fix err log style * fmt --------- Co-authored-by: Lordworms <[email protected]> Co-authored-by: Alex Huang <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
Which issue does this PR close?
Closes #8345
Revives #8667
Rationale for this change
Previously,
COPY TO
queries could only be planned/executed for a preset enum ofFileTypes
.CREATE EXTERNAL TABLE
supports arbitrary file formats via theFileFormat
andTableProvider
traits. This PR removes theFileType
enum in favor of aFileType
trait and updatesCOPY TO
plans to work with any externally implementeddyn FileFormat
.This PR also enables future work to remove csv, parquet, json, ect... formats into dedicated crates (datafusion-csv, ...).
What changes are included in this PR?
FileType
enum and replaces with a traitFileFormat
that implementsFileType
to enable conversions between the twoLogicalExtensionCodec
to allow serde ofdyn FileType
Are these changes tested?
Existing tests verify behavior for currently supported formats remains the same.
Are there any user-facing changes?
Yes, there are API changes relevant for developers using DataFusion.