From 5ba458d2ac1486c89d0faa1a8468b163d4490951 Mon Sep 17 00:00:00 2001 From: Can Kockan Date: Wed, 16 Aug 2023 14:41:57 -0400 Subject: [PATCH 1/6] Testing out adding zstd support --- Cargo.toml | 5 ++++- src/io/mod.rs | 49 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6120112..391b444 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,12 @@ keywords = ["utilities"] [dependencies] thiserror = "^1" -# For auto-gzip handing of files +# For auto-gzip handling of files flate2 = "^1" +# For auto-zstd handling of files +zstd = "0.12.4" + # For auto-serialization of structs to csv/tsv csv = "^1" serde = { version = "^1.0.123", features = ["derive"] } diff --git a/src/io/mod.rs b/src/io/mod.rs index 1d7c2b6..68a67c8 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -4,7 +4,7 @@ //! I/O activities, such a slurping a file by lines, or writing a collection of `Serializable` //! objects to a path. //! -//! The two core parts of this module are teh [`Io`] and [`DelimFile`] structs. These structs provide +//! The two core parts of this module are the [`Io`] and [`DelimFile`] structs. These structs provide //! methods for reading and writing to files that transparently handle compression based on the //! file extension of the path given to the methods. //! @@ -51,10 +51,14 @@ use flate2::bufread::MultiGzDecoder; use flate2::write::GzEncoder; use flate2::Compression; use serde::{de::DeserializeOwned, Serialize}; +use zstd::stream::{Decoder, Encoder}; /// The set of file extensions to treat as GZIPPED const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"]; +/// The set of file extensions to treat as ZSTD compressed +const ZSTD_EXTENSIONS: [&str; 1] = ["zst"]; + /// The default buffer size when creating buffered readers/writers const BUFFER_SIZE: usize = 64 * 1024; @@ -90,8 +94,19 @@ impl Io { } } - /// Opens a file for reading. Transparently handles reading gzipped files based - /// extension. + /// Returns true if the path ends with a recognized ZSTD file extension + fn is_zstd_path>(p: &P) -> bool { + if let Some(ext) = p.as_ref().extension() { + match ext.to_str() { + Some(x) => ZSTD_EXTENSIONS.contains(&x), + None => false, + } + } else { + false + } + } + + /// Opens a file for reading. Transparently handles decoding gzip and zstd files. pub fn new_reader

(&self, p: &P) -> Result> where P: AsRef, @@ -101,13 +116,14 @@ impl Io { if Self::is_gzip_path(p) { Ok(Box::new(BufReader::with_capacity(self.buffer_size, MultiGzDecoder::new(buf)))) + } else if Self::is_zstd_path(p) { + Ok(Box::new(BufReader::with_capacity(self.buffer_size, Decoder::new(buf).unwrap()))) } else { Ok(Box::new(buf)) } } - /// Opens a file for writing. Transparently handles writing GZIP'd data if the file - /// ends with a recognized GZIP extension. + /// Opens a file for writing. Transparently handles encoding data in gzip and zstd formats. pub fn new_writer

(&self, p: &P) -> Result>> where P: AsRef, @@ -115,6 +131,8 @@ impl Io { let file = File::create(p).map_err(FgError::IoError)?; let write: Box = if Io::is_gzip_path(p) { Box::new(GzEncoder::new(file, self.compression)) + } else if Io::is_zstd_path(p) { + Box::new(Encoder::new(file, 0).unwrap().auto_finish()) } else { Box::new(file) }; @@ -313,6 +331,27 @@ mod tests { assert_ne!(text.metadata().unwrap().len(), gzipped.metadata().unwrap().len()); } + #[test] + fn test_reading_and_writing_zstd_files() { + let lines = vec!["foo", "bar", "baz"]; + let tempdir = TempDir::new().unwrap(); + let text = tempdir.path().join("text.txt"); + let zstd_compressed = tempdir.path().join("zstd_compressed.txt.zst"); + + let io = Io::default(); + io.write_lines(&text, &mut lines.iter()).unwrap(); + io.write_lines(&zstd_compressed, &mut lines.iter()).unwrap(); + + let r1 = io.read_lines(&text).unwrap(); + let r2 = io.read_lines(&zstd_compressed).unwrap(); + + assert_eq!(r1, lines); + assert_eq!(r2, lines); + + // Also check that we actually wrote zstd encoded data to the zstd file! + assert_ne!(text.metadata().unwrap().len(), zstd_compressed.metadata().unwrap().len()); + } + #[test] fn test_reading_and_writing_empty_delim_file() { let recs: Vec = vec![]; From 3344b984fa3f6d924a45d262099d69fd2a9ca670 Mon Sep 17 00:00:00 2001 From: Can Kockan Date: Wed, 16 Aug 2023 15:08:24 -0400 Subject: [PATCH 2/6] Update rust-toolchain version --- rust-toolchain.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 76b44e4..7f9aa4d 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.58.1" +channel = "1.71.1" components = ["rustfmt", "clippy"] From 6dd4d2d793411b208e9d15f8f9e20a94cb26d32d Mon Sep 17 00:00:00 2001 From: Can Kockan Date: Wed, 16 Aug 2023 22:48:56 -0400 Subject: [PATCH 3/6] Better module names, zstd_path checking assertions, and a correction. --- src/io/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index 68a67c8..34acec0 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -51,7 +51,7 @@ use flate2::bufread::MultiGzDecoder; use flate2::write::GzEncoder; use flate2::Compression; use serde::{de::DeserializeOwned, Serialize}; -use zstd::stream::{Decoder, Encoder}; +use zstd::stream::{Decoder as ZstdDecoder, Encoder as ZstdEncoder}; /// The set of file extensions to treat as GZIPPED const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"]; @@ -117,7 +117,7 @@ impl Io { if Self::is_gzip_path(p) { Ok(Box::new(BufReader::with_capacity(self.buffer_size, MultiGzDecoder::new(buf)))) } else if Self::is_zstd_path(p) { - Ok(Box::new(BufReader::with_capacity(self.buffer_size, Decoder::new(buf).unwrap()))) + Ok(Box::new(BufReader::with_capacity(self.buffer_size, ZstdDecoder::new(buf).unwrap()))) } else { Ok(Box::new(buf)) } @@ -132,7 +132,7 @@ impl Io { let write: Box = if Io::is_gzip_path(p) { Box::new(GzEncoder::new(file, self.compression)) } else if Io::is_zstd_path(p) { - Box::new(Encoder::new(file, 0).unwrap().auto_finish()) + Box::new(ZstdEncoder::new(file, 0).unwrap().auto_finish()) } else { Box::new(file) }; @@ -338,6 +338,9 @@ mod tests { let text = tempdir.path().join("text.txt"); let zstd_compressed = tempdir.path().join("zstd_compressed.txt.zst"); + assert_eq!(Io::is_zstd_path(&text), false); + assert_eq!(Io::is_zstd_path(&zstd_compressed), true); + let io = Io::default(); io.write_lines(&text, &mut lines.iter()).unwrap(); io.write_lines(&zstd_compressed, &mut lines.iter()).unwrap(); @@ -348,7 +351,7 @@ mod tests { assert_eq!(r1, lines); assert_eq!(r2, lines); - // Also check that we actually wrote zstd encoded data to the zstd file! + // Check whether the two files are different assert_ne!(text.metadata().unwrap().len(), zstd_compressed.metadata().unwrap().len()); } From ecfe4b67a35dccf8bbf197ad564c826dc60b5552 Mon Sep 17 00:00:00 2001 From: Can Kockan Date: Thu, 17 Aug 2023 15:19:33 -0400 Subject: [PATCH 4/6] Replace unwrap() with map_err() --- src/io/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index 34acec0..fdde115 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -117,7 +117,10 @@ impl Io { if Self::is_gzip_path(p) { Ok(Box::new(BufReader::with_capacity(self.buffer_size, MultiGzDecoder::new(buf)))) } else if Self::is_zstd_path(p) { - Ok(Box::new(BufReader::with_capacity(self.buffer_size, ZstdDecoder::new(buf).unwrap()))) + Ok(Box::new(BufReader::with_capacity( + self.buffer_size, + ZstdDecoder::new(buf).map_err(FgError::IoError)?, + ))) } else { Ok(Box::new(buf)) } @@ -132,7 +135,7 @@ impl Io { let write: Box = if Io::is_gzip_path(p) { Box::new(GzEncoder::new(file, self.compression)) } else if Io::is_zstd_path(p) { - Box::new(ZstdEncoder::new(file, 0).unwrap().auto_finish()) + Box::new(ZstdEncoder::new(file, 0).map_err(FgError::IoError)?.auto_finish()) } else { Box::new(file) }; From 7b35638d58905bd04a1429401dd92cd85f765b81 Mon Sep 17 00:00:00 2001 From: Can Kockan Date: Thu, 17 Aug 2023 15:41:14 -0400 Subject: [PATCH 5/6] Refactoring file extension/path checking code --- src/io/mod.rs | 65 +++++++++++++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index fdde115..697002e 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -53,15 +53,14 @@ use flate2::Compression; use serde::{de::DeserializeOwned, Serialize}; use zstd::stream::{Decoder as ZstdDecoder, Encoder as ZstdEncoder}; -/// The set of file extensions to treat as GZIPPED -const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"]; - -/// The set of file extensions to treat as ZSTD compressed -const ZSTD_EXTENSIONS: [&str; 1] = ["zst"]; - /// The default buffer size when creating buffered readers/writers const BUFFER_SIZE: usize = 64 * 1024; +/// The set of file extensions to treat as FASTQ, GZIPPED, or ZSTD +const FASTQ_EXTENSIONS: [&str; 2] = ["fastq", "fq"]; +const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"]; +const ZSTD_EXTENSIONS: [&str; 1] = ["zst"]; + /// Unit-struct that contains associated functions for reading and writing Structs to/from /// unstructured files. pub struct Io { @@ -82,30 +81,6 @@ impl Io { Io { compression: flate2::Compression::new(compression), buffer_size } } - /// Returns true if the path ends with a recognized GZIP file extension - fn is_gzip_path>(p: &P) -> bool { - if let Some(ext) = p.as_ref().extension() { - match ext.to_str() { - Some(x) => GZIP_EXTENSIONS.contains(&x), - None => false, - } - } else { - false - } - } - - /// Returns true if the path ends with a recognized ZSTD file extension - fn is_zstd_path>(p: &P) -> bool { - if let Some(ext) = p.as_ref().extension() { - match ext.to_str() { - Some(x) => ZSTD_EXTENSIONS.contains(&x), - None => false, - } - } else { - false - } - } - /// Opens a file for reading. Transparently handles decoding gzip and zstd files. pub fn new_reader

(&self, p: &P) -> Result> where @@ -171,6 +146,36 @@ impl Io { out.flush().map_err(FgError::IoError) } + + /// Returns true if the path ends with a recognized file extension + fn is_path_with_extension, const N: usize>( + p: &P, + extensions: [&str; N], + ) -> bool { + if let Some(ext) = p.as_ref().extension() { + match ext.to_str() { + Some(x) => extensions.contains(&x), + None => false, + } + } else { + false + } + } + + /// Returns true if the path ends with a recognized FASTQ file extension + pub fn is_fastq_path>(p: &P) -> bool { + Self::is_path_with_extension(p, FASTQ_EXTENSIONS) + } + + /// Returns true if the path ends with a recognized GZIP file extension + pub fn is_gzip_path>(p: &P) -> bool { + Self::is_path_with_extension(p, GZIP_EXTENSIONS) + } + + /// Returns true if the path ends with a recognized ZSTD file extension + pub fn is_zstd_path>(p: &P) -> bool { + Self::is_path_with_extension(p, ZSTD_EXTENSIONS) + } } /// Unit-struct that contains associated functions for reading and writing Structs to/from From 3be23f4c2f7f2e38e4e6784aece1320200ba80cc Mon Sep 17 00:00:00 2001 From: Can Kockan Date: Thu, 17 Aug 2023 15:51:46 -0400 Subject: [PATCH 6/6] Move the relevant tests too --- src/io/mod.rs | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/io/mod.rs b/src/io/mod.rs index 697002e..0e4bb84 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -287,6 +287,7 @@ impl DelimFile { #[cfg(test)] mod tests { use crate::io::{DelimFile, Io}; + use rstest::rstest; use serde::{Deserialize, Serialize}; use tempfile::TempDir; @@ -399,4 +400,51 @@ mod tests { assert_eq!(from_csv, recs); assert_eq!(from_tsv, recs); } + + // ############################################################################################ + // Tests is_gzip_path() + // ############################################################################################ + + #[rstest] + #[case("test_fastq.fq.gz", true)] // .fq.gz is valid gzip + #[case("test_fastq.fq.bgz", true)] // .fq.bgz is valid gzip + #[case("test_fastq.fq.tar", false)] // .fq.tar is invalid gzip + fn test_is_gzip_path(#[case] file_name: &str, #[case] expected: bool) { + let dir = TempDir::new().unwrap(); + let file_path = dir.path().join(file_name); + let result = Io::is_gzip_path(&file_path); + assert_eq!(result, expected); + } + + // ############################################################################################ + // Tests is_zstd_path() + // ############################################################################################ + + #[rstest] + #[case("test_fastq.fq", false)] // .fq is invalid zstd + #[case("test_fastq.fq.gz", false)] // .fq.gz is invalid zstd + #[case("test_fastq.fq.bgz", false)] // .fq.bgz is invalid zstd + #[case("test_fastq.fq.tar", false)] // .fq.tar is invalid zstd + #[case("test_fastq.fq.zst", true)] // .fq.zst is valid zstd + fn test_is_zstd_path(#[case] file_name: &str, #[case] expected: bool) { + let dir = TempDir::new().unwrap(); + let file_path = dir.path().join(file_name); + let result = Io::is_zstd_path(&file_path); + assert_eq!(result, expected); + } + + // ############################################################################################ + // Tests is_fastq_path() + // ############################################################################################ + + #[rstest] + #[case("test_fastq.fq", true)] // .fq is valid fastq + #[case("test_fastq.fastq", true)] // .fastq is valid fastq + #[case("test_fastq.sam", false)] // .sam is invalid fastq + fn test_is_fastq_path(#[case] file_name: &str, #[case] expected: bool) { + let dir = TempDir::new().unwrap(); + let file_path = dir.path().join(file_name); + let result = Io::is_fastq_path(&file_path); + assert_eq!(result, expected); + } }