Skip to content

Commit

Permalink
fix: replace BTreeMap with IndexMap to preserve insertion order (#2150)
Browse files Browse the repository at this point in the history
# Description

When switching to using `BTreeMap` for representing partition values, I
introduced a bug, thinking `BTreeMap` would preserve insertion order.
While it is ordered, it is ordered based on keys.

This PR moves to using `IndexMap`, which actually preserves insertion
order.
  • Loading branch information
roeap authored Feb 1, 2024
1 parent bdb03a3 commit 5518261
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 42 deletions.
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ errno = "0.3"
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
hyper = { version = "0.14", optional = true }
indexmap = "2.2.1"
itertools = "0.12"
lazy_static = "1"
libc = ">=0.2.90, <1"
Expand Down
23 changes: 11 additions & 12 deletions crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
use chrono::{NaiveDateTime, TimeZone, Utc};
use indexmap::IndexMap;
use object_store::path::Path;
use object_store::ObjectMeta;
use percent_encoding::percent_decode_str;
Expand All @@ -19,37 +20,35 @@ const COL_MIN_VALUES: &str = "minValues";
const COL_MAX_VALUES: &str = "maxValues";
const COL_NULL_COUNT: &str = "nullCount";

pub(crate) type PartitionFields<'a> = Arc<BTreeMap<&'a str, &'a StructField>>;
pub(crate) type PartitionValues<'a> = BTreeMap<&'a str, Scalar>;
pub(crate) type PartitionFields<'a> = Arc<IndexMap<&'a str, &'a StructField>>;
pub(crate) type PartitionValues<'a> = IndexMap<&'a str, Scalar>;

pub(crate) trait PartitionsExt {
fn hive_partition_path(&self) -> String;
}

impl PartitionsExt for BTreeMap<&str, Scalar> {
impl PartitionsExt for IndexMap<&str, Scalar> {
fn hive_partition_path(&self) -> String {
let mut fields = self
let fields = self
.iter()
.map(|(k, v)| {
let encoded = v.serialize_encoded();
format!("{k}={encoded}")
})
.collect::<Vec<_>>();
fields.reverse();
fields.join("/")
}
}

impl PartitionsExt for BTreeMap<String, Scalar> {
impl PartitionsExt for IndexMap<String, Scalar> {
fn hive_partition_path(&self) -> String {
let mut fields = self
let fields = self
.iter()
.map(|(k, v)| {
let encoded = v.serialize_encoded();
format!("{k}={encoded}")
})
.collect::<Vec<_>>();
fields.reverse();
fields.join("/")
}
}
Expand Down Expand Up @@ -192,7 +191,7 @@ impl LogicalFile<'_> {
/// The partition values for this logical file.
pub fn partition_values(&self) -> DeltaResult<PartitionValues<'_>> {
if self.partition_fields.is_empty() {
return Ok(BTreeMap::new());
return Ok(IndexMap::new());
}
let map_value = self.partition_values.value(self.index);
let keys = map_value
Expand Down Expand Up @@ -237,7 +236,7 @@ impl LogicalFile<'_> {
.unwrap_or(Scalar::Null(f.data_type.clone()));
Ok((*k, val))
})
.collect::<DeltaResult<BTreeMap<_, _>>>()
.collect::<DeltaResult<IndexMap<_, _>>>()
}

/// Defines a deletion vector
Expand Down Expand Up @@ -355,7 +354,7 @@ impl<'a> FileStatsAccessor<'a> {
.partition_columns
.iter()
.map(|c| Ok((c.as_str(), schema.field_with_name(c.as_str())?)))
.collect::<DeltaResult<BTreeMap<_, _>>>()?,
.collect::<DeltaResult<IndexMap<_, _>>>()?,
);
let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
let deletion_vector = deletion_vector.and_then(|dv| {
Expand Down
19 changes: 10 additions & 9 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use arrow_array::RecordBatch;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{Future, StreamExt, TryStreamExt};
use indexmap::IndexMap;
use itertools::Itertools;
use num_cpus;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
Expand Down Expand Up @@ -308,7 +309,7 @@ impl From<OptimizeInput> for DeltaOperation {

fn create_remove(
path: &str,
partitions: &BTreeMap<String, Scalar>,
partitions: &IndexMap<String, Scalar>,
size: i64,
) -> Result<Action, DeltaTableError> {
// NOTE unwrap is safe since UNIX_EPOCH will always be earlier then now.
Expand Down Expand Up @@ -353,11 +354,11 @@ enum OptimizeOperations {
///
/// Bins are determined by the bin-packing algorithm to reach an optimal size.
/// Files that are large enough already are skipped. Bins of size 1 are dropped.
Compact(HashMap<String, (BTreeMap<String, Scalar>, Vec<MergeBin>)>),
Compact(HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)>),
/// Plan to Z-order each partition
ZOrder(
Vec<String>,
HashMap<String, (BTreeMap<String, Scalar>, MergeBin)>,
HashMap<String, (IndexMap<String, Scalar>, MergeBin)>,
),
// TODO: Sort
}
Expand Down Expand Up @@ -401,7 +402,7 @@ impl MergePlan {
/// collected during the operation.
async fn rewrite_files<F>(
task_parameters: Arc<MergeTaskParameters>,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
files: MergeBin,
object_store: ObjectStoreRef,
read_stream: F,
Expand Down Expand Up @@ -849,7 +850,7 @@ fn build_compaction_plan(
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
let mut metrics = Metrics::default();

let mut partition_files: HashMap<String, (BTreeMap<String, Scalar>, Vec<ObjectMeta>)> =
let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, Vec<ObjectMeta>)> =
HashMap::new();
for add in snapshot.get_active_add_actions_by_partitions(filters)? {
let add = add?;
Expand All @@ -863,7 +864,7 @@ fn build_compaction_plan(
.partition_values()?
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<BTreeMap<_, _>>();
.collect::<IndexMap<_, _>>();

partition_files
.entry(add.partition_values()?.hive_partition_path())
Expand All @@ -877,7 +878,7 @@ fn build_compaction_plan(
file.sort_by(|a, b| b.size.cmp(&a.size));
}

let mut operations: HashMap<String, (BTreeMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
for (part, (partition, files)) in partition_files {
let mut merge_bins = vec![MergeBin::new()];

Expand Down Expand Up @@ -955,14 +956,14 @@ fn build_zorder_plan(
// For now, just be naive and optimize all files in each selected partition.
let mut metrics = Metrics::default();

let mut partition_files: HashMap<String, (BTreeMap<String, Scalar>, MergeBin)> = HashMap::new();
let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, MergeBin)> = HashMap::new();
for add in snapshot.get_active_add_actions_by_partitions(filters)? {
let add = add?;
let partition_values = add
.partition_values()?
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<BTreeMap<_, _>>();
.collect::<IndexMap<_, _>>();
metrics.total_considered_files += 1;
let object_meta = ObjectMeta::try_from(&add)?;

Expand Down
11 changes: 6 additions & 5 deletions crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Abstractions and implementations for writing data to delta tables
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
Expand Down Expand Up @@ -155,7 +156,7 @@ impl DeltaWriter {
pub async fn write_partition(
&mut self,
record_batch: RecordBatch,
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
) -> DeltaResult<()> {
let partition_key = Path::parse(partition_values.hive_partition_path())?;

Expand Down Expand Up @@ -217,7 +218,7 @@ pub(crate) struct PartitionWriterConfig {
/// Prefix applied to all paths
prefix: Path,
/// Values for all partition columns
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
/// Properties passed to underlying parquet writer
writer_properties: WriterProperties,
/// Size above which we will write a buffered parquet file to disk.
Expand All @@ -230,7 +231,7 @@ pub(crate) struct PartitionWriterConfig {
impl PartitionWriterConfig {
pub fn try_new(
file_schema: ArrowSchemaRef,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
Expand Down Expand Up @@ -514,7 +515,7 @@ mod tests {
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
BTreeMap::new(),
IndexMap::new(),
writer_properties,
target_file_size,
write_batch_size,
Expand Down
15 changes: 8 additions & 7 deletions crates/core/src/writer/json.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Main writer API to write json messages to delta table
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow::record_batch::*;
use bytes::Bytes;
use indexmap::IndexMap;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::{
Expand Down Expand Up @@ -45,7 +46,7 @@ pub(crate) struct DataArrowWriter {
writer_properties: WriterProperties,
buffer: ShareableBuffer,
arrow_writer: ArrowWriter<ShareableBuffer>,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
buffered_record_batch_count: usize,
}

Expand Down Expand Up @@ -153,7 +154,7 @@ impl DataArrowWriter {
writer_properties.clone(),
)?;

let partition_values = BTreeMap::new();
let partition_values = IndexMap::new();
let buffered_record_batch_count = 0;

Ok(Self {
Expand Down Expand Up @@ -397,8 +398,8 @@ fn quarantine_failed_parquet_rows(
fn extract_partition_values(
partition_cols: &[String],
record_batch: &RecordBatch,
) -> Result<BTreeMap<String, Scalar>, DeltaWriterError> {
let mut partition_values = BTreeMap::new();
) -> Result<IndexMap<String, Scalar>, DeltaWriterError> {
let mut partition_values = IndexMap::new();

for col_name in partition_cols.iter() {
let arrow_schema = record_batch.schema();
Expand Down Expand Up @@ -499,15 +500,15 @@ mod tests {
&record_batch
)
.unwrap(),
BTreeMap::from([
IndexMap::from([
(String::from("col1"), Scalar::Integer(1)),
(String::from("col2"), Scalar::Integer(2)),
(String::from("col3"), Scalar::Null(DataType::INTEGER)),
])
);
assert_eq!(
extract_partition_values(&[String::from("col1")], &record_batch).unwrap(),
BTreeMap::from([(String::from("col1"), Scalar::Integer(1)),])
IndexMap::from([(String::from("col1"), Scalar::Integer(1)),])
);
assert!(extract_partition_values(&[String::from("col4")], &record_batch).is_err())
}
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! the writer. Once written, add actions are returned by the writer. It's the users responsibility
//! to create the transaction using those actions.
use std::collections::BTreeMap;
use std::{collections::HashMap, sync::Arc};

use arrow::array::{Array, UInt32Array};
Expand All @@ -15,6 +14,7 @@ use arrow_array::ArrayRef;
use arrow_row::{RowConverter, SortField};
use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::{arrow::ArrowWriter, errors::ParquetError};
use parquet::{basic::Compression, file::properties::WriterProperties};
Expand Down Expand Up @@ -127,7 +127,7 @@ impl RecordBatchWriter {
pub async fn write_partition(
&mut self,
record_batch: RecordBatch,
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
) -> Result<(), DeltaTableError> {
let arrow_schema =
arrow_schema_without_partitions(&self.arrow_schema_ref, &self.partition_columns);
Expand Down Expand Up @@ -212,7 +212,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
#[derive(Clone, Debug)]
pub struct PartitionResult {
/// values found in partition columns
pub partition_values: BTreeMap<String, Scalar>,
pub partition_values: IndexMap<String, Scalar>,
/// remaining dataset with partition column values removed
pub record_batch: RecordBatch,
}
Expand All @@ -222,14 +222,14 @@ struct PartitionWriter {
writer_properties: WriterProperties,
pub(super) buffer: ShareableBuffer,
pub(super) arrow_writer: ArrowWriter<ShareableBuffer>,
pub(super) partition_values: BTreeMap<String, Scalar>,
pub(super) partition_values: IndexMap<String, Scalar>,
pub(super) buffered_record_batch_count: usize,
}

impl PartitionWriter {
pub fn new(
arrow_schema: Arc<ArrowSchema>,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
writer_properties: WriterProperties,
) -> Result<Self, ParquetError> {
let buffer = ShareableBuffer::default();
Expand Down Expand Up @@ -302,7 +302,7 @@ pub(crate) fn divide_by_partition_values(

if partition_columns.is_empty() {
partitions.push(PartitionResult {
partition_values: BTreeMap::new(),
partition_values: IndexMap::new(),
record_batch: values.clone(),
});
return Ok(partitions);
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/writer/stats.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, ops::AddAssign};

use indexmap::IndexMap;
use parquet::format::FileMetaData;
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{basic::LogicalType, errors::ParquetError};
Expand All @@ -17,7 +17,7 @@ use crate::protocol::{ColumnValueStat, Stats};

/// Creates an [`Add`] log action struct.
pub fn create_add(
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
path: String,
size: i64,
file_metadata: &FileMetaData,
Expand Down Expand Up @@ -59,7 +59,7 @@ pub fn create_add(
}

fn stats_from_file_metadata(
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
file_metadata: &FileMetaData,
) -> Result<Stats, DeltaWriterError> {
let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
Expand Down

0 comments on commit 5518261

Please sign in to comment.