Support Compression in parquet-fromcsv #4160

Merged May 3, 2023 · 6 commits · Changes from 3 commits
7 changes: 6 additions & 1 deletion parquet/src/bin/parquet-fromcsv-help.txt
@@ -47,8 +47,13 @@ Options:

[possible values: true, false]

-C, --csv-compression <CSV_COMPRESSION>
compression mode of csv

[default: UNCOMPRESSED]

-c, --parquet-compression <PARQUET_COMPRESSION>
-            compression mode
+            compression mode of parquet

[default: SNAPPY]

126 changes: 116 additions & 10 deletions parquet/src/bin/parquet-fromcsv.rs
Expand Up @@ -61,6 +61,7 @@
//! ```text
//! - `-i`, `--input-file` : Path to input CSV file
//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
//! - `-C`, `--csv-compression` : Compression option for CSV input, default is `UNCOMPRESSED`
//! - `-d`, `--delimiter` : Field delimiter for CSV file, default depends on `--input-format`
//! - `-e`, `--escape` : Escape character for input file
//! - `-h`, `--has-header` : Input has header
@@ -72,20 +72,24 @@
use std::{
fmt::Display,
fs::{read_to_string, File},
io::Read,
path::{Path, PathBuf},
sync::Arc,
};

use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, Schema};
use brotli::Decompressor;
use clap::{Parser, ValueEnum};
use flate2::read::GzDecoder;
use parquet::{
arrow::{parquet_to_arrow_schema, ArrowWriter},
basic::Compression,
errors::ParquetError,
file::properties::{WriterProperties, WriterVersion},
schema::{parser::parse_message_type, types::SchemaDescriptor},
};
use snap::read::FrameDecoder;

#[derive(Debug)]
enum ParquetFromCsvError {
@@ -193,7 +198,10 @@ struct Args {
quote_char: Option<char>,
#[clap(short('D'), long, help("double quote"))]
double_quote: Option<bool>,
-    #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)]
+    #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
+    #[clap(value_parser=compression_from_str)]
+    csv_compression: Compression,
+    #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
#[clap(value_parser=compression_from_str)]
parquet_compression: Compression,
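Both flags above share a `compression_from_str` value parser that lives outside the changed hunks. A minimal sketch of what such a parser can look like — the variant list and error wording here are assumptions, not the exact helper from parquet-fromcsv.rs:

```rust
use parquet::basic::{Compression, GzipLevel, ZstdLevel};

// Hedged sketch: map a flag string onto parquet's Compression enum.
// The real helper may cover more codecs and spell its errors differently.
fn compression_from_str(cmp: &str) -> Result<Compression, String> {
    match cmp.to_uppercase().as_str() {
        "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
        "SNAPPY" => Ok(Compression::SNAPPY),
        "GZIP" => Ok(Compression::GZIP(
            GzipLevel::try_new(6).map_err(|e| e.to_string())?,
        )),
        "LZ4" => Ok(Compression::LZ4),
        "ZSTD" => Ok(Compression::ZSTD(
            ZstdLevel::try_new(1).map_err(|e| e.to_string())?,
        )),
        other => Err(format!("unknown compression mode: {other}")),
    }
}
```

clap can use a plain `fn(&str) -> Result<T, E>` as a value parser (for error types it can convert into its own), which is why one helper backs both `-C` and `-c`.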

@@ -368,9 +376,28 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
&format!("Failed to open input file {:#?}", &args.input_file),
)
})?;

// open input file decoder
let input_file_decoder = match args.csv_compression {
Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
Compression::SNAPPY => Box::new(FrameDecoder::new(input_file)) as Box<dyn Read>,
Compression::GZIP(_) => Box::new(GzDecoder::new(input_file)) as Box<dyn Read>,
Compression::BROTLI(_) => {
Box::new(Decompressor::new(input_file, 0)) as Box<dyn Read>
}
Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create lz4::Decoder")
})?) as Box<dyn Read>,
Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
})?) as Box<dyn Read>,
        // TODO: which crates should be used to decompress LZO and LZ4_RAW?
        _ => panic!("compression type not supported yet"),
};

// create input csv reader
let builder = configure_reader_builder(args, arrow_schema);
-        let reader = builder.build(input_file)?;
+        let reader = builder.build(input_file_decoder)?;
for batch_result in reader {
let batch = batch_result.map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
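Because every decompressor is a different concrete type, the match above erases them to a common `Box<dyn Read>`, and the CSV reader only ever sees the `Read` trait. A self-contained sketch of that pattern with the same crates — the `Codec` enum and function name are invented for illustration:

```rust
use std::fs::File;
use std::io::{self, Read};

use flate2::read::GzDecoder;
use snap::read::FrameDecoder;

// Toy codec enum standing in for parquet::basic::Compression.
enum Codec {
    Plain,
    Gzip,
    Snappy,
    Zstd,
}

// Erase the concrete decoder types behind one trait object so the
// caller can stream decompressed bytes without caring about the codec.
fn open_decoded(path: &str, codec: Codec) -> io::Result<Box<dyn Read>> {
    let file = File::open(path)?;
    Ok(match codec {
        Codec::Plain => Box::new(file),
        Codec::Gzip => Box::new(GzDecoder::new(file)),
        Codec::Snappy => Box::new(FrameDecoder::new(file)),
        // zstd wraps the reader in its own internal BufReader.
        Codec::Zstd => Box::new(zstd::Decoder::new(file)?),
    })
}
```

This is the same shape the new code uses before handing the boxed reader to `builder.build(...)`, keeping a single code path regardless of codec.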
@@ -393,13 +420,17 @@ fn main() -> Result<(), ParquetFromCsvError> {
#[cfg(test)]
mod tests {
use std::{
-        io::{Seek, Write},
+        io::Write,
path::{Path, PathBuf},
};

use super::*;
use arrow::datatypes::{DataType, Field};
use brotli::CompressorWriter;
use clap::{CommandFactory, Parser};
use flate2::write::GzEncoder;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
use snap::write::FrameEncoder;
use tempfile::NamedTempFile;

#[test]
@@ -558,6 +589,7 @@ mod tests {
escape_char: None,
quote_char: None,
double_quote: None,
csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -593,6 +625,7 @@ mod tests {
escape_char: Some('\\'),
quote_char: None,
double_quote: None,
csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -616,8 +649,7 @@
assert_debug_text(&builder_debug, "escape", "Some(92)");
}

-    #[test]
-    fn test_convert_csv_to_parquet() {
+    fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
let schema = NamedTempFile::new().unwrap();
let schema_text = r"message schema {
optional int32 id;
@@ -626,14 +658,71 @@
schema.as_file().write_all(schema_text.as_bytes()).unwrap();

let mut input_file = NamedTempFile::new().unwrap();
-        {
-            let csv = input_file.as_file_mut();
+
+        fn write_tmp_file<T: Write>(w: &mut T) {
             for index in 1..2000 {
-                write!(csv, "{index},\"name_{index}\"\r\n").unwrap();
+                write!(w, "{index},\"name_{index}\"\r\n").unwrap();
             }
-            csv.flush().unwrap();
-            csv.rewind().unwrap();
+            w.flush().unwrap();
         }

        // make sure the input_file's lifetime is long enough
input_file = match csv_compression {
Compression::UNCOMPRESSED => {
                write_tmp_file(&mut input_file);
input_file
}
Compression::SNAPPY => {
let mut encoder = FrameEncoder::new(input_file);
                write_tmp_file(&mut encoder);
encoder.into_inner().unwrap()
}
Compression::GZIP(level) => {
let mut encoder = GzEncoder::new(
input_file,
flate2::Compression::new(level.compression_level()),
);
                write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
Compression::BROTLI(level) => {
let mut encoder =
CompressorWriter::new(input_file, 0, level.compression_level(), 0);
                write_tmp_file(&mut encoder);
encoder.into_inner()
}
Compression::LZ4 => {
let mut encoder = lz4::EncoderBuilder::new()
.build(input_file)
.map_err(|e| {
ParquetFromCsvError::with_context(
e,
"Failed to create lz4::Encoder",
)
})
.unwrap();
                write_tmp_file(&mut encoder);
let (inner, err) = encoder.finish();
err.unwrap();
inner
}

Compression::ZSTD(level) => {
let mut encoder =
zstd::Encoder::new(input_file, level.compression_level())
.map_err(|e| {
ParquetFromCsvError::with_context(
e,
"Failed to create zstd::Encoder",
)
})
.unwrap();
                write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
            _ => panic!("compression type not supported yet"),
};

let output_parquet = NamedTempFile::new().unwrap();

let args = Args {
@@ -648,6 +737,7 @@
escape_char: None,
quote_char: None,
double_quote: None,
csv_compression,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -657,4 +747,20 @@
};
convert_csv_to_parquet(&args).unwrap();
}

#[test]
fn test_convert_csv_to_parquet() {
test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
test_convert_compressed_csv_to_parquet(Compression::GZIP(
GzipLevel::try_new(1).unwrap(),
));
test_convert_compressed_csv_to_parquet(Compression::BROTLI(
BrotliLevel::try_new(2).unwrap(),
));
test_convert_compressed_csv_to_parquet(Compression::LZ4);
test_convert_compressed_csv_to_parquet(Compression::ZSTD(
ZstdLevel::try_new(1).unwrap(),
));
}
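A closing note on the writer side exercised above: each encoder finalizes differently (`GzEncoder::finish` returns the inner writer, snap's `FrameEncoder::into_inner` performs the flush, lz4's `finish()` yields a `(writer, Result)` pair), and the stream is only valid once that happens — which is why the test reassigns `input_file` from each encoder's return value. A minimal in-memory gzip round trip showing the finalize-then-read shape (the buffer and row count are invented for illustration):

```rust
use std::io::{Read, Write};

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;

fn main() -> std::io::Result<()> {
    // Compress into an in-memory buffer; finish() writes the gzip
    // trailer and hands back the underlying Vec<u8>.
    let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::new(1));
    for index in 1..5 {
        write!(encoder, "{index},\"name_{index}\"\r\n")?;
    }
    let compressed = encoder.finish()?;

    // Reading back through the matching decoder restores the CSV rows.
    let mut decoded = String::new();
    GzDecoder::new(&compressed[..]).read_to_string(&mut decoded)?;
    assert!(decoded.starts_with("1,\"name_1\""));
    Ok(())
}
```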
}