Make GenericColumnWriter Send
tustvold committed May 26, 2023
1 parent 3fd744b commit 031ed98
Showing 8 changed files with 19 additions and 14 deletions.
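
The commit has no description, but the diff tells the story: `Send` is threaded through the whole write path. `Encoder` and `PageWriter` gain `Send` supertraits, so the boxed encoders and page writers held inside the column writer are `Send`; a compile-time test pins the property down; and the public writer types now require `W: Write + Send`. In practice this lets a writer be moved to another thread. A minimal sketch of the kind of code the bounds enable (the schema, batch contents, and use of `std::thread::spawn` are illustrative assumptions, not part of this commit):

```rust
use std::sync::Arc;
use std::thread;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Vec<u8> is Write + Send, so after this commit ArrowWriter<Vec<u8>>
    // is Send as well and may be moved into the spawned thread.
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, None)?;

    let handle = thread::spawn(move || -> parquet::errors::Result<Vec<u8>> {
        writer.write(&batch)?;
        writer.into_inner() // closes the file and returns the encoded bytes
    });
    let parquet_bytes = handle.join().expect("writer thread panicked")?;
    assert!(!parquet_bytes.is_empty());
    Ok(())
}
```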
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -117,7 +117,7 @@ impl<W: Write> Debug for ArrowWriter<W> {
     }
 }
 
-impl<W: Write> ArrowWriter<W> {
+impl<W: Write + Send> ArrowWriter<W> {
     /// Try to create a new Arrow writer
     ///
     /// The writer will fail if:
@@ -273,7 +273,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 }
 
-impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
+impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         self.write(batch).map_err(|e| e.into())
     }
@@ -284,7 +284,7 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
+fn write_leaves<W: Write + Send>(
     row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
     arrays: &[ArrayRef],
     levels: &mut [Vec<LevelInfo>],
2 changes: 1 addition & 1 deletion parquet/src/column/page.rs
@@ -248,7 +248,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
 ///
 /// It is reasonable to assume that all pages will be written in the correct order, e.g.
 /// dictionary page followed by data pages, or a set of data pages, etc.
-pub trait PageWriter {
+pub trait PageWriter: Send {
     /// Writes a page into the output stream/sink.
     /// Returns `PageWriteSpec` that contains information about written page metrics,
     /// including number of bytes, size, number of values, offset, etc.
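
Making `Send` a supertrait of `PageWriter` (rather than bounding individual impls) matters because the column writer stores its page writer as a boxed trait object, and a trait object is only `Send` if the trait itself promises it. A toy illustration of that rule, with a made-up `Sink` trait standing in for `PageWriter`:

```rust
// `Sink` is a hypothetical stand-in for PageWriter, not a parquet API.
trait Sink: Send {
    fn write_bytes(&mut self, bytes: &[u8]);
}

fn assert_send<T: Send + ?Sized>() {}

fn demo() {
    // Send as a supertrait makes every trait object of Sink Send too:
    assert_send::<dyn Sink>();
    assert_send::<Box<dyn Sink>>();
}
```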
6 changes: 6 additions & 0 deletions parquet/src/column/writer/mod.rs
@@ -2174,6 +2174,12 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_send() {
+        fn test<T: Send>() {}
+        test::<ColumnWriterImpl<Int32Type>>();
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
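
The new `test_send` test is a compile-time assertion: `test::<T>()` only type-checks when `T: Send`, so if a later change stores a non-Send type inside the column writer, this test stops compiling instead of failing at runtime. The same trick sketched in isolation, with hypothetical types:

```rust
use std::rc::Rc;

fn assert_send<T: Send>() {}

struct Fine(Vec<u8>);
struct NotFine(Rc<u8>); // Rc<T> is not Send

fn checks() {
    assert_send::<Fine>();
    // assert_send::<NotFine>(); // would fail to compile
}
```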
2 changes: 1 addition & 1 deletion parquet/src/encodings/encoding/mod.rs
@@ -40,7 +40,7 @@ mod dict_encoder;
 ///
 /// Currently this allocates internal buffers for the encoded values. After done putting
 /// values, caller should call `flush_buffer()` to get an immutable buffer pointer.
-pub trait Encoder<T: DataType> {
+pub trait Encoder<T: DataType>: Send {
     /// Encodes data from `values`.
     fn put(&mut self, values: &[T::T]) -> Result<()>;
 
10 changes: 5 additions & 5 deletions parquet/src/file/writer.rs
@@ -160,7 +160,7 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
     }
 }
 
-impl<W: Write> SerializedFileWriter<W> {
+impl<W: Write + Send> SerializedFileWriter<W> {
     /// Creates new file writer.
     pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
         let mut buf = TrackedWrite::new(buf);
@@ -406,7 +406,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     on_close: Option<OnCloseRowGroup<'a>>,
 }
 
-impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
     /// Creates a new `SerializedRowGroupWriter` with:
     ///
     /// - `schema_descr` - the schema to write
@@ -700,7 +700,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
     }
 }
 
-impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
+impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
         let uncompressed_size = page.uncompressed_size();
         let compressed_size = page.compressed_size();
@@ -1336,7 +1336,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
    {
        test_roundtrip::<W, R, Int32Type, _>(
@@ -1356,7 +1356,7 @@
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
        D: DataType,
        F: Fn(Row) -> D::T,
2 changes: 1 addition & 1 deletion parquet/src/record/record_writer.rs
@@ -21,7 +21,7 @@ use super::super::errors::ParquetError;
 use super::super::file::writer::SerializedRowGroupWriter;
 
 pub trait RecordWriter<T> {
-    fn write_to_row_group<W: std::io::Write>(
+    fn write_to_row_group<W: std::io::Write + Send>(
         &self,
         row_group_writer: &mut SerializedRowGroupWriter<W>,
     ) -> Result<(), ParquetError>;
2 changes: 1 addition & 1 deletion parquet_derive/src/lib.rs
@@ -96,7 +96,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
 
     (quote! {
         impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
-            fn write_to_row_group<W: ::std::io::Write>(
+            fn write_to_row_group<W: ::std::io::Write + Send>(
                 &self,
                 row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
             ) -> Result<(), ::parquet::errors::ParquetError> {
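
The derive macro has to emit the same bound, since the generated `write_to_row_group` drives a `SerializedRowGroupWriter` whose `W` now requires `Write + Send`. A sketch of typical usage of the derived writer; the `Sample` struct is illustrative, and the `schema()` helper on the slice is part of what `parquet_derive` generates:

```rust
use std::sync::Arc;

use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use parquet_derive::ParquetRecordWriter;

#[derive(ParquetRecordWriter)]
struct Sample {
    id: i32,
    name: String,
}

fn write_samples(samples: &[Sample]) -> Result<(), Box<dyn std::error::Error>> {
    // The derive generates impl RecordWriter<Sample> for &[Sample],
    // including a schema() method describing the struct's columns.
    let schema = samples.schema()?;
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = SerializedFileWriter::new(Vec::new(), schema, props)?;

    let mut row_group = writer.next_row_group()?;
    samples.write_to_row_group(&mut row_group)?;
    row_group.close()?;
    writer.close()?;
    Ok(())
}
```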
3 changes: 1 addition & 2 deletions parquet_derive_test/src/lib.rs
@@ -56,8 +56,7 @@ mod tests {
     use std::{env, fs, io::Write, sync::Arc};
 
     use parquet::{
-        file::{properties::WriterProperties, writer::SerializedFileWriter},
-        record::RecordWriter,
+        file::writer::SerializedFileWriter, record::RecordWriter,
         schema::parser::parse_message_type,
     };
 
