fix: double-encode paths during zorder optimize when they contain special characters #2897

Merged 1 commit on Sep 22, 2024
4 changes: 1 addition & 3 deletions crates/core/src/delta_datafusion/mod.rs
@@ -76,9 +76,7 @@ use url::Url;
use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{
-    Add, DataCheck, EagerSnapshot, Invariant, LogicalFile, Snapshot, StructTypeExt,
-};
+use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
2 changes: 0 additions & 2 deletions crates/core/src/operations/merge/mod.rs
@@ -3099,7 +3099,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

-let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
@@ -3194,7 +3193,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

-let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
120 changes: 113 additions & 7 deletions crates/core/src/operations/optimize.rs
@@ -39,7 +39,8 @@ use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
-use tracing::debug;
+use tracing::*;
+use url::Url;

use super::transaction::PROTOCOL;
use super::writer::{PartitionWriter, PartitionWriterConfig};
@@ -137,6 +138,7 @@ impl fmt::Display for MetricDetails {
}
}

+#[derive(Debug)]
/// Metrics for a single partition
pub struct PartialMetrics {
/// Number of optimized files added
@@ -345,6 +347,7 @@ impl From<OptimizeInput> for DeltaOperation {
}
}

+/// Generate an appropriate remove action for the optimization task
fn create_remove(
path: &str,
partitions: &IndexMap<String, Scalar>,
@@ -606,12 +609,26 @@ impl MergePlan {
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, ScalarUDF};

-let locations = files
+// This code is ... not ideal. Essentially `read_parquet` expects Strings that it will then
+// parse as URLs and then pass back to the object store (x_x). This can cause problems when
+// paths in object storage have special characters like spaces, etc.
+//
+// This [str::replace] is kind of a hack to address
+// <https://github.com/delta-io/delta-rs/issues/2834>
+let locations: Vec<String> = files
.iter()
-.map(|file| format!("delta-rs:///{}", file.location))
-.collect_vec();
+.map(|om| {
+    format!(
+        "delta-rs:///{}",
+        str::replace(om.location.as_ref(), "%", "%25")
+    )
+})
+.collect();
+debug!("Reading z-order with locations: {locations:?}");

let df = context
.ctx
+// TODO: should read options have the partition columns?
.read_parquet(locations, ParquetReadOptions::default())
.await?;
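
For readers coming from issue #2834, here is a minimal, self-contained sketch of the round-trip that makes the `%` to `%25` escape above necessary. It is not part of this diff, and the path value is made up:

```rust
fn main() {
    // Paths coming out of the object store layer are already percent-encoded;
    // a partition value like "United States" is stored with "%20" for the space.
    let stored = "country=United%20States/part-00000.parquet";

    // read_parquet re-parses its input strings as URLs, which percent-decodes
    // them once. Escaping each '%' as "%25" beforehand means that single decode
    // pass yields the original encoded path instead of a raw space.
    let escaped = str::replace(stored, "%", "%25");
    assert_eq!(escaped, "country=United%2520States/part-00000.parquet");

    // A real URL parser decodes every %XX sequence; this replace is only a
    // stand-in for that one decode pass, restoring the path the store knows.
    let decoded = escaped.replace("%25", "%");
    assert_eq!(decoded, stored);
}
```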

@@ -712,13 +729,15 @@ impl MergePlan {
bins.len() <= num_cpus::get(),
));

debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");
#[cfg(feature = "datafusion")]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
log_store.object_store(),
max_spill_size,
)?);
let task_parameters = self.task_parameters.clone();

let log_store = log_store.clone();
futures::stream::iter(bins)
.map(move |(_, (partition, files))| {
@@ -891,9 +910,7 @@ impl MergeBin {
self.size_bytes += meta.size as i64;
self.files.push(meta);
}
-}

-impl MergeBin {
fn iter(&self) -> impl Iterator<Item = &ObjectMeta> {
self.files.iter()
}
@@ -1036,6 +1053,7 @@ fn build_zorder_plan(
.or_insert_with(|| (partition_values, MergeBin::new()))
.1
.add(object_meta);
+error!("partition_files inside the zorder plan: {partition_files:?}");
}

let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files);
@@ -1229,7 +1247,6 @@ pub(super) mod zorder {
let runtime = Arc::new(RuntimeEnv::new(config)?);
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);

-use url::Url;
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
Ok(Self { columns, ctx })
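
As context for the `delta-rs://` registration above: whatever store is registered under that scheme is what `ctx.read_parquet("delta-rs:///<path>", ...)` resolves against, which is why the z-order locations are prefixed with `delta-rs:///`. A minimal sketch, assuming the `datafusion`, `object_store`, and `url` crates; the `InMemory` store and the `zorder_session` helper are stand-ins, not code from this PR:

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};
use object_store::memory::InMemory;
use url::Url;

fn zorder_session() -> datafusion::error::Result<SessionContext> {
    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new())?);
    // Register a store under the custom "delta-rs://" scheme; from here on,
    // "delta-rs:///<path>" locations are routed to this store.
    runtime.register_object_store(
        &Url::parse("delta-rs://").unwrap(),
        Arc::new(InMemory::new()),
    );
    Ok(SessionContext::new_with_config_rt(SessionConfig::default(), runtime))
}
```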
@@ -1269,6 +1286,7 @@ pub(super) mod zorder {
fn zorder_key_datafusion(
columns: &[ColumnarValue],
) -> Result<ColumnarValue, DataFusionError> {
debug!("zorder_key_datafusion: {columns:#?}");
let length = columns
.iter()
.map(|col| match col {
@@ -1423,6 +1441,94 @@ pub(super) mod zorder {
.await;
assert!(res.is_ok());
}
+
+/// Issue <https://github.com/delta-io/delta-rs/issues/2834>
+#[tokio::test]
+async fn test_zorder_space_in_partition_value() {
+    use arrow_schema::Schema as ArrowSchema;
+    let _ = pretty_env_logger::try_init();
+    let schema = Arc::new(ArrowSchema::new(vec![
+        Field::new("modified", DataType::Utf8, true),
+        Field::new("country", DataType::Utf8, true),
+        Field::new("value", DataType::Int32, true),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(arrow::array::StringArray::from(vec![
+                "2021-02-01",
+                "2021-02-01",
+                "2021-02-02",
+                "2021-02-02",
+            ])),
+            Arc::new(arrow::array::StringArray::from(vec![
+                "Germany",
+                "China",
+                "Canada",
+                "Dominican Republic",
+            ])),
+            Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+            //Arc::new(arrow::array::StringArray::from(vec!["Dominican Republic"])),
+            //Arc::new(arrow::array::Int32Array::from(vec![100])),
+        ],
+    )
+    .unwrap();
+    // write some data
+    let table = crate::DeltaOps::new_in_memory()
+        .write(vec![batch.clone()])
+        .with_partition_columns(vec!["country"])
+        .with_save_mode(crate::protocol::SaveMode::Overwrite)
+        .await
+        .unwrap();
+
+    let res = crate::DeltaOps(table)
+        .optimize()
+        .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
+        .await;
+    assert!(res.is_ok(), "Failed to optimize: {res:#?}");
+}
+
+#[tokio::test]
+async fn test_zorder_space_in_partition_value_garbage() {
+    use arrow_schema::Schema as ArrowSchema;
+    let _ = pretty_env_logger::try_init();
+    let schema = Arc::new(ArrowSchema::new(vec![
+        Field::new("modified", DataType::Utf8, true),
+        Field::new("country", DataType::Utf8, true),
+        Field::new("value", DataType::Int32, true),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(arrow::array::StringArray::from(vec![
+                "2021-02-01",
+                "2021-02-01",
+                "2021-02-02",
+                "2021-02-02",
+            ])),
+            Arc::new(arrow::array::StringArray::from(vec![
+                "Germany", "China", "Canada", "USA$$!",
+            ])),
+            Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+        ],
+    )
+    .unwrap();
+    // write some data
+    let table = crate::DeltaOps::new_in_memory()
+        .write(vec![batch.clone()])
+        .with_partition_columns(vec!["country"])
+        .with_save_mode(crate::protocol::SaveMode::Overwrite)
+        .await
+        .unwrap();
+
+    let res = crate::DeltaOps(table)
+        .optimize()
+        .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
+        .await;
+    assert!(res.is_ok(), "Failed to optimize: {res:#?}");
+}
}
}

35 changes: 35 additions & 0 deletions python/tests/test_optimize.py
@@ -4,6 +4,13 @@
import pyarrow as pa
import pytest

+try:
+    import pandas as pd
+except ModuleNotFoundError:
+    _has_pandas = False
+else:
+    _has_pandas = True

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties

@@ -132,3 +139,31 @@ def test_optimize_schema_evolved_table(
assert dt.to_pyarrow_table().sort_by([("foo", "ascending")]) == data.sort_by(
[("foo", "ascending")]
)


+@pytest.mark.pandas
+def test_zorder_with_space_partition(tmp_path: pathlib.Path):
+    df = pd.DataFrame(
+        {
+            "user": ["James", "Anna", "Sara", "Martin"],
+            "country": ["United States", "Canada", "Costa Rica", "South Africa"],
+            "age": [34, 23, 45, 26],
+        }
+    )
+
+    write_deltalake(
+        table_or_uri=tmp_path,
+        data=df,
+        mode="overwrite",
+        partition_by=["country"],
+    )
+
+    test_table = DeltaTable(tmp_path)
+
+    # retrieve by partition works fine
+    partitioned_df = test_table.to_pandas(
+        partitions=[("country", "=", "United States")],
+    )
+    print(partitioned_df)
+
+    test_table.optimize.z_order(columns=["user"])