-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
docs: add example for custom file format with COPY TO
#11174
Merged
Merged
Changes from 9 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
9c02757
feat: add example for copy to
tshauck 19ab39a
better docs plus tempdir
tshauck 2d410b3
build: clean examples if over 10GB
tshauck 96a4b3c
only 1GB
tshauck 9c31385
build: try clearing some disk space before running
tshauck 01147d2
build: remove sudo
tshauck c1af955
build: try clean
tshauck 506212f
build: run clean
tshauck fdbd2c8
build: only clean examples
tshauck 77ff942
docs: better output for example
tshauck File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use std::{any::Any, sync::Arc}; | ||
|
||
use arrow::array::{RecordBatch, StringArray, UInt8Array}; | ||
use arrow_schema::{DataType, Field, Schema, SchemaRef}; | ||
use datafusion::{ | ||
datasource::{ | ||
file_format::{ | ||
csv::CsvFormatFactory, file_compression_type::FileCompressionType, | ||
FileFormat, FileFormatFactory, | ||
}, | ||
physical_plan::{FileScanConfig, FileSinkConfig}, | ||
MemTable, | ||
}, | ||
error::Result, | ||
execution::{context::SessionState, runtime_env::RuntimeEnv}, | ||
physical_plan::ExecutionPlan, | ||
prelude::{SessionConfig, SessionContext}, | ||
}; | ||
use datafusion_common::{GetExt, Statistics}; | ||
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; | ||
use object_store::{ObjectMeta, ObjectStore}; | ||
use tempfile::tempdir; | ||
|
||
/// Example of a custom file format that reads and writes TSV files. | ||
/// | ||
/// TSVFileFormatFactory is responsible for creating instances of TSVFileFormat. | ||
/// The former, once registered with the SessionState, will then be used | ||
/// to facilitate SQL operations on TSV files, such as `COPY TO` shown here. | ||
|
||
#[derive(Debug)] | ||
/// Custom file format that reads and writes TSV files | ||
/// | ||
/// This file format is a wrapper around the CSV file format | ||
/// for demonstration purposes. | ||
struct TSVFileFormat { | ||
csv_file_format: Arc<dyn FileFormat>, | ||
} | ||
|
||
impl TSVFileFormat { | ||
pub fn new(csv_file_format: Arc<dyn FileFormat>) -> Self { | ||
Self { csv_file_format } | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl FileFormat for TSVFileFormat { | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
|
||
fn get_ext(&self) -> String { | ||
"tsv".to_string() | ||
} | ||
|
||
fn get_ext_with_compression( | ||
&self, | ||
c: &FileCompressionType, | ||
) -> datafusion::error::Result<String> { | ||
if c == &FileCompressionType::UNCOMPRESSED { | ||
Ok("tsv".to_string()) | ||
} else { | ||
todo!("Compression not supported") | ||
} | ||
} | ||
|
||
async fn infer_schema( | ||
&self, | ||
state: &SessionState, | ||
store: &Arc<dyn ObjectStore>, | ||
objects: &[ObjectMeta], | ||
) -> Result<SchemaRef> { | ||
self.csv_file_format | ||
.infer_schema(state, store, objects) | ||
.await | ||
} | ||
|
||
async fn infer_stats( | ||
&self, | ||
state: &SessionState, | ||
store: &Arc<dyn ObjectStore>, | ||
table_schema: SchemaRef, | ||
object: &ObjectMeta, | ||
) -> Result<Statistics> { | ||
self.csv_file_format | ||
.infer_stats(state, store, table_schema, object) | ||
.await | ||
} | ||
|
||
async fn create_physical_plan( | ||
&self, | ||
state: &SessionState, | ||
conf: FileScanConfig, | ||
filters: Option<&Arc<dyn PhysicalExpr>>, | ||
) -> Result<Arc<dyn ExecutionPlan>> { | ||
self.csv_file_format | ||
.create_physical_plan(state, conf, filters) | ||
.await | ||
} | ||
|
||
async fn create_writer_physical_plan( | ||
&self, | ||
input: Arc<dyn ExecutionPlan>, | ||
state: &SessionState, | ||
conf: FileSinkConfig, | ||
order_requirements: Option<Vec<PhysicalSortRequirement>>, | ||
) -> Result<Arc<dyn ExecutionPlan>> { | ||
self.csv_file_format | ||
.create_writer_physical_plan(input, state, conf, order_requirements) | ||
.await | ||
} | ||
} | ||
|
||
#[derive(Default)] | ||
/// Factory for creating TSV file formats | ||
/// | ||
/// This factory is a wrapper around the CSV file format factory | ||
/// for demonstration purposes. | ||
pub struct TSVFileFactory { | ||
csv_file_factory: CsvFormatFactory, | ||
} | ||
|
||
impl TSVFileFactory { | ||
pub fn new() -> Self { | ||
Self { | ||
csv_file_factory: CsvFormatFactory::new(), | ||
} | ||
} | ||
} | ||
|
||
impl FileFormatFactory for TSVFileFactory { | ||
fn create( | ||
&self, | ||
state: &SessionState, | ||
format_options: &std::collections::HashMap<String, String>, | ||
) -> Result<std::sync::Arc<dyn FileFormat>> { | ||
let mut new_options = format_options.clone(); | ||
new_options.insert("format.delimiter".to_string(), "\t".to_string()); | ||
|
||
let csv_file_format = self.csv_file_factory.create(state, &new_options)?; | ||
let tsv_file_format = Arc::new(TSVFileFormat::new(csv_file_format)); | ||
|
||
Ok(tsv_file_format) | ||
} | ||
|
||
fn default(&self) -> std::sync::Arc<dyn FileFormat> { | ||
todo!() | ||
} | ||
} | ||
|
||
impl GetExt for TSVFileFactory { | ||
fn get_ext(&self) -> String { | ||
"tsv".to_string() | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<()> { | ||
// Create a new context with the default configuration | ||
let config = SessionConfig::new(); | ||
let runtime = RuntimeEnv::default(); | ||
let mut state = SessionState::new_with_config_rt(config, Arc::new(runtime)); | ||
|
||
// Register the custom file format | ||
let file_format = Arc::new(TSVFileFactory::new()); | ||
state.register_file_format(file_format, true).unwrap(); | ||
|
||
// Create a new context with the custom file format | ||
let ctx = SessionContext::new_with_state(state); | ||
|
||
let mem_table = create_mem_table(); | ||
ctx.register_table("mem_table", mem_table).unwrap(); | ||
|
||
let temp_dir = tempdir().unwrap(); | ||
let table_save_path = temp_dir.path().join("mem_table.tsv"); | ||
|
||
let d = ctx | ||
.sql(&format!( | ||
"COPY mem_table TO '{}' STORED AS TSV;", | ||
table_save_path.display(), | ||
)) | ||
.await?; | ||
|
||
let results = d.collect().await?; | ||
println!("Number of inserted rows: {:?}", results[0]); | ||
tshauck marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
Ok(()) | ||
} | ||
|
||
// create a simple mem table | ||
fn create_mem_table() -> Arc<MemTable> { | ||
let fields = vec![ | ||
Field::new("id", DataType::UInt8, false), | ||
Field::new("data", DataType::Utf8, false), | ||
]; | ||
let schema = Arc::new(Schema::new(fields)); | ||
|
||
let partitions = RecordBatch::try_new( | ||
schema.clone(), | ||
vec![ | ||
Arc::new(UInt8Array::from(vec![1, 2])), | ||
Arc::new(StringArray::from(vec!["foo", "bar"])), | ||
], | ||
) | ||
.unwrap(); | ||
|
||
Arc::new(MemTable::try_new(schema, vec![vec![partitions]]).unwrap()) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran into consistent build issues where the runner would run out of disk space. This is the best solution I could come up with. It seems to add about 5-10 seconds to the example action, which takes about 12 minutes overall.
Certainly open to alternatives.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am hitting similar issues with trying to add new examples in #11089
This seems like a good idea to me
My best hope is to move some of the example binaries into inline examples in the docs instead: #11178
Hopefully that will free up some additional space as well as make the examples easier to navigate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
er, sorry this came up. There are a few other tricks here (actions/runner-images#2840 (comment)) to clear up some disk space that may help also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries! I think this is a good excuse / reason to take another pass through the examples directory (and the library guide that you started however long ago).