Skip to content

Commit

Permalink
Reverted back to logic to handle lance field id mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed May 23, 2024
1 parent 7e81cad commit 68cc3d8
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 38 deletions.
13 changes: 8 additions & 5 deletions rust/lance-encoding-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_schema::DataType;
use lance_encoding::encoder::{CoreFieldEncodingStrategy, FieldEncodingStrategy};
use lance_encoding::encoder::{
ColumnIndexSequence, CoreFieldEncodingStrategy, FieldEncodingStrategy,
};
use zone::ZoneMapsFieldEncoder;

pub mod format;
Expand All @@ -20,12 +22,13 @@ impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
fn create_field_encoder(
&self,
encoding_strategy_root: &dyn FieldEncodingStrategy,
data_type: &arrow_schema::DataType,
column_index: u32,
field: &lance_core::datatypes::Field,
column_index: &mut ColumnIndexSequence,
cache_bytes_per_column: u64,
keep_original_array: bool,
config: &std::collections::HashMap<String, String>,
) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
let data_type = field.data_type();
if data_type.is_primitive()
|| matches!(
data_type,
Expand All @@ -35,7 +38,7 @@ impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
let inner_encoder = self.core.create_field_encoder(
// Don't collect stats on inner string fields
&self.core,
data_type,
field,
column_index,
cache_bytes_per_column,
keep_original_array,
Expand All @@ -49,7 +52,7 @@ impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
} else {
self.core.create_field_encoder(
encoding_strategy_root,
data_type,
field,
column_index,
cache_bytes_per_column,
keep_original_array,
Expand Down
15 changes: 12 additions & 3 deletions rust/lance-encoding-datafusion/src/zone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,25 @@ mod tests {
use arrow_array::types::Int32Type;
use arrow_schema::DataType;
use lance_datagen::{BatchCount, RowCount};
use lance_encoding::encoder::{CoreFieldEncodingStrategy, FieldEncoder, FieldEncodingStrategy};
use lance_encoding::encoder::{
ColumnIndexSequence, CoreFieldEncodingStrategy, FieldEncoder, FieldEncodingStrategy,
};

#[tokio::test]
async fn test_basic_stats() {
let encoding_strategy = CoreFieldEncodingStrategy::default();
let mut col_idx_seq = ColumnIndexSequence::new();
let mock_field = lance_core::datatypes::Field::try_from(arrow_schema::Field::new(
"foo",
DataType::Int32,
false,
))
.unwrap();
let inner = encoding_strategy
.create_field_encoder(
&encoding_strategy,
&DataType::Int32,
0,
&mock_field,
&mut col_idx_seq,
4096,
true,
&HashMap::new(),
Expand Down
80 changes: 55 additions & 25 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::FutureExt;
use lance_core::datatypes::Schema;
use lance_core::datatypes::{Field, Schema};
use lance_core::Result;

use crate::{
Expand Down Expand Up @@ -213,6 +213,33 @@ impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
}
}

/// Keeps track of the current column index and makes a mapping
/// from field id to column index
///
/// Each call to [`ColumnIndexSequence::next_column_index`] hands out the
/// current index, records a `(field_id, column_index)` pair, and advances
/// the sequence by one.  [`ColumnIndexSequence::skip`] advances the sequence
/// without recording anything.
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    /// The index that will be handed out (or skipped) next.
    current_index: u32,
    /// `(field_id, column_index)` pairs, in the order they were assigned.
    mapping: Vec<(i32, i32)>,
}

impl ColumnIndexSequence {
    /// Creates an empty sequence whose first assigned index is `0`.
    ///
    /// Equivalent to [`ColumnIndexSequence::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Assigns the next column index to `field_id` and returns it.
    ///
    /// The `(field_id, index)` pair is appended to the internal mapping and
    /// the sequence advances by one.
    pub fn next_column_index(&mut self, field_id: i32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        // The mapping stores the column index as `i32`; the `as` cast wraps
        // for indices above `i32::MAX`.
        self.mapping.push((field_id, idx as i32));
        idx
    }

    /// Advances the sequence by one column without adding a mapping entry.
    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

/// A trait to pick which kind of field encoding to use for a field
///
/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
Expand All @@ -233,8 +260,8 @@ pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
fn create_field_encoder(
&self,
encoding_strategy_root: &dyn FieldEncodingStrategy,
data_type: &DataType,
column_index: u32,
field: &Field,
column_index: &mut ColumnIndexSequence,
cache_bytes_per_column: u64,
keep_original_array: bool,
config: &HashMap<String, String>,
Expand All @@ -260,13 +287,13 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
fn create_field_encoder(
&self,
encoding_strategy_root: &dyn FieldEncodingStrategy,
data_type: &DataType,
column_index: u32,
field: &Field,
column_index: &mut ColumnIndexSequence,
cache_bytes_per_column: u64,
keep_original_array: bool,
_config: &HashMap<String, String>,
) -> Result<Box<dyn FieldEncoder>> {
match data_type {
match field.data_type() {
DataType::Boolean
| DataType::Date32
| DataType::Date64
Expand Down Expand Up @@ -295,13 +322,14 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
cache_bytes_per_column,
keep_original_array,
self.array_encoding_strategy.clone(),
column_index,
column_index.next_column_index(field.id),
)?)),
DataType::List(child) => {
let list_idx = column_index.next_column_index(field.id);
let inner_encoding = encoding_strategy_root.create_field_encoder(
encoding_strategy_root,
child.data_type(),
column_index + 1,
&field.children[0],
column_index,
cache_bytes_per_column,
keep_original_array,
child.metadata(),
Expand All @@ -310,37 +338,40 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
inner_encoding,
cache_bytes_per_column,
keep_original_array,
column_index,
list_idx,
)))
}
DataType::Struct(children) => {
let children_encoders = children
DataType::Struct(_) => {
let header_idx = column_index.next_column_index(field.id);
let children_encoders = field
.children
.iter()
.enumerate()
.map(|(field_idx, field)| {
.map(|field| {
self.create_field_encoder(
encoding_strategy_root,
field.data_type(),
column_index + field_idx as u32 + 1,
field,
column_index,
cache_bytes_per_column,
keep_original_array,
field.metadata(),
&field.metadata,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(StructFieldEncoder::new(
children_encoders,
column_index,
header_idx,
)))
}
DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => {
let list_idx = column_index.next_column_index(field.id);
column_index.skip();
Ok(Box::new(BinaryFieldEncoder::new(
cache_bytes_per_column,
keep_original_array,
column_index,
list_idx,
)))
}
_ => todo!("Implement encoding for data type {}", data_type),
_ => todo!("Implement encoding for field {}", field),
}
}
}
Expand All @@ -360,16 +391,15 @@ impl BatchEncoder {
keep_original_array: bool,
) -> Result<Self> {
let mut col_idx = 0;
let mut field_col_mapping = Vec::new();
let mut col_idx_sequence = ColumnIndexSequence::new();
let field_encoders = schema
.fields
.iter()
.map(|field| {
field_col_mapping.push((field.id, col_idx as i32));
let encoder = strategy.create_field_encoder(
strategy,
&field.data_type(),
col_idx,
&field,
&mut col_idx_sequence,
cache_bytes_per_column,
keep_original_array,
&field.metadata,
Expand All @@ -380,7 +410,7 @@ impl BatchEncoder {
.collect::<Result<Vec<_>>>()?;
Ok(Self {
field_encoders,
field_id_to_column_index: field_col_mapping,
field_id_to_column_index: col_idx_sequence.mapping,
})
}

Expand Down
15 changes: 10 additions & 5 deletions rust/lance-encoding/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use lance_datagen::{array, gen, RowCount, Seed};

use crate::{
decoder::{BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, DecoderMessage, PageInfo},
encoder::{CoreFieldEncodingStrategy, EncodedPage, FieldEncoder, FieldEncodingStrategy},
encoder::{
ColumnIndexSequence, CoreFieldEncodingStrategy, EncodedPage, FieldEncoder,
FieldEncodingStrategy,
},
encodings::logical::r#struct::SimpleStructDecoder,
EncodingsIo,
};
Expand Down Expand Up @@ -102,11 +105,12 @@ pub async fn check_round_trip_encoding_random(field: Field) {
let encoding_strategy = CoreFieldEncodingStrategy::default();
let encoding_config = HashMap::new();
let encoder_factory = || {
let mut column_index_seq = ColumnIndexSequence::new();
encoding_strategy
.create_field_encoder(
&encoding_strategy,
&lance_field.data_type(),
0,
&lance_field,
&mut column_index_seq,
page_size,
true,
&encoding_config,
Expand Down Expand Up @@ -178,11 +182,12 @@ pub async fn check_round_trip_encoding_of_data(data: Vec<Arc<dyn Array>>, test_c
for page_size in [4096, 1024 * 1024] {
let encoding_strategy = CoreFieldEncodingStrategy::default();
let encoding_config = HashMap::new();
let mut column_index_seq = ColumnIndexSequence::new();
let encoder = encoding_strategy
.create_field_encoder(
&encoding_strategy,
&lance_field.data_type(),
0,
&lance_field,
&mut column_index_seq,
page_size,
true,
&encoding_config,
Expand Down

0 comments on commit 68cc3d8

Please sign in to comment.