diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index ccc5ede44459..147470a4f18d 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -115,7 +115,7 @@ impl<W: Write> Debug for ArrowWriter<W> {
     }
 }
 
-impl<W: Write> ArrowWriter<W> {
+impl<W: Write + Send> ArrowWriter<W> {
     /// Try to create a new Arrow writer
     ///
     /// The writer will fail if:
@@ -271,7 +271,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 }
 
-impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
+impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         self.write(batch).map_err(|e| e.into())
     }
@@ -282,7 +282,7 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
+fn write_leaves<W: Write + Send>(
     row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
     arrays: &[ArrayRef],
     levels: &mut [Vec<LevelInfo>],
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index bd3568d13cee..f854e5caca80 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -248,7 +248,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
 ///
 /// It is reasonable to assume that all pages will be written in the correct order, e.g.
 /// dictionary page followed by data pages, or a set of data pages, etc.
-pub trait PageWriter {
+pub trait PageWriter: Send {
     /// Writes a page into the output stream/sink.
     /// Returns `PageWriteSpec` that contains information about written page metrics,
     /// including number of bytes, size, number of values, offset, etc.
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 51e2614993e1..d311f4cc9534 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2174,6 +2174,12 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_send() {
+        fn test<T: Send>() {}
+        test::<ColumnWriterImpl<Int32Type>>();
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index b7e30c4ecf08..3088f332183b 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -40,7 +40,7 @@ mod dict_encoder;
 ///
 /// Currently this allocates internal buffers for the encoded values. After done putting
 /// values, caller should call `flush_buffer()` to get an immutable buffer pointer.
-pub trait Encoder<T: DataType> {
+pub trait Encoder<T: DataType>: Send {
     /// Encodes data from `values`.
     fn put(&mut self, values: &[T::T]) -> Result<()>;
 
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 93e8319b0c3e..bced9135121a 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -160,7 +160,7 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
     }
 }
 
-impl<W: Write> SerializedFileWriter<W> {
+impl<W: Write + Send> SerializedFileWriter<W> {
     /// Creates new file writer.
     pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
         let mut buf = TrackedWrite::new(buf);
@@ -406,7 +406,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     on_close: Option<OnCloseRowGroup<'a>>,
 }
 
-impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
     /// Creates a new `SerializedRowGroupWriter` with:
     ///
     /// - `schema_descr` - the schema to write
@@ -700,7 +700,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
     }
 }
 
-impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
+impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
         let uncompressed_size = page.uncompressed_size();
         let compressed_size = page.compressed_size();
@@ -1336,7 +1336,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
     {
         test_roundtrip::<W, R, Int32Type, _>(
@@ -1356,7 +1356,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
         D: DataType,
         F: Fn(Row) -> D::T,
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index fe803a7ff4ef..62099051f513 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -21,7 +21,7 @@ use super::super::errors::ParquetError;
 use super::super::file::writer::SerializedRowGroupWriter;
 
 pub trait RecordWriter<T> {
-    fn write_to_row_group<W: std::io::Write>(
+    fn write_to_row_group<W: std::io::Write + Send>(
         &self,
         row_group_writer: &mut SerializedRowGroupWriter<W>,
     ) -> Result<(), ParquetError>;
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index 6525513cbaa1..c6b5237b85ff 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -97,7 +97,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
 
     (quote! {
         impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
-            fn write_to_row_group<W: ::std::io::Write>(
+            fn write_to_row_group<W: ::std::io::Write + Send>(
                 &self,
                 row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
             ) -> Result<(), ::parquet::errors::ParquetError> {
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index 2aa174974aba..82b99e214786 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -56,8 +56,7 @@ mod tests {
     use std::{env, fs, io::Write, sync::Arc};
 
     use parquet::{
-        file::{properties::WriterProperties, writer::SerializedFileWriter},
-        record::RecordWriter,
+        file::writer::SerializedFileWriter, record::RecordWriter,
         schema::parser::parse_message_type,
     };
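
Note: the sketch below is not part of the patch; it illustrates what the added
`Send` bounds enable, namely moving a writer into a spawned thread. The column
name, values, and output path are illustrative only, and the imports assume
the arrow-rs crate layout (`arrow_array` for arrays, `parquet::arrow` for the
writer).

use std::fs::File;
use std::sync::Arc;
use std::thread;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative single-column batch; RecordBatch is Send because its
    // arrays are Arc'd and `Array: Send + Sync`.
    let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("col", col)])?;

    // `File` is `Write + Send`, so `ArrowWriter<File>` satisfies the new bounds.
    let file = File::create("/tmp/example.parquet")?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;

    // With `W: Write + Send` the writer can cross a thread boundary, which is
    // the point of the bounds added in this patch.
    let handle = thread::spawn(move || {
        writer.write(&batch)?;
        writer.close()
    });
    handle.join().unwrap()?;
    Ok(())
}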