
Commit

fix issue
ion-elgreco committed May 3, 2024
1 parent 85089b1 commit 0b4f5c9
Showing 3 changed files with 40 additions and 9 deletions.
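In brief: the change threads an optional table schema through `Snapshot::stats_schema` and `LogMapper::try_new`, and has `EagerSnapshot` deserialize the schema from incoming commit metadata, so that file statistics are parsed against the table's new schema (for example after a `schema_mode="overwrite"` write) rather than the snapshot's stale one.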
20 changes: 14 additions & 6 deletions crates/core/src/kernel/snapshot/mod.rs
```diff
@@ -287,11 +287,13 @@ impl Snapshot {
     }
 
     /// Get the statistics schema of the snapshot
-    pub fn stats_schema(&self) -> DeltaResult<StructType> {
+    pub fn stats_schema(&self, table_schema: Option<&StructType>) -> DeltaResult<StructType> {
+        let schema = table_schema.unwrap_or_else(|| self.schema());
+
         let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() {
             stats_cols
                 .iter()
-                .map(|col| match self.schema().field_with_name(col) {
+                .map(|col| match schema.field_with_name(col) {
                     Ok(field) => match field.data_type() {
                         DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
                             Err(DeltaTableError::Generic(format!(
@@ -314,7 +316,7 @@ impl Snapshot {
                 .collect::<Result<Vec<_>, _>>()?
         } else {
             let num_indexed_cols = self.table_config().num_indexed_cols();
-            self.schema()
+            schema
                 .fields
                 .iter()
                 .enumerate()
@@ -362,7 +364,7 @@ impl EagerSnapshot {
         let mut files = Vec::new();
         let mut scanner = LogReplayScanner::new();
         files.push(scanner.process_files_batch(&batch, true)?);
-        let mapper = LogMapper::try_new(&snapshot)?;
+        let mapper = LogMapper::try_new(&snapshot, None)?;
         files = files
             .into_iter()
             .map(|b| mapper.map_batch(b))
@@ -401,7 +403,7 @@ impl EagerSnapshot {
             )
             .boxed()
         };
-        let mapper = LogMapper::try_new(&self.snapshot)?;
+        let mapper = LogMapper::try_new(&self.snapshot, None)?;
         let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
             .map(|batch| batch.and_then(|b| mapper.map_batch(b)))
             .try_collect()
@@ -517,7 +519,13 @@ impl EagerSnapshot {
             files.push(scanner.process_files_batch(&batch?, true)?);
         }
 
-        let mapper = LogMapper::try_new(&self.snapshot)?;
+        let mapper = if let Some(metadata) = &metadata {
+            let new_schema: StructType = serde_json::from_str(&metadata.schema_string)?;
+            LogMapper::try_new(&self.snapshot, Some(&new_schema))?
+        } else {
+            LogMapper::try_new(&self.snapshot, None)?
+        };
+
         self.files = files
             .into_iter()
             .chain(
```
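Taken together, these hunks let a caller supply the schema that statistics should be parsed against, falling back to the snapshot's own schema when none is given. A minimal sketch of that fallback pattern, using stand-in types rather than the real delta-rs `Snapshot` and `StructType`:

```rust
// Stand-in types; the real delta-rs `Snapshot` and `StructType` are far richer.
#[derive(Debug, Clone)]
struct StructType {
    fields: Vec<String>,
}

struct Snapshot {
    schema: StructType,
}

impl Snapshot {
    // Mirrors the new signature: an explicit table schema wins; otherwise
    // the snapshot falls back to its own schema.
    fn stats_schema(&self, table_schema: Option<&StructType>) -> StructType {
        let schema = table_schema.unwrap_or(&self.schema);
        schema.clone()
    }
}

fn main() {
    let snapshot = Snapshot {
        schema: StructType { fields: vec!["val: int8".to_string()] },
    };

    // No override: stats are parsed with the snapshot's current schema.
    println!("{:?}", snapshot.stats_schema(None));

    // After a schema overwrite, the caller threads the new schema through,
    // as EagerSnapshot now does when a commit carries new metadata.
    let new_schema = StructType { fields: vec!["val: int64".to_string()] };
    println!("{:?}", snapshot.stats_schema(Some(&new_schema)));
}
```

Using `Option<&StructType>` keeps every existing call site working by passing `None`, while the one path that knows about new metadata can override the schema.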
10 changes: 7 additions & 3 deletions crates/core/src/kernel/snapshot/replay.rs
```diff
@@ -21,6 +21,7 @@ use tracing::debug;
 
 use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
 use crate::kernel::arrow::json;
+use crate::kernel::StructType;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
 
 use super::Snapshot;
@@ -41,7 +42,7 @@ pin_project! {
 
 impl<S> ReplayStream<S> {
     pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
-        let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
+        let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?);
         let mapper = Arc::new(LogMapper {
             stats_schema,
             config: snapshot.config.clone(),
@@ -61,9 +62,12 @@ pub(super) struct LogMapper {
 }
 
 impl LogMapper {
-    pub(super) fn try_new(snapshot: &Snapshot) -> DeltaResult<Self> {
+    pub(super) fn try_new(
+        snapshot: &Snapshot,
+        table_schema: Option<&StructType>,
+    ) -> DeltaResult<Self> {
         Ok(Self {
-            stats_schema: Arc::new((&snapshot.stats_schema()?).try_into()?),
+            stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?),
             config: snapshot.config.clone(),
         })
     }
```
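When a commit carries new table metadata, the schema arrives as JSON in `Metadata.schema_string`, which the `EagerSnapshot` hunk above turns into a `StructType` via `serde_json::from_str`. A self-contained sketch of that deserialization step, assuming a simplified schema layout (the real delta-rs `StructType`/`StructField` carry more detail):

```rust
// Simplified stand-ins for the delta-rs schema types.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct StructField {
    name: String,
    #[serde(rename = "type")]
    data_type: String,
    nullable: bool,
}

#[derive(Debug, Deserialize)]
struct StructType {
    #[serde(rename = "type")]
    type_name: String,
    fields: Vec<StructField>,
}

fn main() -> Result<(), serde_json::Error> {
    // The shape `Metadata.schema_string` takes for a single int64 column.
    let schema_string = r#"{
        "type": "struct",
        "fields": [
            {"name": "val", "type": "long", "nullable": true, "metadata": {}}
        ]
    }"#;

    let new_schema: StructType = serde_json::from_str(schema_string)?;
    println!("{new_schema:?}");
    Ok(())
}
```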
19 changes: 19 additions & 0 deletions python/tests/test_writer.py
```diff
@@ -1570,3 +1570,22 @@ def test_write_timestamp_ntz_on_table_with_features_not_enabled(tmp_path: pathli
     write_deltalake(
         tmp_path, data, mode="overwrite", engine="pyarrow", schema_mode="overwrite"
     )
+
+
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_parse_stats_with_new_schema(tmp_path, engine):
+    sample_data = pa.table(
+        {
+            "val": pa.array([1, 1], pa.int8()),
+        }
+    )
+    write_deltalake(tmp_path, sample_data)
+
+    sample_data = pa.table(
+        {
+            "val": pa.array([1000000000000, 1000000000000], pa.int64()),
+        }
+    )
+    write_deltalake(
+        tmp_path, sample_data, mode="overwrite", schema_mode="overwrite", engine=engine
+    )
```
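The regression test first writes a `val` column as `int8`, then overwrites the table with `int64` values too large for the old type using `schema_mode="overwrite"`; parsing the new files' statistics against the stale `int8` schema is the failure the Rust changes above address.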
