diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 24dae4f20d64..77f9598b23fe 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -104,12 +104,12 @@ impl<'a> ByteArrayWriter<'a> {
     /// Returns a new [`ByteArrayWriter`]
     pub fn new(
         descr: ColumnDescPtr,
-        props: &'a WriterPropertiesPtr,
+        props: WriterPropertiesPtr,
         page_writer: Box<dyn PageWriter + 'a>,
         on_close: OnCloseColumnChunk<'a>,
     ) -> Result<Self> {
         Ok(Self {
-            writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
+            writer: GenericColumnWriter::new(descr, props, page_writer),
             on_close: Some(on_close),
         })
     }
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 9923970bedde..c7a7b19acf30 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -21,7 +21,7 @@
 use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
-use std::io::{BufWriter, IoSlice};
+use std::io::{BufWriter, IoSlice, Read};
 use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TSerializable};
 
@@ -35,6 +35,7 @@ use crate::column::{
 };
 use crate::data_type::DataType;
 use crate::errors::{ParquetError, Result};
+use crate::file::reader::ChunkReader;
 use crate::file::{
     metadata::*, properties::WriterPropertiesPtr, statistics::to_thrift as statistics_to_thrift,
     PARQUET_MAGIC,
@@ -423,27 +424,15 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         }
     }
 
-    /// Returns the next column writer, if available, using the factory function;
-    /// otherwise returns `None`.
-    pub(crate) fn next_column_with_factory<'b, F, C>(
-        &'b mut self,
-        factory: F,
-    ) -> Result<Option<C>>
-    where
-        F: FnOnce(
-            ColumnDescPtr,
-            &'b WriterPropertiesPtr,
-            Box<dyn PageWriter + 'b>,
-            OnCloseColumnChunk<'b>,
-        ) -> Result<C>,
-    {
-        self.assert_previous_writer_closed()?;
-
-        if self.column_index >= self.descr.num_columns() {
-            return Ok(None);
-        }
-        let page_writer = Box::new(SerializedPageWriter::new(self.buf));
+    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
+    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
+        let ret = self.descr.columns().get(self.column_index)?.clone();
+        self.column_index += 1;
+        Some(ret)
+    }
 
+    /// Returns [`OnCloseColumnChunk`] for the next writer
+    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
         let total_bytes_written = &mut self.total_bytes_written;
         let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
         let total_rows_written = &mut self.total_rows_written;
@@ -475,16 +464,33 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             Ok(())
         };
+        (self.buf, Box::new(on_close))
+    }
 
-        let column = self.descr.column(self.column_index);
-        self.column_index += 1;
-
-        Ok(Some(factory(
-            column,
-            &self.props,
-            page_writer,
-            Box::new(on_close),
-        )?))
+    /// Returns the next column writer, if available, using the factory function;
+    /// otherwise returns `None`.
+    pub(crate) fn next_column_with_factory<'b, F, C>(
+        &'b mut self,
+        factory: F,
+    ) -> Result<Option<C>>
+    where
+        F: FnOnce(
+            ColumnDescPtr,
+            WriterPropertiesPtr,
+            Box<dyn PageWriter + 'b>,
+            OnCloseColumnChunk<'b>,
+        ) -> Result<C>,
+    {
+        self.assert_previous_writer_closed()?;
+        Ok(match self.next_column_desc() {
+            Some(column) => {
+                let props = self.props.clone();
+                let (buf, on_close) = self.get_on_close();
+                let page_writer = Box::new(SerializedPageWriter::new(buf));
+                Some(factory(column, props, page_writer, Box::new(on_close))?)
+            }
+            None => None,
+        })
     }
 
     /// Returns the next column writer, if available; otherwise returns `None`.
@@ -497,6 +503,69 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         })
     }
 
+    /// Append a column chunk from another source without decoding it
+    ///
+    /// This can be used for efficiently concatenating or projecting parquet data,
+    /// or encoding parquet data to temporary in-memory buffers
+    pub fn splice_column<R: ChunkReader>(
+        &mut self,
+        reader: &R,
+        mut close: ColumnCloseResult,
+    ) -> Result<()> {
+        self.assert_previous_writer_closed()?;
+        let desc = self.next_column_desc().ok_or_else(|| {
+            general_err!("exhausted columns in SerializedRowGroupWriter")
+        })?;
+
+        let metadata = close.metadata;
+
+        if metadata.column_descr() != desc.as_ref() {
+            return Err(general_err!(
+                "column descriptor mismatch, expected {:?} got {:?}",
+                desc,
+                metadata.column_descr()
+            ));
+        }
+
+        let src_dictionary_offset = metadata.dictionary_page_offset();
+        let src_data_offset = metadata.data_page_offset();
+        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
+        let src_length = metadata.compressed_size();
+
+        let write_offset = self.buf.bytes_written();
+        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
+        let write_length = std::io::copy(&mut read, &mut self.buf)?;
+
+        if src_length as u64 != write_length {
+            return Err(general_err!(
+                "Failed to splice column data, expected {} got {}",
+                src_length,
+                write_length
+            ));
+        }
+
+        let file_offset = self.buf.bytes_written() as i64;
+
+        let map_offset = |x| x - src_offset + write_offset as i64;
+        let mut builder =
+            ColumnChunkMetaData::builder(metadata.column_descr_ptr().clone())
+                .set_compression(metadata.compression())
+                .set_encodings(metadata.encodings().clone())
+                .set_file_offset(file_offset)
+                .set_total_compressed_size(metadata.compressed_size())
+                .set_total_uncompressed_size(metadata.uncompressed_size())
+                .set_num_values(metadata.num_values())
+                .set_data_page_offset(map_offset(src_data_offset))
+                .set_dictionary_page_offset(src_dictionary_offset.map(map_offset));
+
+        if let Some(statistics) = metadata.statistics() {
+            builder = builder.set_statistics(statistics.clone())
+        }
+        close.metadata = builder.build()?;
+
+        SerializedPageWriter::new(&mut self.buf).write_metadata(&metadata)?;
+        let (_, on_close) = self.get_on_close();
+        on_close(close)
+    }
+
     /// Closes this row group writer and returns row group metadata.
     pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
         if self.row_group_metadata.is_none() {
@@ -516,9 +585,9 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             if let Some(on_close) = self.on_close.take() {
                 on_close(
                     metadata,
-                    self.bloom_filters.clone(),
-                    self.column_indexes.clone(),
-                    self.offset_indexes.clone(),
+                    self.bloom_filters,
+                    self.column_indexes,
+                    self.offset_indexes,
                 )?
             }
         }
@@ -720,6 +789,7 @@ mod tests {
     use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
     use crate::column::page::PageReader;
+    use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
     use crate::data_type::{BoolType, Int32Type};
     use crate::file::reader::ChunkReader;
@@ -1540,4 +1610,83 @@ mod tests {
         assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
         assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
     }
+
+    #[test]
+    fn test_spliced_write() {
+        let message_type = "
+            message test_schema {
+                REQUIRED INT32 i32 (INTEGER(32,true));
+                REQUIRED INT32 u32 (INTEGER(32,false));
+            }
+        ";
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let props = Arc::new(WriterProperties::builder().build());
+
+        let mut file = Vec::with_capacity(1024);
+        let mut file_writer =
+            SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
+
+        let columns = file_writer.descr.columns();
+        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
+            .iter()
+            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
+            .collect();
+
+        let mut column_state_slice = column_state.as_mut_slice();
+        let mut column_writers = Vec::with_capacity(columns.len());
+        for c in columns {
+            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
+            column_state_slice = tail;
+
+            let page_writer = Box::new(SerializedPageWriter::new(buf));
+            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
+            column_writers.push(SerializedColumnWriter::new(
+                col_writer,
+                Some(Box::new(|on_close| {
+                    *out = Some(on_close);
+                    Ok(())
+                })),
+            ));
+        }
+
+        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
+
+        // Interleaved writing to the column writers
+        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
+            let writer = writer.typed::<Int32Type>();
+            writer.write_batch(&batch, None, None).unwrap();
+        }
+
+        // Close the column writers
+        for writer in column_writers {
+            writer.close().unwrap()
+        }
+
+        // Splice column data into a row group
+        let mut row_group_writer = file_writer.next_row_group().unwrap();
+        for (write, close) in column_state {
+            let buf = Bytes::from(write.into_inner().unwrap());
+            row_group_writer
+                .splice_column(&buf, close.unwrap())
+                .unwrap();
+        }
+        row_group_writer.close().unwrap();
+        file_writer.close().unwrap();
+
+        // Check data was written correctly
+        let file = Bytes::from(file);
+        let reader = SerializedFileReader::new(file).unwrap();
+        let row_group = reader.get_row_group(0).unwrap();
+
+        let mut out = [0; 4];
+        let mut c1 =
+            get_typed_column_reader::<Int32Type>(row_group.get_column_reader(0).unwrap());
+        c1.read_batch(4, None, None, &mut out).unwrap();
+        assert_eq!(out, column_data[0]);
+
+        let mut c2 =
+            get_typed_column_reader::<Int32Type>(row_group.get_column_reader(1).unwrap());
+        c2.read_batch(4, None, None, &mut out).unwrap();
+        assert_eq!(out, column_data[1]);
+    }
 }
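
Usage note (illustrative, not part of the diff): the test above drives `splice_column` end-to-end with in-memory buffers, but the same API can stitch together existing files. Below is a minimal sketch that copies the first row group of an existing parquet file into a new file without decoding any pages. It assumes `ColumnCloseResult` can be constructed directly from read metadata (i.e. its fields are public), skips bloom filters and page indexes for brevity, and `copy_first_row_group` is a hypothetical helper name, not an API added by this change.

use std::sync::Arc;

use bytes::Bytes;
use parquet::column::writer::ColumnCloseResult;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::writer::SerializedFileWriter;

/// Copy the first row group of `src` into a new file written to `sink`,
/// splicing each column chunk across without decoding it.
fn copy_first_row_group(src: Bytes, sink: &mut Vec<u8>) -> Result<()> {
    let reader = SerializedFileReader::new(src.clone())?;
    let metadata = reader.metadata();
    // Reuse the source schema so the column descriptors match what
    // `splice_column` expects.
    let schema = metadata.file_metadata().schema_descr().root_schema_ptr();

    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = SerializedFileWriter::new(sink, schema, props)?;

    let mut row_group = writer.next_row_group()?;
    for column in metadata.row_group(0).columns() {
        // Carry the source chunk's metadata across; `splice_column` rewrites
        // the page offsets to match the chunk's position in the new file.
        let close = ColumnCloseResult {
            bytes_written: column.compressed_size() as _,
            rows_written: metadata.row_group(0).num_rows() as _,
            metadata: column.clone(),
            bloom_filter: None, // dropped for brevity
            column_index: None, // dropped for brevity
            offset_index: None, // dropped for brevity
        };
        row_group.splice_column(&src, close)?;
    }
    row_group.close()?;
    writer.close()?;
    Ok(())
}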