fix(rust): serialize MetricDetails from compaction runs to a string #2317

Merged
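
This change serializes the nested MetricDetails values produced by compaction and z-order runs (the files_added and files_removed fields on Metrics) as JSON strings rather than as nested objects. The operationMetrics map recorded in a commit's commitInfo is expected to hold plain string values, which is what Spark's DESCRIBE HISTORY surfaces as a string-to-string map, so nested objects left the metrics of delta-rs-optimized tables unreadable there. The pyspark integration tests added below verify that DESCRIBE HISTORY now returns a non-null operationMetrics entry after both compact and z_order. A self-contained sketch of the serialization pattern follows the Rust diff.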
43 changes: 38 additions & 5 deletions crates/core/src/operations/optimize.rs
@@ -21,6 +21,7 @@
//! ````

use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

@@ -36,7 +37,7 @@ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream
use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
use tracing::debug;

use super::transaction::PROTOCOL;
@@ -60,8 +61,16 @@ pub struct Metrics {
/// Number of unoptimized files removed
pub num_files_removed: u64,
/// Detailed metrics for the add operation
#[serde(
serialize_with = "serialize_metric_details",
deserialize_with = "deserialize_metric_details"
)]
pub files_added: MetricDetails,
/// Detailed metrics for the remove operation
#[serde(
serialize_with = "serialize_metric_details",
deserialize_with = "deserialize_metric_details"
)]
pub files_removed: MetricDetails,
/// Number of partitions that had at least one file optimized
pub partitions_optimized: u64,
@@ -75,17 +84,34 @@
pub preserve_insertion_order: bool,
}

// Custom serialization function that serializes metric details as a string
fn serialize_metric_details<S>(value: &MetricDetails, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&value.to_string())
}

// Custom deserialization that parses a JSON string into MetricDetails
fn deserialize_metric_details<'de, D>(deserializer: D) -> Result<MetricDetails, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
serde_json::from_str(&s).map_err(DeError::custom)
}

/// Statistics on files for a particular operation
/// Operation can be remove or add
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MetricDetails {
/// Minimum file size of a operation
pub min: i64,
/// Maximum file size of a operation
pub max: i64,
/// Average file size of a operation
pub avg: f64,
/// Maximum file size of a operation
pub max: i64,
/// Minimum file size of a operation
pub min: i64,
/// Number of files encountered during operation
pub total_files: usize,
/// Sum of file sizes of a operation
@@ -103,6 +129,13 @@ impl MetricDetails {
}
}

impl fmt::Display for MetricDetails {
/// Display the metric details using serde serialization
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
serde_json::to_string(self).map_err(|_| fmt::Error)?.fmt(f)
}
}

/// Metrics for a single partition
pub struct PartialMetrics {
/// Number of optimized files added
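
For reference, a minimal self-contained sketch of the pattern used above: a nested camelCase struct is emitted as a JSON string inside its parent, and parsed back on deserialization. The type and function names here (Inner, Outer, ser_inner, de_inner) and the field values are illustrative only, not part of delta-rs; the sketch assumes serde (with the derive feature) and serde_json as dependencies.

// Sketch: serialize a nested struct as a JSON string, mirroring the pattern above.
use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Inner {
    avg: f64,
    max: i64,
    min: i64,
    total_files: usize,
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Outer {
    #[serde(serialize_with = "ser_inner", deserialize_with = "de_inner")]
    files_added: Inner,
}

// Emit the nested struct as one JSON string so the surrounding map stays string-valued.
fn ser_inner<S: Serializer>(value: &Inner, serializer: S) -> Result<S::Ok, S::Error> {
    let as_string = serde_json::to_string(value).map_err(serde::ser::Error::custom)?;
    serializer.serialize_str(&as_string)
}

// Parse the JSON string back into the nested struct on the way in.
fn de_inner<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Inner, D::Error> {
    let s: String = Deserialize::deserialize(deserializer)?;
    serde_json::from_str(&s).map_err(DeError::custom)
}

fn main() {
    let outer = Outer {
        files_added: Inner { avg: 1024.0, max: 2048, min: 512, total_files: 4 },
    };
    let json = serde_json::to_string(&outer).unwrap();
    // json: {"filesAdded":"{\"avg\":1024.0,\"max\":2048,\"min\":512,\"totalFiles\":4}"}
    let back: Outer = serde_json::from_str(&json).unwrap();
    assert_eq!(outer, back);
    println!("{json}");
}
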
58 changes: 57 additions & 1 deletion python/tests/pyspark_integration/test_write_to_pyspark.py
@@ -5,7 +5,7 @@
import pyarrow as pa
import pytest

from deltalake import write_deltalake
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaProtocolError

from .utils import assert_spark_read_equal, get_spark
@@ -113,3 +113,59 @@ def test_checks_min_writer_version(tmp_path: pathlib.Path):
):
valid_data = pa.table({"c1": pa.array([5, 6])})
write_deltalake(str(tmp_path), valid_data, mode="append")


@pytest.mark.pyspark
@pytest.mark.integration
def test_spark_read_optimize_history(tmp_path: pathlib.Path):
ids = ["1"] * 10
values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]

id_array = pa.array(ids, type=pa.string())
value_array = pa.array(values, type=pa.int32())

pa_table = pa.Table.from_arrays([id_array, value_array], names=["id", "value"])

# Two writes on purpose for an optimize to occur
write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])

dt = DeltaTable(tmp_path)
dt.optimize.compact(partition_filters=[("id", "=", "1")])

spark = get_spark()
history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")

latest_operation_metrics = (
history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
)

assert latest_operation_metrics["operationMetrics"] is not None


@pytest.mark.pyspark
@pytest.mark.integration
def test_spark_read_z_ordered_history(tmp_path: pathlib.Path):
ids = ["1"] * 10
values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]

id_array = pa.array(ids, type=pa.string())
value_array = pa.array(values, type=pa.int32())

pa_table = pa.Table.from_arrays([id_array, value_array], names=["id", "value"])

# Two writes on purpose for an optimize to occur
write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])

dt = DeltaTable(tmp_path)
dt.optimize.z_order(columns=["value"], partition_filters=[("id", "=", "1")])

spark = get_spark()
history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")

latest_operation_metrics = (
history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
)

assert latest_operation_metrics["operationMetrics"] is not None