Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make GenericColumnWriter Send #4287

Merged
merged 1 commit into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<W: Write> Debug for ArrowWriter<W> {
}
}

impl<W: Write> ArrowWriter<W> {
impl<W: Write + Send> ArrowWriter<W> {
/// Try to create a new Arrow writer
///
/// The writer will fail if:
Expand Down Expand Up @@ -273,7 +273,7 @@ impl<W: Write> ArrowWriter<W> {
}
}

impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
/// Writes `batch` by delegating to `self.write` and converting any error
/// it reports into an [`ArrowError`].
fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
    // NOTE(review): `self.write` presumably resolves to the inherent
    // `ArrowWriter::write` rather than this trait method — confirm.
    let outcome = self.write(batch);
    outcome.map_err(Into::into)
}
Expand All @@ -284,7 +284,7 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
}
}

fn write_leaves<W: Write>(
fn write_leaves<W: Write + Send>(
row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
arrays: &[ArrayRef],
levels: &mut [Vec<LevelInfo>],
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/column/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
///
/// It is reasonable to assume that all pages will be written in the correct order, e.g.
/// dictionary page followed by data pages, or a set of data pages, etc.
pub trait PageWriter {
pub trait PageWriter: Send {
/// Writes a page into the output stream/sink.
/// Returns `PageWriteSpec` that contains information about written page metrics,
/// including number of bytes, size, number of values, offset, etc.
Expand Down
6 changes: 6 additions & 0 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2174,6 +2174,12 @@ mod tests {
);
}

/// Compile-time check that `ColumnWriterImpl<Int32Type>` implements `Send`.
///
/// The helper has an empty body; the `Send` bound on its type parameter is
/// what makes this test fail to compile if the writer stops being `Send`.
#[test]
fn test_send() {
    fn assert_send<S: Send>() {}

    assert_send::<ColumnWriterImpl<Int32Type>>();
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is the maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/encodings/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ mod dict_encoder;
///
/// Currently this allocates internal buffers for the encoded values. Once all values have
/// been put, the caller should call `flush_buffer()` to get an immutable buffer pointer.
pub trait Encoder<T: DataType> {
pub trait Encoder<T: DataType>: Send {
/// Encodes data from `values`.
fn put(&mut self, values: &[T::T]) -> Result<()>;

Expand Down
10 changes: 5 additions & 5 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
}
}

impl<W: Write> SerializedFileWriter<W> {
impl<W: Write + Send> SerializedFileWriter<W> {
/// Creates new file writer.
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
let mut buf = TrackedWrite::new(buf);
Expand Down Expand Up @@ -406,7 +406,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
on_close: Option<OnCloseRowGroup<'a>>,
}

impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
/// Creates a new `SerializedRowGroupWriter` with:
///
/// - `schema_descr` - the schema to write
Expand Down Expand Up @@ -700,7 +700,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
}
}

impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
let uncompressed_size = page.uncompressed_size();
let compressed_size = page.compressed_size();
Expand Down Expand Up @@ -1336,7 +1336,7 @@ mod tests {
compression: Compression,
) -> crate::format::FileMetaData
where
W: Write,
W: Write + Send,
R: ChunkReader + From<W> + 'static,
{
test_roundtrip::<W, R, Int32Type, _>(
Expand All @@ -1356,7 +1356,7 @@ mod tests {
compression: Compression,
) -> crate::format::FileMetaData
where
W: Write,
W: Write + Send,
R: ChunkReader + From<W> + 'static,
D: DataType,
F: Fn(Row) -> D::T,
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/record/record_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::super::errors::ParquetError;
use super::super::file::writer::SerializedRowGroupWriter;

pub trait RecordWriter<T> {
fn write_to_row_group<W: std::io::Write>(
fn write_to_row_group<W: std::io::Write + Send>(
&self,
row_group_writer: &mut SerializedRowGroupWriter<W>,
) -> Result<(), ParquetError>;
Expand Down
2 changes: 1 addition & 1 deletion parquet_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke

(quote! {
impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
fn write_to_row_group<W: ::std::io::Write>(
fn write_to_row_group<W: ::std::io::Write + Send>(
&self,
row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
) -> Result<(), ::parquet::errors::ParquetError> {
Expand Down
3 changes: 1 addition & 2 deletions parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ mod tests {
use std::{env, fs, io::Write, sync::Arc};

use parquet::{
file::{properties::WriterProperties, writer::SerializedFileWriter},
record::RecordWriter,
file::writer::SerializedFileWriter, record::RecordWriter,
schema::parser::parse_message_type,
};

Expand Down