fix: include commit_seqno for merge order
xushiyan committed Jan 18, 2025
1 parent cd6505e commit f5d793f
Showing 8 changed files with 353 additions and 84 deletions.
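In short: record-version ordering previously keyed on the precombine field alone, so two versions with equal precombine values tied and the winner was arbitrary. This commit adds `_hoodie_commit_seqno` as a tiebreaker, so the version from the later commit wins deterministically. A minimal standalone illustration of the intended ordering (the values mirror the `k3` case in the updated test below):

```rust
fn main() {
    // Two versions of key "k3": equal precombine ts (3), different commit
    // seqnos ("s1" < "s2"). Sorting descending by (ts, seqno) puts the
    // version from the later commit first, so its value (60) wins.
    let versions = [(3, "s1", 30), (3, "s2", 60)];
    let latest = versions
        .iter()
        .max_by_key(|(ts, seqno, _)| (*ts, *seqno))
        .unwrap();
    assert_eq!(latest.2, 60); // "s2" > "s1" breaks the ts tie
}
```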
3 changes: 3 additions & 0 deletions crates/core/src/error.rs
@@ -54,6 +54,9 @@ pub enum CoreError {
#[error("{0}")]
InvalidPartitionPath(String),

#[error("{0}")]
InvalidValue(String),

#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),

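The new `InvalidValue` variant gives core code a general-purpose error for malformed values. A hypothetical call site, not taken from this commit, might look like:

```rust
// Illustrative only: `parse_seqno` is a hypothetical helper showing how the
// new CoreError::InvalidValue variant could be raised.
fn parse_seqno(raw: &str) -> Result<u64, CoreError> {
    raw.parse::<u64>()
        .map_err(|e| CoreError::InvalidValue(format!("invalid commit seqno {raw}: {e}")))
}
```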
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
@@ -48,6 +48,7 @@ pub mod error;
pub mod expr;
pub mod file_group;
pub mod merge;
pub mod metadata;
pub mod storage;
pub mod table;
pub mod timeline;
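The new `metadata` module supplies the `MetaField` enum used below via `MetaField::CommitSeqno.as_ref()` and `MetaField::RecordKey.as_ref()`. Its file is not expanded on this page; a plausible minimal sketch, assuming the standard Hudi meta column names:

```rust
/// Sketch only: the real definition lives in crates/core/src/metadata/meta_field.rs,
/// which is not shown in this diff; the variant set here is an assumption based
/// on Hudi's standard meta columns.
pub enum MetaField {
    CommitTime,
    CommitSeqno,
    RecordKey,
    PartitionPath,
    FileName,
}

impl AsRef<str> for MetaField {
    fn as_ref(&self) -> &str {
        match self {
            MetaField::CommitTime => "_hoodie_commit_time",
            MetaField::CommitSeqno => "_hoodie_commit_seqno",
            MetaField::RecordKey => "_hoodie_record_key",
            MetaField::PartitionPath => "_hoodie_partition_path",
            MetaField::FileName => "_hoodie_file_name",
        }
    }
}
```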
1 change: 0 additions & 1 deletion crates/core/src/merge/mod.rs
@@ -17,7 +17,6 @@
* under the License.
*/
pub mod record_merger;

use crate::config::error;
use crate::config::error::ConfigError;
use crate::config::error::ConfigError::InvalidValue;
169 changes: 86 additions & 83 deletions crates/core/src/merge/record_merger.rs
@@ -23,11 +23,13 @@ use crate::config::table::HudiTableConfig::{
};
use crate::config::HudiConfigs;
use crate::merge::RecordMergeStrategyValue;
use crate::metadata::meta_field::MetaField;
use crate::util::arrow::lexsort_to_indices;
use crate::util::arrow::ColumnAsArray;
use crate::Result;
use arrow::compute::{sort_to_indices, take_record_batch};
use arrow::error::ArrowError;
use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
use arrow_schema::{SchemaRef, SortOptions};
use arrow::compute::take_record_batch;
use arrow_array::{Array, RecordBatch, UInt32Array};
use arrow_schema::SchemaRef;
use arrow_select::concat::concat_batches;
use std::collections::HashMap;
use std::str::FromStr;
@@ -114,39 +116,26 @@ impl RecordMerger {
}

let precombine_field = self.hudi_configs.get(PrecombineField)?.to::<String>();
let precombine_array =
concat_batch
.column_by_name(&precombine_field)
.ok_or_else(|| {
ArrowError::SchemaError(format!("Column {precombine_field} not found."))
})?;

// Sort the precombine values in descending order, and put nulls last.
let sort_options = SortOptions::new(true, false);
let sorted_indices = sort_to_indices(precombine_array, Some(sort_options), None)?;

let record_key_field = "_hoodie_record_key";
let record_key_array = concat_batch
.column_by_name(record_key_field)
.ok_or_else(|| {
ArrowError::SchemaError(format!("Column {record_key_field} not found."))
})?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
ArrowError::CastError(format!(
"Column {record_key_field} cannot be cast to StringArray."
))
})?;

let precombine_array = concat_batch.get_array(&precombine_field)?;
let commit_seqno_array = concat_batch.get_array(MetaField::CommitSeqno.as_ref())?;
let sorted_indices = lexsort_to_indices(
&[precombine_array.clone(), commit_seqno_array.clone()],
true,
);

let record_key_array =
concat_batch.get_string_array(MetaField::RecordKey.as_ref())?;
let mut keys_and_latest_indices: HashMap<&str, u32> =
HashMap::with_capacity(record_key_array.len());
for i in sorted_indices.values() {
let record_key = record_key_array.value(*i as usize);
if keys_and_latest_indices.contains_key(record_key) {
// We sorted the precombine field in descending order, so if the record key
// is already in the map, the associated index will be already pointing to
// the latest version of that record.
// We sorted the precombine and commit seqno in descending order,
// so if the record key is already in the map, the associated row index
// will be already pointing to the latest version of that record.
// Note that records with the same record key, precombine value,
// and commit seqno are considered duplicates, and we keep whichever
// comes first in the sorted indices.
continue;
} else {
keys_and_latest_indices.insert(record_key, *i);
@@ -163,20 +152,24 @@ impl RecordMerger {
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::Int32Array;
use arrow_array::{Int32Array, StringArray};
use arrow_schema::{DataType, Field, Schema};

fn create_configs(strategy: &str, meta_fields: bool, precombine: Option<&str>) -> HudiConfigs {
fn create_configs(
strategy: &str,
populates_meta_fields: bool,
precombine: Option<&str>,
) -> HudiConfigs {
if let Some(precombine) = precombine {
HudiConfigs::new([
(RecordMergeStrategy, strategy.to_string()),
(PopulatesMetaFields, meta_fields.to_string()),
(PopulatesMetaFields, populates_meta_fields.to_string()),
(PrecombineField, precombine.to_string()),
])
} else {
HudiConfigs::new([
(RecordMergeStrategy, strategy.to_string()),
(PopulatesMetaFields, meta_fields.to_string()),
(PopulatesMetaFields, populates_meta_fields.to_string()),
])
}
}
@@ -209,40 +202,56 @@ mod tests {
SchemaRef::from(schema)
}

fn get_sorted_rows(batch: &RecordBatch) -> Vec<(String, i32, i32)> {
fn create_test_schema(ts_nullable: bool) -> SchemaRef {
create_schema(vec![
(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
(MetaField::RecordKey.as_ref(), DataType::Utf8, false),
("ts", DataType::Int32, ts_nullable),
("value", DataType::Int32, false),
])
}

fn get_sorted_rows(batch: &RecordBatch) -> Vec<(String, String, i32, i32)> {
let seqno = batch
.get_string_array(MetaField::CommitSeqno.as_ref())
.unwrap();
let keys = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("First column must be strings");
let timestamps = batch
.column(1)
.get_string_array(MetaField::RecordKey.as_ref())
.unwrap();
let timestamps = batch.get_array("ts").unwrap();
let timestamps = timestamps
.as_any()
.downcast_ref::<Int32Array>()
.expect("Second column must be i32");
let values = batch
.column(2)
.unwrap()
.clone();
let values = batch.get_array("value").unwrap();
let values = values
.as_any()
.downcast_ref::<Int32Array>()
.expect("Third column must be i32");
.unwrap()
.clone();

let mut result: Vec<(String, i32, i32)> = keys
let mut result: Vec<(String, String, i32, i32)> = seqno
.iter()
.zip(keys.iter())
.zip(timestamps.iter())
.zip(values.iter())
.map(|((k, t), v)| (k.unwrap().to_string(), t.unwrap(), v.unwrap()))
.map(|(((s, k), t), v)| {
(
s.unwrap().to_string(),
k.unwrap().to_string(),
t.unwrap(),
v.unwrap(),
)
})
.collect();
result.sort_unstable_by_key(|(k, ts, _)| (k.clone(), *ts));
result.sort_unstable_by_key(|(s, k, ts, _)| (k.clone(), *ts, s.clone()));
result
}

#[test]
fn test_merge_records_empty() {
let schema = create_schema(vec![
("_hoodie_record_key", DataType::Utf8, false),
("ts", DataType::Int32, false),
("value", DataType::Int32, false),
]);
let schema = create_test_schema(false);

let configs = create_configs("OVERWRITE_WITH_LATEST", true, Some("ts"));
let merger = RecordMerger::new(Arc::new(configs));
@@ -261,16 +270,13 @@

#[test]
fn test_merge_records_append_only() {
let schema = create_schema(vec![
("_hoodie_record_key", DataType::Utf8, false),
("ts", DataType::Int32, false),
("value", DataType::Int32, false),
]);
let schema = create_test_schema(false);

// First batch
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["s1", "s1"])),
Arc::new(StringArray::from(vec!["k1", "k2"])),
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(Int32Array::from(vec![10, 20])),
@@ -282,6 +288,7 @@ mod tests {
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["s2", "s2"])),
Arc::new(StringArray::from(vec!["k1", "k3"])),
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(Int32Array::from(vec![30, 40])),
@@ -302,26 +309,23 @@ mod tests {
assert_eq!(
result,
vec![
("k1".to_string(), 1, 10),
("k1".to_string(), 3, 30),
("k2".to_string(), 2, 20),
("k3".to_string(), 4, 40),
("s1".to_string(), "k1".to_string(), 1, 10),
("s2".to_string(), "k1".to_string(), 3, 30),
("s1".to_string(), "k2".to_string(), 2, 20),
("s2".to_string(), "k3".to_string(), 4, 40),
]
);
}

#[test]
fn test_merge_records_nulls() {
let schema = create_schema(vec![
("_hoodie_record_key", DataType::Utf8, false),
("ts", DataType::Int32, true), // Nullable timestamp
("value", DataType::Int32, false),
]);
let schema = create_test_schema(true);

// First batch with some null timestamps
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["s1", "s1", "s1"])),
Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
Arc::new(Int32Array::from(vec![10, 20, 30])),
@@ -333,6 +337,7 @@ mod tests {
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["s2", "s2"])),
Arc::new(StringArray::from(vec!["k1", "k2"])),
Arc::new(Int32Array::from(vec![None, Some(5)])),
Arc::new(Int32Array::from(vec![40, 50])),
@@ -352,25 +357,22 @@ mod tests {
assert_eq!(
result,
vec![
("k1".to_string(), 1, 10), // Keep original since both updates have null ts
("k2".to_string(), 5, 50), // Take second value due to higher ts
("k3".to_string(), 3, 30), // Unchanged
("s1".to_string(), "k1".to_string(), 1, 10), // Keep original since ts is null in 2nd batch
("s2".to_string(), "k2".to_string(), 5, 50), // Take second value due to higher ts
("s1".to_string(), "k3".to_string(), 3, 30), // Unchanged
]
);
}

#[test]
fn test_merge_records_overwrite_with_latest() {
let schema = create_schema(vec![
("_hoodie_record_key", DataType::Utf8, false),
("ts", DataType::Int32, false),
("value", DataType::Int32, false),
]);
let schema = create_test_schema(false);

// First batch
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["s1", "s1", "s1"])),
Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![10, 20, 30])),
@@ -382,9 +384,10 @@ mod tests {
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["k1", "k2"])),
Arc::new(Int32Array::from(vec![4, 1])),
Arc::new(Int32Array::from(vec![40, 50])),
Arc::new(StringArray::from(vec!["s2", "s2", "s2"])),
Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
Arc::new(Int32Array::from(vec![4, 1, 3])),
Arc::new(Int32Array::from(vec![40, 50, 60])),
],
)
.unwrap();
@@ -401,9 +404,9 @@ mod tests {
assert_eq!(
result,
vec![
("k1".to_string(), 4, 40), // Latest value due to ts=4
("k2".to_string(), 2, 20), // Original value since ts=1 < ts=2
("k3".to_string(), 3, 30), // Unchanged
("s2".to_string(), "k1".to_string(), 4, 40), // Latest value due to ts=4
("s1".to_string(), "k2".to_string(), 2, 20), // Original value since ts=1 < ts=2
("s2".to_string(), "k3".to_string(), 3, 60), // Latest value due to equal ts and seqno=s2
]
);
}
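The merger now leans on two helpers from `crate::util::arrow` whose diffs are not expanded on this page: the `ColumnAsArray` accessors (`get_array`, `get_string_array`) and `lexsort_to_indices`. The sketch below shows one way they could sit on top of arrow's compute kernels; the signatures are inferred from the call sites above and the error type is simplified to `ArrowError`, so treat the details as assumptions:

```rust
use arrow::compute::{lexsort_to_indices as arrow_lexsort, SortColumn};
use arrow::error::ArrowError;
use arrow_array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array};
use arrow_schema::SortOptions;

/// Sketch of `ColumnAsArray`: checked column access by name, with a typed
/// variant for string columns.
pub trait ColumnAsArray {
    fn get_array(&self, name: &str) -> Result<ArrayRef, ArrowError>;
    fn get_string_array(&self, name: &str) -> Result<StringArray, ArrowError>;
}

impl ColumnAsArray for RecordBatch {
    fn get_array(&self, name: &str) -> Result<ArrayRef, ArrowError> {
        self.column_by_name(name)
            .cloned()
            .ok_or_else(|| ArrowError::SchemaError(format!("Column {name} not found.")))
    }

    fn get_string_array(&self, name: &str) -> Result<StringArray, ArrowError> {
        let array = self.get_array(name)?;
        array
            .as_any()
            .downcast_ref::<StringArray>()
            .cloned()
            .ok_or_else(|| {
                ArrowError::CastError(format!(
                    "Column {name} cannot be cast to StringArray."
                ))
            })
    }
}

/// Sketch of `lexsort_to_indices`: lexicographic sort across columns, applying
/// the same ordering to each, so the first column is the primary sort key and
/// later columns break ties.
pub fn lexsort_to_indices(arrays: &[ArrayRef], descending: bool) -> UInt32Array {
    // Match the ordering the old code used (SortOptions::new(descending, nulls_first)
    // with nulls last when descending), so null precombine values lose to any
    // concrete value.
    let options = Some(SortOptions::new(descending, !descending));
    let columns: Vec<SortColumn> = arrays
        .iter()
        .map(|values| SortColumn { values: values.clone(), options })
        .collect();
    arrow_lexsort(&columns, None).expect("columns should be sortable")
}
```

Cloning an `ArrayRef` or `StringArray` only bumps reference counts on Arc-backed buffers, so the `clone()` calls at the call sites are cheap. Passing `[precombine, commit_seqno]` to a lexicographic sort is what makes the seqno a tiebreaker: it only matters when two rows carry equal precombine values.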