Skip to content

Commit

Permalink
rust: implement ZstdCompressor.multi_compress_to_buffer()
Browse files Browse the repository at this point in the history
The implementation is horribly inefficient compared to the C version
due to excessive memory copying. But it should be functionally correct
and it is multi-threaded, courtesy of the rayon crate.
  • Loading branch information
indygreg committed Feb 16, 2021
1 parent 072f9f5 commit 4b3ae8e
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 10 deletions.
99 changes: 99 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "rust-ext/src/lib.rs"
[dependencies]
libc = "0.2"
num_cpus = "1"
rayon = "1.5"

[dependencies.zstd-safe]
version = "3.0.0+zstd.1.4.8"
Expand Down
29 changes: 20 additions & 9 deletions rust-ext/src/buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use {

#[repr(C)]
#[derive(Clone, Debug)]
struct BufferSegment {
offset: u64,
length: u64,
pub(crate) struct BufferSegment {
pub offset: u64,
pub length: u64,
}

#[pyclass(module = "zstandard.backend_rust", name = "BufferSegment")]
Expand All @@ -37,7 +37,7 @@ pub struct ZstdBufferSegment {
}

impl ZstdBufferSegment {
fn as_slice(&self) -> &[u8] {
pub fn as_slice(&self) -> &[u8] {
unsafe {
std::slice::from_raw_parts(self.buffer.buf_ptr().add(self.offset) as *const _, self.len)
}
Expand Down Expand Up @@ -125,8 +125,8 @@ impl PyBufferProtocol for ZstdBufferSegments {
#[pyclass(module = "zstandard.backend_rust", name = "BufferWithSegments")]
pub struct ZstdBufferWithSegments {
source: PyObject,
buffer: PyBuffer<u8>,
segments: Vec<BufferSegment>,
pub(crate) buffer: PyBuffer<u8>,
pub(crate) segments: Vec<BufferSegment>,
}

impl ZstdBufferWithSegments {
Expand All @@ -135,12 +135,23 @@ impl ZstdBufferWithSegments {
std::slice::from_raw_parts(self.buffer.buf_ptr() as *const _, self.buffer.len_bytes())
}
}

pub fn get_segment_slice<'p>(&self, _py: Python<'p>, i: usize) -> &'p [u8] {
let segment = &self.segments[i];

unsafe {
std::slice::from_raw_parts(
self.buffer.buf_ptr().add(segment.offset as usize) as *const _,
segment.length as usize,
)
}
}
}

#[pymethods]
impl ZstdBufferWithSegments {
#[new]
fn new(py: Python, data: &PyAny, segments: PyBuffer<u8>) -> PyResult<Self> {
pub fn new(py: Python, data: &PyAny, segments: PyBuffer<u8>) -> PyResult<Self> {
let data_buffer = PyBuffer::get(data)?;

if segments.len_bytes() % std::mem::size_of::<BufferSegment>() != 0 {
Expand Down Expand Up @@ -258,15 +269,15 @@ impl PyBufferProtocol for ZstdBufferWithSegments {
)]
pub struct ZstdBufferWithSegmentsCollection {
// Py<ZstdBufferWithSegments>.
buffers: Vec<PyObject>,
pub(crate) buffers: Vec<PyObject>,
first_elements: Vec<usize>,
}

#[pymethods]
impl ZstdBufferWithSegmentsCollection {
#[new]
#[args(py_args = "*")]
fn new(py: Python, py_args: &PyTuple) -> PyResult<Self> {
pub fn new(py: Python, py_args: &PyTuple) -> PyResult<Self> {
if py_args.is_empty() {
return Err(PyValueError::new_err("must pass at least 1 argument"));
}
Expand Down
12 changes: 12 additions & 0 deletions rust-ext/src/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@

use {
crate::{
buffers::ZstdBufferWithSegmentsCollection,
compression_chunker::ZstdCompressionChunker,
compression_dict::ZstdCompressionDict,
compression_parameters::{CCtxParams, ZstdCompressionParameters},
compression_reader::ZstdCompressionReader,
compression_writer::ZstdCompressionWriter,
compressionobj::ZstdCompressionObj,
compressor_iterator::ZstdCompressorIterator,
compressor_multi::multi_compress_to_buffer,
zstd_safe::CCtx,
ZstdError,
},
Expand Down Expand Up @@ -330,6 +332,16 @@ impl ZstdCompressor {
Ok((total_read, total_write))
}

// Python-facing entry point that compresses `data` and returns the result as
// a `ZstdBufferWithSegmentsCollection`.
//
// This is a thin wrapper: it forwards the `py` token, this compressor's
// `params` and `dict`, and the caller-supplied `data` and `threads` to the
// free function `compressor_multi::multi_compress_to_buffer`, returning that
// function's result (or error) unchanged.
//
// `threads` defaults to 0 when the Python caller omits it (see `#[args]`).
// NOTE(review): the accepted types for `data` and the meaning of zero or
// negative `threads` are decided by the free function, which is not visible
// here -- confirm there before relying on them.
#[args(data, threads = "0")]
fn multi_compress_to_buffer(
&self,
py: Python,
data: &PyAny,
threads: isize,
) -> PyResult<ZstdBufferWithSegmentsCollection> {
// The unqualified name resolves to the imported free function, not to this
// method (methods are only reachable through `self.` / `Self::`), so this
// is delegation rather than recursion.
multi_compress_to_buffer(py, &self.params, &self.dict, data, threads)
}

#[args(reader, size = "None", read_size = "None", write_size = "None")]
fn read_to_iter(
&self,
Expand Down
Loading

0 comments on commit 4b3ae8e

Please sign in to comment.