diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 08cfc7ea3ebf..616968bf6407 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -117,7 +117,7 @@ impl<W: Write> Debug for ArrowWriter<W> {
     }
 }
 
-impl<W: Write> ArrowWriter<W> {
+impl<W: Write + Send> ArrowWriter<W> {
     /// Try to create a new Arrow writer
     ///
     /// The writer will fail if:
@@ -273,7 +273,7 @@ impl<W: Write> ArrowWriter<W> {
     }
 }
 
-impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
+impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         self.write(batch).map_err(|e| e.into())
     }
@@ -284,7 +284,7 @@ impl<W: Write> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write>(
+fn write_leaves<W: Write + Send>(
     row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
     arrays: &[ArrayRef],
     levels: &mut [Vec<LevelInfo>],
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index bd3568d13cee..f854e5caca80 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -248,7 +248,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
 ///
 /// It is reasonable to assume that all pages will be written in the correct order, e.g.
 /// dictionary page followed by data pages, or a set of data pages, etc.
-pub trait PageWriter {
+pub trait PageWriter: Send {
     /// Writes a page into the output stream/sink.
     /// Returns `PageWriteSpec` that contains information about written page metrics,
     /// including number of bytes, size, number of values, offset, etc.
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 137893092405..c203fc02281a 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2174,6 +2174,12 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_send() {
+        fn test<T: Send>() {}
+        test::<ColumnWriterImpl<Int32Type>>();
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index b7e30c4ecf08..3088f332183b 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -40,7 +40,7 @@ mod dict_encoder;
 ///
 /// Currently this allocates internal buffers for the encoded values. After done putting
 /// values, caller should call `flush_buffer()` to get an immutable buffer pointer.
-pub trait Encoder<T: DataType> {
+pub trait Encoder<T: DataType>: Send {
     /// Encodes data from `values`.
     fn put(&mut self, values: &[T::T]) -> Result<()>;
 
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 4b1c4bad92e1..1f71f9c67822 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -160,7 +160,7 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
     }
 }
 
-impl<W: Write> SerializedFileWriter<W> {
+impl<W: Write + Send> SerializedFileWriter<W> {
     /// Creates new file writer.
     pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
         let mut buf = TrackedWrite::new(buf);
@@ -406,7 +406,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     on_close: Option<OnCloseRowGroup<'a>>,
 }
 
-impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
     /// Creates a new `SerializedRowGroupWriter` with:
     ///
    /// - `schema_descr` - the schema to write
@@ -700,7 +700,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
     }
 }
 
-impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
+impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
         let uncompressed_size = page.uncompressed_size();
         let compressed_size = page.compressed_size();
@@ -1336,7 +1336,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
     {
         test_roundtrip::<W, R, Int32Type, _>(
@@ -1356,7 +1356,7 @@ mod tests {
         compression: Compression,
     ) -> crate::format::FileMetaData
     where
-        W: Write,
+        W: Write + Send,
         R: ChunkReader + From<W> + 'static,
         D: DataType,
         F: Fn(Row) -> D::T,
diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs
index fe803a7ff4ef..62099051f513 100644
--- a/parquet/src/record/record_writer.rs
+++ b/parquet/src/record/record_writer.rs
@@ -21,7 +21,7 @@ use super::super::errors::ParquetError;
 use super::super::file::writer::SerializedRowGroupWriter;
 
 pub trait RecordWriter<T> {
-    fn write_to_row_group<W: std::io::Write>(
+    fn write_to_row_group<W: std::io::Write + Send>(
         &self,
         row_group_writer: &mut SerializedRowGroupWriter<W>,
     ) -> Result<(), ParquetError>;
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index a09b3b65233b..0f875401f0e9 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -96,7 +96,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
 
     (quote! {
         impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
-            fn write_to_row_group<W: ::std::io::Write>(
+            fn write_to_row_group<W: ::std::io::Write + Send>(
                 &self,
                 row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
             ) -> Result<(), ::parquet::errors::ParquetError> {
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index d2cf9efb1db6..f4f8be1e0d8c 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -56,8 +56,7 @@ mod tests {
     use std::{env, fs, io::Write, sync::Arc};
 
     use parquet::{
-        file::{properties::WriterProperties, writer::SerializedFileWriter},
-        record::RecordWriter,
+        file::writer::SerializedFileWriter, record::RecordWriter,
         schema::parser::parse_message_type,
     };