From a7ff7a5e97dbaf483ccde9408a53cfdf13120243 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 23 Dec 2024 00:57:40 +0800 Subject: [PATCH] Consolidate Example: dataframe_output.rs into dataframe.rs (#13877) --- datafusion-examples/README.md | 3 +- datafusion-examples/examples/dataframe.rs | 67 ++++++++++++++++ .../examples/dataframe_output.rs | 78 ------------------- 3 files changed, 68 insertions(+), 80 deletions(-) delete mode 100644 datafusion-examples/examples/dataframe_output.rs diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index aca600e50e4f..a155920eadc4 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -57,8 +57,7 @@ cargo run --example dataframe - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider) - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3 -- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data -- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame +- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file. - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks. diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index 59766e881e8b..5d5414e3d8b4 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -17,8 +17,12 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::Result; use datafusion::prelude::*; +use datafusion_common::config::CsvOptions; +use datafusion_common::parsers::CompressionTypeVariant; +use datafusion_common::DataFusionError; use std::fs::File; use std::io::Write; use std::sync::Arc; @@ -29,6 +33,11 @@ use tempfile::tempdir; /// * [read_parquet]: execute queries against parquet files /// * [read_csv]: execute queries against csv files /// * [read_memory]: execute queries against in-memory arrow data +/// +/// This example demonstrates the various methods to write out a DataFrame to local storage. +/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example +/// using a remote object store. +/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file #[tokio::main] async fn main() -> Result<()> { // The SessionContext is the main high level API for interacting with DataFusion @@ -36,6 +45,7 @@ async fn main() -> Result<()> { read_parquet(&ctx).await?; read_csv(&ctx).await?; read_memory(&ctx).await?; + write_out(&ctx).await?; Ok(()) } @@ -139,3 +149,60 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> { Ok(()) } + +/// Use the DataFrame API to: +/// 1. Write out a DataFrame to a table +/// 2. Write out a DataFrame to a parquet file +/// 3. Write out a DataFrame to a csv file +/// 4. Write out a DataFrame to a json file +async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionError> { + let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap(); + + // Ensure the column names and types match the target table + df = df.with_column_renamed("column1", "tablecol1").unwrap(); + + ctx.sql( + "create external table + test(tablecol1 varchar) + stored as parquet + location './datafusion-examples/test_table/'", + ) + .await? + .collect() + .await?; + + // This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c'). + // The behavior of write_table depends on the TableProvider's implementation + // of the insert_into method. + df.clone() + .write_table("test", DataFrameWriteOptions::new()) + .await?; + + df.clone() + .write_parquet( + "./datafusion-examples/test_parquet/", + DataFrameWriteOptions::new(), + None, + ) + .await?; + + df.clone() + .write_csv( + "./datafusion-examples/test_csv/", + // DataFrameWriteOptions contains options which control how data is written + // such as compression codec + DataFrameWriteOptions::new(), + Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)), + ) + .await?; + + df.clone() + .write_json( + "./datafusion-examples/test_json/", + DataFrameWriteOptions::new(), + None, + ) + .await?; + + Ok(()) +} diff --git a/datafusion-examples/examples/dataframe_output.rs b/datafusion-examples/examples/dataframe_output.rs deleted file mode 100644 index 60ca090d722d..000000000000 --- a/datafusion-examples/examples/dataframe_output.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use datafusion::{dataframe::DataFrameWriteOptions, prelude::*}; -use datafusion_common::config::CsvOptions; -use datafusion_common::{parsers::CompressionTypeVariant, DataFusionError}; - -/// This example demonstrates the various methods to write out a DataFrame to local storage. -/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example -/// using a remote object store. -#[tokio::main] -async fn main() -> Result<(), DataFusionError> { - let ctx = SessionContext::new(); - - let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap(); - - // Ensure the column names and types match the target table - df = df.with_column_renamed("column1", "tablecol1").unwrap(); - - ctx.sql( - "create external table - test(tablecol1 varchar) - stored as parquet - location './datafusion-examples/test_table/'", - ) - .await? - .collect() - .await?; - - // This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c'). - // The behavior of write_table depends on the TableProvider's implementation - // of the insert_into method. - df.clone() - .write_table("test", DataFrameWriteOptions::new()) - .await?; - - df.clone() - .write_parquet( - "./datafusion-examples/test_parquet/", - DataFrameWriteOptions::new(), - None, - ) - .await?; - - df.clone() - .write_csv( - "./datafusion-examples/test_csv/", - // DataFrameWriteOptions contains options which control how data is written - // such as compression codec - DataFrameWriteOptions::new(), - Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)), - ) - .await?; - - df.clone() - .write_json( - "./datafusion-examples/test_json/", - DataFrameWriteOptions::new(), - None, - ) - .await?; - - Ok(()) -}