Implement physical plan serialization for COPY plans CsvLogicalExtensionCodec
#11150
take
Starting to work on this one; I was delayed by the Substrait one.
When I was doing this one, I added a test like:

```rust
#[tokio::test]
async fn roundtrip_physical_plan_copy_to_sql_options() -> Result<()> {
    // Create a new SessionContext
    let ctx = SessionContext::new();
    let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));

    // Create a CSV scan as input
    let input = create_csv_scan(&ctx).await?;
    let plan = LogicalPlan::Copy(CopyTo {
        input: Arc::new(input),
        output_url: "test.csv".to_string(),
        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        file_type,
        options: Default::default(),
    });

    // Convert the logical plan to a physical plan
    let planner = DefaultPhysicalPlanner::default();
    let physical_plan = planner.create_physical_plan(&plan, &ctx.state()).await?;
    roundtrip_test(physical_plan)
}
```

I find that the roundtrip in the physical plan test seems to be OK, since the `FileFormatFactory` is translated into a `FileFormat` automatically here:

```rust
LogicalPlan::Copy(CopyTo {
    input,
    output_url,
    file_type,
    partition_by,
    options: source_option_tuples,
}) => {
    let input_exec = children.one()?;
    let parsed_url = ListingTableUrl::parse(output_url)?;
    let object_store_url = parsed_url.object_store();
    let schema: Schema = (**input.schema()).clone().into();

    // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
    // from the schema of the RecordBatch being written. This allows COPY statements to specify only
    // the column name rather than column name + explicit data type.
    let table_partition_cols = partition_by
        .iter()
        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
        .collect::<Vec<_>>();

    let keep_partition_by_columns = match source_option_tuples
        .get("execution.keep_partition_by_columns")
        .map(|v| v.trim())
    {
        None => session_state.config().options().execution.keep_partition_by_columns,
        Some("true") => true,
        Some("false") => false,
        Some(value) => {
            return Err(DataFusionError::Configuration(format!(
                "provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"",
                value
            )))
        }
    };

    // Set file sink related options
    let config = FileSinkConfig {
        object_store_url,
        table_paths: vec![parsed_url],
        file_groups: vec![],
        output_schema: Arc::new(schema),
        table_partition_cols,
        overwrite: false,
        keep_partition_by_columns,
    };

    let sink_format = file_type_to_format(file_type)?
        .create(session_state, source_option_tuples)?;

    sink_format
        .create_writer_physical_plan(input_exec, session_state, config, None)
        .await?
}
```

So I am confused about how to actually add a test here; I'd appreciate your guidance, @alamb.
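For reference, the `keep_partition_by_columns` handling in that branch boils down to a tri-state parse: absent means "use the session default", `"true"`/`"false"` are explicit overrides, and anything else is an error. A minimal standalone sketch (the function name is hypothetical, not DataFusion code):

```rust
// Hypothetical standalone sketch of the tri-state option parsing:
// None        -> fall back to the session default
// "true"/"false" (trimmed) -> explicit override
// anything else            -> configuration error
fn parse_bool_option(value: Option<&str>, default: bool) -> Result<bool, String> {
    match value.map(str::trim) {
        None => Ok(default),
        Some("true") => Ok(true),
        Some("false") => Ok(false),
        Some(other) => Err(format!(
            "provided value was not recognized: \"{other}\""
        )),
    }
}
```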
Do you mean that the test you wrote above passes without any code changes? Maybe @devinjdangelo has some idea.
The format factories can optionally carry modified default write options. This is only used for the `DataFrame::write_*` methods, so that they do not need to pass strings to modify write behavior (see `datafusion/core/src/datasource/file_format/csv.rs`, lines 60 to 62, at `827d0e3`).

The existing SerDe implementation does not handle the case where the format factory itself carries options. This will prevent systems like Ballista from correctly passing options around in their `DataFrame::write_*` methods. Here are the lines that are not fully implemented: `datafusion/proto/src/logical_plan/file_formats.rs`, lines 74 to 89, at `827d0e3`.

You can see that the encoding method does not actually write any bytes, and the decoding method just initializes a default struct, ignoring any written bytes.
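This also explains why the roundtrip test above can pass. A toy illustration (standalone Rust with a hypothetical `WriteOptions` type, not DataFusion code) of why a roundtrip built from default options cannot catch a codec that drops them:

```rust
#[derive(Debug, PartialEq, Default, Clone)]
struct WriteOptions {
    delimiter: u8,
}

// Mimics the described codec behavior: encoding writes no bytes,
// and decoding just returns a default struct, ignoring its input.
fn encode(_opts: &WriteOptions) -> Vec<u8> {
    Vec::new()
}

fn decode(_buf: &[u8]) -> WriteOptions {
    WriteOptions::default()
}
```

A roundtrip assertion on `WriteOptions::default()` passes by accident, while any non-default option is silently lost; a meaningful test therefore needs to set non-default options on the factory before serializing.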
Yes, that's why I am confused, since it is transformed internally.
Let me take a look.

Sure, I'll take a look then.
Is your feature request related to a problem or challenge?
As part of #11060, @devinjdangelo made file format support into a trait, which is good!
However, the code to serialize these new (dynamic) structures is not yet implemented.
As @devinjdangelo says https://github.com/apache/datafusion/pull/11060/files#r1650268578
It turns out there are no unit tests for them either, so no tests failed.
Describe the solution you'd like
Implement the named codec for serializing plans and a test for it
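One possible shape for the missing codec body, sketched standalone below. Everything here is an assumption for illustration: the helper names and the length-prefixed byte layout are invented, and the real implementation would more likely serialize the factory's `CsvOptions` through the existing prost-generated protobuf messages. The point is only that encode must actually write the option overrides, and decode must restore them instead of returning a default:

```rust
use std::collections::HashMap;

// Hypothetical helpers: serialize option overrides as
// length-prefixed UTF-8 key/value pairs (u32 little-endian lengths).
fn encode_options(options: &HashMap<String, String>, buf: &mut Vec<u8>) {
    for (k, v) in options {
        for s in [k, v] {
            let bytes = s.as_bytes();
            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
            buf.extend_from_slice(bytes);
        }
    }
}

// Read one length-prefixed string, advancing the input slice.
fn read_str(buf: &mut &[u8]) -> String {
    let slice = *buf;
    let (len_bytes, rest) = slice.split_at(4);
    let len = u32::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
    let (s, rest) = rest.split_at(len);
    *buf = rest;
    String::from_utf8(s.to_vec()).unwrap()
}

// Restore the option map; a real implementation would return Result
// instead of panicking on malformed input.
fn decode_options(mut buf: &[u8]) -> HashMap<String, String> {
    let mut out = HashMap::new();
    while !buf.is_empty() {
        let k = read_str(&mut buf);
        let v = read_str(&mut buf);
        out.insert(k, v);
    }
    out
}
```

With this shape, the roundtrip test can assert that a factory constructed with non-default options decodes back to the same options, which is exactly the case the current no-op implementation fails.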
Describe alternatives you've considered
The code is here:
datafusion/proto/src/logical_plan/file_formats.rs
The test would go here: https://github.com/apache/datafusion/blob/main/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Note there is already coverage for LogicalPlans here:
datafusion/proto/tests/cases/roundtrip_logical_plan.rs, lines 325 to 346, in d2ff218
Additional context
There are several other codecs needed:
- `JsonLogicalExtensionCodec`
- `ParquetLogicalExtensionCodec`
- `ArrowLogicalExtensionCodec`
- `AvroLogicalExtensionCodec`
However, I think we need to get one example done and then we can file tickets to fill out the others
Maybe this is what @lewiszlw was getting at with #11095 / https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/composed_extension_codec.rs