Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert parquet metadata back to builders #4265

Merged
merged 1 commit into from
May 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 46 additions & 93 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,78 +365,69 @@ impl RowGroupMetaData {
ordinal: None,
}
}

/// Converts this [`RowGroupMetaData`] into a [`RowGroupMetaDataBuilder`]
pub fn into_builder(self) -> RowGroupMetaDataBuilder {
RowGroupMetaDataBuilder(self)
}
}

/// Builder for row group metadata.
pub struct RowGroupMetaDataBuilder {
columns: Vec<ColumnChunkMetaData>,
schema_descr: SchemaDescPtr,
num_rows: i64,
sorting_columns: Option<Vec<SortingColumn>>,
total_byte_size: i64,
}
pub struct RowGroupMetaDataBuilder(RowGroupMetaData);

impl RowGroupMetaDataBuilder {
/// Creates new builder from schema descriptor.
fn new(schema_descr: SchemaDescPtr) -> Self {
Self {
Self(RowGroupMetaData {
columns: Vec::with_capacity(schema_descr.num_columns()),
schema_descr,
num_rows: 0,
sorting_columns: None,
total_byte_size: 0,
}
})
}

/// Sets number of rows in this row group.
pub fn set_num_rows(mut self, value: i64) -> Self {
self.num_rows = value;
self.0.num_rows = value;
self
}

/// Sets the sorting order for columns
pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
self.sorting_columns = value;
self.0.sorting_columns = value;
self
}

/// Sets total size in bytes for this row group.
pub fn set_total_byte_size(mut self, value: i64) -> Self {
self.total_byte_size = value;
self.0.total_byte_size = value;
self
}

/// Sets column metadata for this row group.
pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
self.columns = value;
self.0.columns = value;
self
}

/// Builds row group metadata.
pub fn build(self) -> Result<RowGroupMetaData> {
if self.schema_descr.num_columns() != self.columns.len() {
if self.0.schema_descr.num_columns() != self.0.columns.len() {
return Err(general_err!(
"Column length mismatch: {} != {}",
self.schema_descr.num_columns(),
self.columns.len()
self.0.schema_descr.num_columns(),
self.0.columns.len()
));
}

Ok(RowGroupMetaData {
columns: self.columns,
num_rows: self.num_rows,
sorting_columns: self.sorting_columns,
total_byte_size: self.total_byte_size,
schema_descr: self.schema_descr,
})
Ok(self.0)
}
}

/// Metadata for a column chunk.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
column_type: Type,
column_path: ColumnPath,
column_descr: ColumnDescPtr,
encodings: Vec<Encoding>,
file_path: Option<String>,
Expand Down Expand Up @@ -479,12 +470,12 @@ impl ColumnChunkMetaData {

/// Type of this column. Must be primitive.
pub fn column_type(&self) -> Type {
self.column_type
self.column_descr.physical_type()
}

/// Path (or identifier) of this column.
pub fn column_path(&self) -> &ColumnPath {
&self.column_path
self.column_descr.path()
}

/// Descriptor for this column.
Expand Down Expand Up @@ -609,7 +600,6 @@ impl ColumnChunkMetaData {
}
let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
let column_type = Type::try_from(col_metadata.type_)?;
let column_path = ColumnPath::new(col_metadata.path_in_schema);
let encodings = col_metadata
.encodings
.drain(0..)
Expand Down Expand Up @@ -641,8 +631,6 @@ impl ColumnChunkMetaData {
let column_index_length = cc.column_index_length;

let result = ColumnChunkMetaData {
column_type,
column_path,
column_descr,
encodings,
file_path,
Expand Down Expand Up @@ -685,9 +673,9 @@ impl ColumnChunkMetaData {
/// Method to convert to Thrift `ColumnMetaData`
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
ColumnMetaData {
type_: self.column_type.into(),
type_: self.column_type().into(),
encodings: self.encodings().iter().map(|&v| v.into()).collect(),
path_in_schema: Vec::from(self.column_path.as_ref()),
path_in_schema: self.column_path().as_ref().to_vec(),
codec: self.compression.into(),
num_values: self.num_values,
total_uncompressed_size: self.total_uncompressed_size,
Expand All @@ -704,34 +692,20 @@ impl ColumnChunkMetaData {
bloom_filter_offset: self.bloom_filter_offset,
}
}

/// Converts this [`ColumnChunkMetaData`] into a [`ColumnChunkMetaDataBuilder`]
pub fn into_builder(self) -> ColumnChunkMetaDataBuilder {
ColumnChunkMetaDataBuilder(self)
}
}

/// Builder for column chunk metadata.
pub struct ColumnChunkMetaDataBuilder {
column_descr: ColumnDescPtr,
encodings: Vec<Encoding>,
file_path: Option<String>,
file_offset: i64,
num_values: i64,
compression: Compression,
total_compressed_size: i64,
total_uncompressed_size: i64,
data_page_offset: i64,
index_page_offset: Option<i64>,
dictionary_page_offset: Option<i64>,
statistics: Option<Statistics>,
encoding_stats: Option<Vec<PageEncodingStats>>,
bloom_filter_offset: Option<i64>,
offset_index_offset: Option<i64>,
offset_index_length: Option<i32>,
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
}
pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData);

impl ColumnChunkMetaDataBuilder {
/// Creates new column chunk metadata builder.
fn new(column_descr: ColumnDescPtr) -> Self {
Self {
Self(ColumnChunkMetaData {
column_descr,
encodings: Vec::new(),
file_path: None,
Expand All @@ -750,135 +724,114 @@ impl ColumnChunkMetaDataBuilder {
offset_index_length: None,
column_index_offset: None,
column_index_length: None,
}
})
}

/// Sets list of encodings for this column chunk.
pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
self.encodings = encodings;
self.0.encodings = encodings;
self
}

/// Sets optional file path for this column chunk.
pub fn set_file_path(mut self, value: String) -> Self {
self.file_path = Some(value);
self.0.file_path = Some(value);
self
}

/// Sets file offset in bytes.
pub fn set_file_offset(mut self, value: i64) -> Self {
self.file_offset = value;
self.0.file_offset = value;
self
}

/// Sets number of values.
pub fn set_num_values(mut self, value: i64) -> Self {
self.num_values = value;
self.0.num_values = value;
self
}

/// Sets compression.
pub fn set_compression(mut self, value: Compression) -> Self {
self.compression = value;
self.0.compression = value;
self
}

/// Sets total compressed size in bytes.
pub fn set_total_compressed_size(mut self, value: i64) -> Self {
self.total_compressed_size = value;
self.0.total_compressed_size = value;
self
}

/// Sets total uncompressed size in bytes.
pub fn set_total_uncompressed_size(mut self, value: i64) -> Self {
self.total_uncompressed_size = value;
self.0.total_uncompressed_size = value;
self
}

/// Sets data page offset in bytes.
pub fn set_data_page_offset(mut self, value: i64) -> Self {
self.data_page_offset = value;
self.0.data_page_offset = value;
self
}

/// Sets optional dictionary page ofset in bytes.
pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
self.dictionary_page_offset = value;
self.0.dictionary_page_offset = value;
self
}

/// Sets optional index page offset in bytes.
pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
self.index_page_offset = value;
self.0.index_page_offset = value;
self
}

/// Sets statistics for this column chunk.
pub fn set_statistics(mut self, value: Statistics) -> Self {
self.statistics = Some(value);
self.0.statistics = Some(value);
self
}

/// Sets page encoding stats for this column chunk.
pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
self.encoding_stats = Some(value);
self.0.encoding_stats = Some(value);
self
}

/// Sets optional bloom filter offset in bytes.
pub fn set_bloom_filter_offset(mut self, value: Option<i64>) -> Self {
self.bloom_filter_offset = value;
self.0.bloom_filter_offset = value;
self
}

/// Sets optional offset index offset in bytes.
pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
self.offset_index_offset = value;
self.0.offset_index_offset = value;
self
}

/// Sets optional offset index length in bytes.
pub fn set_offset_index_length(mut self, value: Option<i32>) -> Self {
self.offset_index_length = value;
self.0.offset_index_length = value;
self
}

/// Sets optional column index offset in bytes.
pub fn set_column_index_offset(mut self, value: Option<i64>) -> Self {
self.column_index_offset = value;
self.0.column_index_offset = value;
self
}

/// Sets optional column index length in bytes.
pub fn set_column_index_length(mut self, value: Option<i32>) -> Self {
self.column_index_length = value;
self.0.column_index_length = value;
self
}

/// Builds column chunk metadata.
pub fn build(self) -> Result<ColumnChunkMetaData> {
Ok(ColumnChunkMetaData {
column_type: self.column_descr.physical_type(),
column_path: self.column_descr.path().clone(),
column_descr: self.column_descr,
encodings: self.encodings,
file_path: self.file_path,
file_offset: self.file_offset,
num_values: self.num_values,
compression: self.compression,
total_compressed_size: self.total_compressed_size,
total_uncompressed_size: self.total_uncompressed_size,
data_page_offset: self.data_page_offset,
index_page_offset: self.index_page_offset,
dictionary_page_offset: self.dictionary_page_offset,
statistics: self.statistics,
encoding_stats: self.encoding_stats,
bloom_filter_offset: self.bloom_filter_offset,
offset_index_offset: self.offset_index_offset,
offset_index_length: self.offset_index_length,
column_index_offset: self.column_index_offset,
column_index_length: self.column_index_length,
})
Ok(self.0)
}
}

Expand Down