ARROW-10636: [Rust][Parquet] Remove specialization

Remove specialization from parquet-rs. This allows the codebase to be
compiled with stable Rust.
GregBowyer committed Dec 12, 2020
1 parent edff65d commit f8f9749
Showing 15 changed files with 1,035 additions and 806 deletions.
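
For context on the title: `specialization` is a nightly-only Rust feature that lets a blanket trait impl mark methods `default` so that more specific impls can override them; parquet-rs used it to pick per-type encodings. The writer.rs diff below replaces that compile-time dispatch with plain functions that match on the column's physical type. Here is a minimal, self-contained sketch of the two styles (the enums are stand-ins, not the real parquet-rs types):

```rust
// Stable-Rust sketch of the refactor in rust/parquet/src/column/writer.rs.
// The enums below are stand-ins, not the real parquet-rs definitions.

#[derive(Clone, Copy, Debug, PartialEq)]
enum Type { Boolean, Int32, ByteArray }

#[derive(Clone, Copy, Debug, PartialEq)]
enum Encoding { Plain, Rle, DeltaBinaryPacked }

#[derive(Clone, Copy)]
enum WriterVersion { V1, V2 }

// Before (nightly only, removed by this commit):
//
//     impl<T: DataType> EncodingWriteSupport for ColumnWriterImpl<T> {
//         default fn fallback_encoding(_props: &WriterProperties) -> Encoding {
//             Encoding::PLAIN // blanket default, overridden per type
//         }
//     }
//
// After (stable): one free function dispatching on the runtime type kind.
fn fallback_encoding(kind: Type, version: WriterVersion) -> Encoding {
    match (kind, version) {
        (Type::Boolean, WriterVersion::V2) => Encoding::Rle,
        (Type::Int32, WriterVersion::V2) => Encoding::DeltaBinaryPacked,
        _ => Encoding::Plain,
    }
}

fn main() {
    assert_eq!(
        fallback_encoding(Type::Boolean, WriterVersion::V2),
        Encoding::Rle
    );
    assert_eq!(
        fallback_encoding(Type::ByteArray, WriterVersion::V1),
        Encoding::Plain
    );
    println!("dispatch via match works on stable Rust");
}
```

The match-based version gives up open-ended extensibility, but the set of Parquet physical types is closed, so a single exhaustive match is idiomatic on stable.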

rust/parquet/src/arrow/arrow_reader.rs (7 changes: 4 additions & 3 deletions)
@@ -233,7 +233,8 @@ mod tests {
     };
     use crate::column::writer::get_typed_column_writer_mut;
     use crate::data_type::{
-        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArrayType, Int32Type,
+        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
+        FixedLenByteArrayType, Int32Type,
     };
     use crate::errors::Result;
     use crate::file::properties::WriterProperties;
@@ -331,10 +332,10 @@
     struct RandFixedLenGen {}
 
     impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
-        fn gen(len: i32) -> ByteArray {
+        fn gen(len: i32) -> FixedLenByteArray {
             let mut v = vec![0u8; len as usize];
             rand::thread_rng().fill_bytes(&mut v);
-            v.into()
+            ByteArray::from(v).into()
         }
     }
 
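
The new return type comes from the `FixedLenByteArray` wrapper this commit introduces; the test builds one via `ByteArray::from(v).into()`, which implies a `From<ByteArray>` conversion. A plausible sketch of that newtype pattern (stub definitions, not the actual parquet-rs code):

```rust
// Standalone sketch of the newtype pattern implied by the diff above:
// a distinct FixedLenByteArray wrapper convertible from ByteArray, so
// fixed-length columns get their own Rust type instead of relying on a
// specialized impl to tell them apart. These are stub types.

#[derive(Debug, Clone, PartialEq)]
struct ByteArray(Vec<u8>);

impl From<Vec<u8>> for ByteArray {
    fn from(v: Vec<u8>) -> Self {
        ByteArray(v)
    }
}

#[derive(Debug, Clone, PartialEq)]
struct FixedLenByteArray(ByteArray);

impl From<ByteArray> for FixedLenByteArray {
    fn from(b: ByteArray) -> Self {
        FixedLenByteArray(b)
    }
}

fn main() {
    let v = vec![0u8; 4];
    // Mirrors the call shape in the test: ByteArray::from(v).into()
    let f: FixedLenByteArray = ByteArray::from(v).into();
    println!("{:?}", f);
}
```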

rust/parquet/src/arrow/converter.rs (8 changes: 4 additions & 4 deletions)
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::data_type::{ByteArray, DataType, Int96};
+use crate::data_type::{ByteArray, FixedLenByteArray, DataType, Int96};
 // TODO: clean up imports (best done when there are few moving parts)
 use arrow::array::{
     Array, ArrayRef, BinaryBuilder, FixedSizeBinaryBuilder, LargeBinaryBuilder,
@@ -57,8 +57,8 @@ impl FixedSizeArrayConverter {
     }
 }
 
-impl Converter<Vec<Option<ByteArray>>, FixedSizeBinaryArray> for FixedSizeArrayConverter {
-    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<FixedSizeBinaryArray> {
+impl Converter<Vec<Option<FixedLenByteArray>>, FixedSizeBinaryArray> for FixedSizeArrayConverter {
+    fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<FixedSizeBinaryArray> {
         let mut builder = FixedSizeBinaryBuilder::new(source.len(), self.byte_width);
         for v in source {
             match v {
@@ -277,7 +277,7 @@ pub type PrimitiveDictionaryConverter<K, V> = ArrayRefConverter<
 pub type Int96Converter =
     ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;
 pub type FixedLenBinaryConverter = ArrayRefConverter<
-    Vec<Option<ByteArray>>,
+    Vec<Option<FixedLenByteArray>>,
     FixedSizeBinaryArray,
     FixedSizeArrayConverter,
 >;
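
With the alias re-typed to `Vec<Option<FixedLenByteArray>>`, the compiler selects the fixed-size converter from the input element type alone, which is exactly the kind of per-type dispatch that previously needed specialization. A toy sketch of the `Converter` shape (simplified stand-in types; the real trait produces Arrow arrays and returns `Result`):

```rust
// Hypothetical simplified trait modeled on parquet-rs's Converter: the
// impl is chosen by the source element type, with no specialization.
trait Converter<S, T> {
    fn convert(&self, source: S) -> T;
}

// Stand-in for a fixed-length value; not the real FixedLenByteArray.
struct FixedLenValue(Vec<u8>);

struct FixedSizeConverter {
    byte_width: usize,
}

impl Converter<Vec<Option<FixedLenValue>>, Vec<u8>> for FixedSizeConverter {
    fn convert(&self, source: Vec<Option<FixedLenValue>>) -> Vec<u8> {
        let mut out = Vec::new();
        for v in source {
            match v {
                Some(x) => out.extend_from_slice(&x.0),
                // Null slots are zero-padded in this toy flattening.
                None => out.extend(std::iter::repeat(0u8).take(self.byte_width)),
            }
        }
        out
    }
}

fn main() {
    let c = FixedSizeConverter { byte_width: 2 };
    let flat = c.convert(vec![Some(FixedLenValue(vec![1, 2])), None]);
    assert_eq!(flat, vec![1, 2, 0, 0]);
}
```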

rust/parquet/src/column/writer.rs (108 changes: 33 additions & 75 deletions)
@@ -16,7 +16,13 @@
 // under the License.
 
 //! Contains column writer API.
-use std::{cmp, collections::VecDeque, convert::TryFrom, sync::Arc};
+use std::{
+    cmp,
+    marker::PhantomData,
+    collections::VecDeque,
+    convert::TryFrom,
+    sync::Arc
+};
 
 use crate::basic::{Compression, Encoding, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
@@ -196,6 +202,7 @@ pub struct ColumnWriterImpl<T: DataType> {
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
+    _phantom: PhantomData<T>,
 }
 
 impl<T: DataType> ColumnWriterImpl<T> {
@@ -209,7 +216,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
 
         // Optionally set dictionary encoder.
         let dict_encoder = if props.dictionary_enabled(descr.path())
-            && Self::has_dictionary_support(&props)
+            && has_dictionary_support(T::get_physical_type(), &props)
         {
             Some(DictEncoder::new(descr.clone(), Arc::new(MemTracker::new())))
         } else {
@@ -224,7 +231,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
             descr.clone(),
             props
                 .encoding(descr.path())
-                .unwrap_or_else(|| Self::fallback_encoding(&props)),
+                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)),
             Arc::new(MemTracker::new()),
         )
         .unwrap();
@@ -259,6 +266,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
             max_column_value: None,
             num_column_nulls: 0,
             column_distinct_count: None,
+            _phantom: PhantomData,
         }
     }
 
@@ -955,81 +963,31 @@ impl<T: DataType> ColumnWriterImpl<T> {
-/// Trait to define default encoding for types, including whether or not the type
-/// supports dictionary encoding.
-trait EncodingWriteSupport {
-    /// Returns encoding for a column when no other encoding is provided in writer
-    /// properties.
-    fn fallback_encoding(props: &WriterProperties) -> Encoding;
-
-    /// Returns true if dictionary is supported for column writer, false otherwise.
-    fn has_dictionary_support(props: &WriterProperties) -> bool;
-}
-
-// Basic implementation, always falls back to PLAIN and supports dictionary.
-impl<T: DataType> EncodingWriteSupport for ColumnWriterImpl<T> {
-    default fn fallback_encoding(_props: &WriterProperties) -> Encoding {
-        Encoding::PLAIN
-    }
-
-    default fn has_dictionary_support(_props: &WriterProperties) -> bool {
-        true
-    }
-}
-
-impl EncodingWriteSupport for ColumnWriterImpl<BoolType> {
-    fn fallback_encoding(props: &WriterProperties) -> Encoding {
-        match props.writer_version() {
-            WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
-            WriterVersion::PARQUET_2_0 => Encoding::RLE,
-        }
-    }
-
-    // Boolean column does not support dictionary encoding and should fall back to
-    // whatever fallback encoding is defined.
-    fn has_dictionary_support(_props: &WriterProperties) -> bool {
-        false
-    }
-}
-
-impl EncodingWriteSupport for ColumnWriterImpl<Int32Type> {
-    fn fallback_encoding(props: &WriterProperties) -> Encoding {
-        match props.writer_version() {
-            WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
-            WriterVersion::PARQUET_2_0 => Encoding::DELTA_BINARY_PACKED,
-        }
-    }
-}
-
-impl EncodingWriteSupport for ColumnWriterImpl<Int64Type> {
-    fn fallback_encoding(props: &WriterProperties) -> Encoding {
-        match props.writer_version() {
-            WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
-            WriterVersion::PARQUET_2_0 => Encoding::DELTA_BINARY_PACKED,
-        }
-    }
-}
-
-impl EncodingWriteSupport for ColumnWriterImpl<ByteArrayType> {
-    fn fallback_encoding(props: &WriterProperties) -> Encoding {
-        match props.writer_version() {
-            WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
-            WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
-        }
-    }
-}
-
-impl EncodingWriteSupport for ColumnWriterImpl<FixedLenByteArrayType> {
-    fn fallback_encoding(props: &WriterProperties) -> Encoding {
-        match props.writer_version() {
-            WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
-            WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
-        }
-    }
-
-    fn has_dictionary_support(props: &WriterProperties) -> bool {
-        match props.writer_version() {
-            // Dictionary encoding was not enabled in PARQUET 1.0
-            WriterVersion::PARQUET_1_0 => false,
-            WriterVersion::PARQUET_2_0 => true,
-        }
-    }
-}
+/// Returns encoding for a column when no other encoding is provided in writer properties.
+fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
+    match (kind, props.writer_version()) {
+        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
+        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
+        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
+        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
+        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
+        _ => Encoding::PLAIN,
+    }
+}
+
+/// Returns true if dictionary is supported for column writer, false otherwise.
+fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
+    match (kind, props.writer_version()) {
+        // Booleans do not support dict encoding and should use a fallback encoding.
+        (Type::BOOLEAN, _) => false,
+        // Dictionary encoding was not enabled in PARQUET 1.0
+        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
+        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
+        _ => true,
+    }
+}

@@ -1398,28 +1356,28 @@ mod tests {
         check_encoding_write_support::<FixedLenByteArrayType>(
             WriterVersion::PARQUET_1_0,
             true,
-            &[ByteArray::from(vec![1u8])],
+            &[ByteArray::from(vec![1u8]).into()],
             None,
             &[Encoding::PLAIN, Encoding::RLE],
         );
         check_encoding_write_support::<FixedLenByteArrayType>(
             WriterVersion::PARQUET_1_0,
             false,
-            &[ByteArray::from(vec![1u8])],
+            &[ByteArray::from(vec![1u8]).into()],
             None,
             &[Encoding::PLAIN, Encoding::RLE],
         );
         check_encoding_write_support::<FixedLenByteArrayType>(
             WriterVersion::PARQUET_2_0,
             true,
-            &[ByteArray::from(vec![1u8])],
+            &[ByteArray::from(vec![1u8]).into()],
             Some(0),
             &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE],
         );
         check_encoding_write_support::<FixedLenByteArrayType>(
             WriterVersion::PARQUET_2_0,
             false,
-            &[ByteArray::from(vec![1u8])],
+            &[ByteArray::from(vec![1u8]).into()],
             None,
             &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE],
         );
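
A note on the `_phantom: PhantomData<T>` field added to `ColumnWriterImpl<T>` above: Rust rejects a generic struct whose type parameter appears in no field, and presumably the removal of the specialized methods left `T` otherwise unused, so a zero-sized marker keeps the parameter (this reasoning is an inference from the diff, not stated in the commit). A minimal illustration with a hypothetical struct:

```rust
// Sketch of the PhantomData idiom; ColumnWriterSketch is a hypothetical
// stand-in, not the real ColumnWriterImpl.
use std::marker::PhantomData;

struct ColumnWriterSketch<T> {
    buffered_pages: usize,
    // Ties T to the struct at zero runtime cost; without it, an
    // otherwise-unused parameter is a compile error (E0392).
    _phantom: PhantomData<T>,
}

impl<T> ColumnWriterSketch<T> {
    fn new() -> Self {
        Self { buffered_pages: 0, _phantom: PhantomData }
    }
}

fn main() {
    let w: ColumnWriterSketch<u32> = ColumnWriterSketch::new();
    println!("pages buffered: {}", w.buffered_pages);
}
```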
(Diffs for the remaining 12 changed files did not load and are not shown here.)
