Implement physical plan serialization for COPY plans CsvLogicalExtensionCodec #11150

Closed
alamb opened this issue Jun 27, 2024 · 7 comments · Fixed by #11735
Labels
enhancement New feature or request


@alamb
Contributor

alamb commented Jun 27, 2024

Is your feature request related to a problem or challenge?

As part of #11060, @devinjdangelo made file format support into a Trait which is good!

However, the code to serialize these new (dynamic) structures is not yet implemented.

As @devinjdangelo says in https://github.com/apache/datafusion/pull/11060/files#r1650268578:

Users depending on the ability to serialize COPY plans (e.g. ballista) will need this TODO to be completed before upgrading to any version of datafusion including this change.

It turns out there are no unit tests for them either, so no tests failed.

Describe the solution you'd like

Implement the named codec for serializing plans and a test for it

Describe alternatives you've considered

The code is here: datafusion/proto/src/logical_plan/file_formats.rs
The test would go here: https://github.com/apache/datafusion/blob/main/datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Note there is already coverage for LogicalPlans here:

async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
    let ctx = SessionContext::new();
    let input = create_csv_scan(&ctx).await?;
    let mut table_options = ctx.copied_table_options();
    table_options.set_file_format(FileType::CSV);
    table_options.set("format.delimiter", ";")?;
    let plan = LogicalPlan::Copy(CopyTo {
        input: Arc::new(input),
        output_url: "test.csv".to_string(),
        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        format_options: FormatOptions::CSV(table_options.csv.clone()),
        options: Default::default(),
    });
    let bytes = logical_plan_to_bytes(&plan)?;
    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
    assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
    Ok(())
}

Additional context

There are several other codecs needed:

  • JsonLogicalExtensionCodec
  • ParquetLogicalExtensionCodec
  • ArrowLogicalExtensionCodec
  • AvroLogicalExtensionCodec

However, I think we need to get one example done and then we can file tickets to fill out the others

Maybe this is what @lewiszlw was getting at with #11095 / https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/composed_extension_codec.rs

@Lordworms
Contributor

take

@Lordworms
Contributor

Starting to work on this one; I was delayed by the Substrait one.

@Lordworms
Contributor

When I was doing this one, I added a test like this:

#[tokio::test]
async fn roundtrip_physical_plan_copy_to_sql_options() -> Result<()> {
    // Create a new SessionContext
    let ctx = SessionContext::new();
    let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
    // Create a CSV scan as input
    let input = create_csv_scan(&ctx).await?;
    let plan = LogicalPlan::Copy(CopyTo {
        input: Arc::new(input),
        output_url: "test.csv".to_string(),
        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        file_type,
        options: Default::default(),
    });

    // Convert the logical plan to a physical plan
    let planner = DefaultPhysicalPlanner::default();
    let physical_plan = planner.create_physical_plan(&plan, &ctx.state()).await?;
    roundtrip_test(physical_plan)
}

I find that the physical plan roundtrip test seems to pass as-is, since the FileFormatFactory is translated into a FileFormat automatically here:

LogicalPlan::Copy(CopyTo {
                input,
                output_url,
                file_type,
                partition_by,
                options: source_option_tuples,
            }) => {
                let input_exec = children.one()?;
                let parsed_url = ListingTableUrl::parse(output_url)?;
                let object_store_url = parsed_url.object_store();

                let schema: Schema = (**input.schema()).clone().into();

                // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
                // from the schema of the RecordBatch being written. This allows COPY statements to specify only
                // the column name rather than column name + explicit data type.
                let table_partition_cols = partition_by
                    .iter()
                    .map(|s| (s.to_string(), arrow_schema::DataType::Null))
                    .collect::<Vec<_>>();

                let keep_partition_by_columns = match source_option_tuples
                    .get("execution.keep_partition_by_columns")
                    .map(|v| v.trim()) {
                    None => session_state.config().options().execution.keep_partition_by_columns,
                    Some("true") => true,
                    Some("false") => false,
                    Some(value) =>
                        return Err(DataFusionError::Configuration(format!("provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"", value))),
                };

                // Set file sink related options
                let config = FileSinkConfig {
                    object_store_url,
                    table_paths: vec![parsed_url],
                    file_groups: vec![],
                    output_schema: Arc::new(schema),
                    table_partition_cols,
                    overwrite: false,
                    keep_partition_by_columns,
                };

                let sink_format = file_type_to_format(file_type)?
                    .create(session_state, source_option_tuples)?;

                sink_format
                    .create_writer_physical_plan(input_exec, session_state, config, None)
                    .await?
            }

So I am confused about how to actually add a test here; I'd appreciate your guidance @alamb

@alamb
Contributor Author

alamb commented Jul 20, 2024

So I am confused about how to actually add a test here; I'd appreciate your guidance @alamb

Do you mean that the test you wrote above passes without any code changes? Maybe @devinjdangelo has some idea

@devinjdangelo
Contributor

devinjdangelo commented Jul 20, 2024

The format factories can optionally carry modified default write options. This is only used for DataFrame::write_* methods, so that they do not need to pass strings to modify write behavior.

pub struct CsvFormatFactory {
    options: Option<CsvOptions>,
}

The existing SerDe implementation does not handle the case when the format factory carries options itself. This will prevent systems like Ballista from correctly passing around options in its DataFrame::write_* methods. Here are the lines that are not fully implemented:

fn try_decode_file_format(
    &self,
    __buf: &[u8],
    __ctx: &SessionContext,
) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
    Ok(Arc::new(CsvFormatFactory::new()))
}

fn try_encode_file_format(
    &self,
    __buf: &[u8],
    __node: Arc<dyn FileFormatFactory>,
) -> datafusion_common::Result<()> {
    Ok(())
}
}

You can see that the encoding method does not actually write any bytes, and the decoding method just initializes a default struct, ignoring any written bytes.
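For context, here is a minimal sketch of what filling in these two stubs could look like, written as free functions that mirror the trait method bodies (the encode side takes a &mut Vec<u8> buffer, which is what an encoder would actually need). The idea: downcast the Arc<dyn FileFormatFactory> to CsvFormatFactory, serialize the CsvOptions it carries into a small protobuf message, and rebuild the factory from those bytes on decode. The CsvOptionsProto message is a stand-in covering only two fields, and the as_any() downcast, the direct access to the factory's options, and CsvFormatFactory::new_with_options are assumptions about the surrounding API, not necessarily what the eventual fix uses.

use std::sync::Arc;

use datafusion::datasource::file_format::csv::CsvFormatFactory;
use datafusion::datasource::file_format::FileFormatFactory;
use datafusion::prelude::SessionContext;
use datafusion_common::config::CsvOptions;
use datafusion_common::DataFusionError;
use prost::Message;

// Stand-in prost message mirroring a subset of CsvOptions; a real fix would
// need a message covering every field.
#[derive(Clone, PartialEq, prost::Message)]
struct CsvOptionsProto {
    #[prost(bool, optional, tag = "1")]
    has_header: Option<bool>,
    #[prost(uint32, tag = "2")]
    delimiter: u32,
}

// Body of try_encode_file_format: capture the options the factory carries.
fn encode_csv_factory(
    buf: &mut Vec<u8>,
    node: Arc<dyn FileFormatFactory>,
) -> datafusion_common::Result<()> {
    // Assumes FileFormatFactory exposes as_any() so the concrete type is reachable.
    let factory = node
        .as_any()
        .downcast_ref::<CsvFormatFactory>()
        .ok_or_else(|| DataFusionError::Internal("expected CsvFormatFactory".to_string()))?;
    // Assumes the options field (or an accessor for it) is visible here.
    let options = factory.options.clone().unwrap_or_default();
    let proto = CsvOptionsProto {
        has_header: options.has_header,
        delimiter: options.delimiter as u32,
    };
    // Append the encoded options to the codec's buffer.
    proto
        .encode(buf)
        .map_err(|e| DataFusionError::Internal(e.to_string()))
}

// Body of try_decode_file_format: rebuild the factory from the encoded bytes
// instead of returning a default-constructed one.
fn decode_csv_factory(
    buf: &[u8],
    _ctx: &SessionContext,
) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
    let proto = CsvOptionsProto::decode(buf)
        .map_err(|e| DataFusionError::Internal(e.to_string()))?;
    let options = CsvOptions {
        has_header: proto.has_header,
        delimiter: proto.delimiter as u8,
        ..Default::default()
    };
    // Assumes CsvFormatFactory::new_with_options exists for rebuilding with options.
    Ok(Arc::new(CsvFormatFactory::new_with_options(options)))
}

With something along these lines in place, a physical plan roundtrip test that sets non-default CSV options (for example a ';' delimiter) would fail against the current stubs and pass once the options actually survive the encode/decode cycle.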

@Lordworms
Contributor

Lordworms commented Jul 20, 2024

So I am confused about how to actually add a test here; I'd appreciate your guidance @alamb

Do you mean that the test you wrote above passes without any code changes? Maybe @devinjdangelo has some idea

Yes, that's why I am confused, since it is transformed internally.

@Lordworms
Contributor

The format factories can optionally carry modified default write options. This is only used for DataFrame::write_* methods, so that they do not need to pass strings to modify write behavior.

pub struct CsvFormatFactory {
    options: Option<CsvOptions>,
}

The existing SerDe implementation does not handle the case when the format factory carries options itself. This will prevent systems like Ballista from correctly passing around options in its DataFrame::write_* methods. Here are the lines that are not fully implemented:

fn try_decode_file_format(
    &self,
    __buf: &[u8],
    __ctx: &SessionContext,
) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
    Ok(Arc::new(CsvFormatFactory::new()))
}

fn try_encode_file_format(
    &self,
    __buf: &[u8],
    __node: Arc<dyn FileFormatFactory>,
) -> datafusion_common::Result<()> {
    Ok(())
}
}

You can see that the encoding method does not actually write any bytes, and the decoding method just initializes a default struct, ignoring any written bytes.

Let me take a look.


Sure, I'll take a look then.
