Support Compression in parquet-fromcsv (#4160)
* parquet-fromcsv: support reading compressed CSV input

* add test

* fix for clippy

* fix label error

* add dependencies for parquet-fromcsv

* Unify the import style of the decompression crates
suxiaogang223 authored May 3, 2023
1 parent eb5ac69 commit 547cb80
Showing 3 changed files with 123 additions and 12 deletions.
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
@@ -133,7 +133,7 @@ required-features = ["cli"]

[[bin]]
name = "parquet-fromcsv"
required-features = ["arrow", "cli"]
required-features = ["arrow", "cli", "snap", "brotli", "flate2", "lz4", "zstd"]

[[bin]]
name = "parquet-show-bloom-filter"
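Because the codec features are now listed in required-features, cargo will refuse to build the binary until they are enabled. An invocation along these lines should work (the command itself is an assumption, not part of the commit; the feature names are taken from the line above):

    cargo build -p parquet --bin parquet-fromcsv --features "arrow cli snap brotli flate2 lz4 zstd"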
7 changes: 6 additions & 1 deletion parquet/src/bin/parquet-fromcsv-help.txt
@@ -47,8 +47,13 @@ Options:

[possible values: true, false]

+  -C, --csv-compression <CSV_COMPRESSION>
+          compression mode of csv
+
+          [default: UNCOMPRESSED]
+
-c, --parquet-compression <PARQUET_COMPRESSION>
-          compression mode
+          compression mode of parquet

[default: SNAPPY]

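For illustration, converting a gzip-compressed CSV with the new flag might look like this (file names hypothetical; option names as shown in the help text, and assuming the value parser accepts the uppercase codec names used as defaults):

    parquet-fromcsv --schema message.schema --input-file data.csv.gz --csv-compression GZIP --output-file data.parquet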
126 changes: 116 additions & 10 deletions parquet/src/bin/parquet-fromcsv.rs
@@ -61,6 +61,7 @@
//! ```text
//! - `-i`, `--input-file` : Path to input CSV file
//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
+//! - `-C`, `--csv-compression` : Compression option for csv, default is UNCOMPRESSED
//! - `-d`, `--delimiter` : Field delimiter for CSV file, default depends on `--input-format`
//! - `-e`, `--escape` : Escape character for input file
//! - `-h`, `--has-header` : Input has header
@@ -72,6 +73,7 @@
use std::{
fmt::Display,
fs::{read_to_string, File},
+    io::Read,
path::{Path, PathBuf},
sync::Arc,
};
@@ -193,7 +195,10 @@ struct Args {
quote_char: Option<char>,
#[clap(short('D'), long, help("double quote"))]
double_quote: Option<bool>,
-    #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)]
+    #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
+    #[clap(value_parser=compression_from_str)]
+    csv_compression: Compression,
+    #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
#[clap(value_parser=compression_from_str)]
parquet_compression: Compression,

@@ -368,9 +373,31 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
            &format!("Failed to open input file {:#?}", &args.input_file),
        )
    })?;
+
+    // open input file decoder
+    let input_file_decoder = match args.csv_compression {
+        Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
+        Compression::SNAPPY => {
+            Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>
+        }
+        Compression::GZIP(_) => {
+            Box::new(flate2::read::GzDecoder::new(input_file)) as Box<dyn Read>
+        }
+        Compression::BROTLI(_) => {
+            Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
+        }
+        Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e| {
+            ParquetFromCsvError::with_context(e, "Failed to create lz4::Decoder")
+        })?) as Box<dyn Read>,
+        Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| {
+            ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
+        })?) as Box<dyn Read>,
+        d => unimplemented!("compression type {d}"),
+    };
+
// create input csv reader
let builder = configure_reader_builder(args, arrow_schema);
-    let reader = builder.build(input_file)?;
+    let reader = builder.build(input_file_decoder)?;
for batch_result in reader {
let batch = batch_result.map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
@@ -393,13 +420,17 @@ fn main() -> Result<(), ParquetFromCsvError> {
#[cfg(test)]
mod tests {
use std::{
-        io::{Seek, Write},
+        io::Write,
path::{Path, PathBuf},
};

use super::*;
use arrow::datatypes::{DataType, Field};
+    use brotli::CompressorWriter;
use clap::{CommandFactory, Parser};
+    use flate2::write::GzEncoder;
+    use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+    use snap::write::FrameEncoder;
use tempfile::NamedTempFile;

#[test]
@@ -558,6 +589,7 @@ mod tests {
escape_char: None,
quote_char: None,
double_quote: None,
+            csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -593,6 +625,7 @@
escape_char: Some('\\'),
quote_char: None,
double_quote: None,
+            csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -616,8 +649,7 @@
assert_debug_text(&builder_debug, "escape", "Some(92)");
}

-    #[test]
-    fn test_convert_csv_to_parquet() {
+    fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
let schema = NamedTempFile::new().unwrap();
let schema_text = r"message schema {
optional int32 id;
@@ -626,14 +658,71 @@
schema.as_file().write_all(schema_text.as_bytes()).unwrap();

let mut input_file = NamedTempFile::new().unwrap();
-        {
-            let csv = input_file.as_file_mut();
+
+        fn write_tmp_file<T: Write>(w: &mut T) {
for index in 1..2000 {
-                write!(csv, "{index},\"name_{index}\"\r\n").unwrap();
+                write!(w, "{index},\"name_{index}\"\r\n").unwrap();
}
-            csv.flush().unwrap();
-            csv.rewind().unwrap();
+            w.flush().unwrap();
        }
+
+        // make sure input_file lives long enough: each encoder takes
+        // ownership of the temp file and hands it back once finished
+        input_file = match csv_compression {
+            Compression::UNCOMPRESSED => {
+                write_tmp_file(&mut input_file);
+                input_file
+            }
+            Compression::SNAPPY => {
+                let mut encoder = FrameEncoder::new(input_file);
+                write_tmp_file(&mut encoder);
+                encoder.into_inner().unwrap()
+            }
+            Compression::GZIP(level) => {
+                let mut encoder = GzEncoder::new(
+                    input_file,
+                    flate2::Compression::new(level.compression_level()),
+                );
+                write_tmp_file(&mut encoder);
+                encoder.finish().unwrap()
+            }
+            Compression::BROTLI(level) => {
+                let mut encoder =
+                    CompressorWriter::new(input_file, 0, level.compression_level(), 0);
+                write_tmp_file(&mut encoder);
+                encoder.into_inner()
+            }
+            Compression::LZ4 => {
+                let mut encoder = lz4::EncoderBuilder::new()
+                    .build(input_file)
+                    .map_err(|e| {
+                        ParquetFromCsvError::with_context(
+                            e,
+                            "Failed to create lz4::Encoder",
+                        )
+                    })
+                    .unwrap();
+                write_tmp_file(&mut encoder);
+                let (inner, err) = encoder.finish();
+                err.unwrap();
+                inner
+            }
+
+            Compression::ZSTD(level) => {
+                let mut encoder =
+                    zstd::Encoder::new(input_file, level.compression_level())
+                        .map_err(|e| {
+                            ParquetFromCsvError::with_context(
+                                e,
+                                "Failed to create zstd::Encoder",
+                            )
+                        })
+                        .unwrap();
+                write_tmp_file(&mut encoder);
+                encoder.finish().unwrap()
+            }
+            d => unimplemented!("compression type {d}"),
+        };
+
let output_parquet = NamedTempFile::new().unwrap();

let args = Args {
@@ -648,6 +737,7 @@
escape_char: None,
quote_char: None,
double_quote: None,
+            csv_compression,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -657,4 +747,20 @@
        };
        convert_csv_to_parquet(&args).unwrap();
    }
+
+    #[test]
+    fn test_convert_csv_to_parquet() {
+        test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
+        test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
+        test_convert_compressed_csv_to_parquet(Compression::GZIP(
+            GzipLevel::try_new(1).unwrap(),
+        ));
+        test_convert_compressed_csv_to_parquet(Compression::BROTLI(
+            BrotliLevel::try_new(2).unwrap(),
+        ));
+        test_convert_compressed_csv_to_parquet(Compression::LZ4);
+        test_convert_compressed_csv_to_parquet(Compression::ZSTD(
+            ZstdLevel::try_new(1).unwrap(),
+        ));
+    }
}
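Two notes on the decoder selection in convert_csv_to_parquet above. First, the catch-all arm means any remaining codec (for example LZO) panics via unimplemented! rather than returning an error. Second, flate2's GzDecoder stops at the end of the first gzip member, so a .gz file assembled by concatenating several gzip streams would be decoded only partially; if that case mattered, a minimal sketch of the swap, reusing the same Box<dyn Read> erasure and assuming the flate2 dependency added above, would be:

    use std::fs::File;
    use std::io::Read;

    // Sketch only: MultiGzDecoder keeps reading across concatenated gzip
    // members, where GzDecoder would report end-of-stream after the first.
    fn open_multi_gzip(path: &str) -> std::io::Result<Box<dyn Read>> {
        let file = File::open(path)?;
        Ok(Box::new(flate2::read::MultiGzDecoder::new(file)) as Box<dyn Read>)
    }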

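To run the new round-trip test locally, something like the following should work (assumed invocation, mirroring the feature list the binary requires):

    cargo test -p parquet --bin parquet-fromcsv --features "arrow cli snap brotli flate2 lz4 zstd"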