feat: merge schema #2229

Closed
wants to merge 44 commits

Commits
44 commits
cd11c76 compiles ;) (aersam, Feb 28, 2024)
3ce861a python compiles (aersam, Feb 28, 2024)
36d3463 fmt & clippy (aersam, Feb 28, 2024)
3c21940 renamings (aersam, Feb 29, 2024)
3c9ff11 clippy's feedback (aersam, Feb 29, 2024)
4d99d99 test seems ok (aersam, Mar 1, 2024)
d159020 fmt (aersam, Mar 1, 2024)
fd457d8 clippy feedback (aersam, Mar 1, 2024)
a8a711c compiles again after refactoring (aersam, Mar 1, 2024)
f515f31 fmt (aersam, Mar 1, 2024)
6182cff clippy (aersam, Mar 1, 2024)
ca761a2 wip on new merge method (aersam, Mar 1, 2024)
35027ed fmt (aersam, Mar 1, 2024)
d95889a next fix (aersam, Mar 1, 2024)
36fa567 WIP (aersam, Mar 1, 2024)
1602333 compiles again (aersam, Mar 1, 2024)
563bf30 fmt (aersam, Mar 1, 2024)
0f97fd7 might fixes test (aersam, Mar 1, 2024)
0f7fba5 better cast (aersam, Mar 1, 2024)
950cd23 test passes! (aersam, Mar 1, 2024)
449007c Merge branch 'main' of https://github.com/bmsuisse/delta-rs into appe… (aersam, Mar 4, 2024)
3292de0 tests passing in both rust and python (aersam, Mar 4, 2024)
dfec2ac fnt (aersam, Mar 4, 2024)
4a09921 format (aersam, Mar 4, 2024)
46c084a thanks, clippy for your feedback (aersam, Mar 4, 2024)
e629f4c fix ruff and mypy version and format (aersam, Mar 4, 2024)
360c43b Merge branch 'linter_versions' into append-python (aersam, Mar 4, 2024)
f86d069 Merge branch 'main' into append-python (aersam, Mar 4, 2024)
d14b4b0 validate schema if schema_mode not given (aersam, Mar 4, 2024)
dc71771 use new schema_mode parameter and refactor tests to match new behavior (aersam, Mar 4, 2024)
4c7a9e1 docs (aersam, Mar 4, 2024)
9fbb9bb fmt (aersam, Mar 4, 2024)
d70b716 remove parameter that causes trouble with pyarrow 8 (aersam, Mar 4, 2024)
e816061 format again :) (aersam, Mar 4, 2024)
b07f219 fighting with py 3.8 ;) (aersam, Mar 4, 2024)
a7ee463 address feedback (aersam, Mar 4, 2024)
6a9012b clippy (aersam, Mar 4, 2024)
06eb8b3 errors (aersam, Mar 5, 2024)
9b81041 fmt (aersam, Mar 5, 2024)
e3f8b8b unused import (aersam, Mar 5, 2024)
e26add2 Merge branch 'main' into append-python (aersam, Mar 5, 2024)
15d4be3 Better exception handling (aersam, Mar 5, 2024)
2d36999 commit missing file (aersam, Mar 5, 2024)
82a0233 do not use 0.9.1 of object_store for now (aersam, Mar 5, 2024)
12 changes: 10 additions & 2 deletions crates/core/src/operations/delete.rs
@@ -17,6 +17,7 @@
//! .await?;
//! ````

use core::panic;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -36,6 +37,7 @@ use serde_json::Value;

use super::datafusion_utils::Expression;
use super::transaction::PROTOCOL;
use super::write::SchemaWriteMode;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder, DeltaSessionContext};
use crate::errors::DeltaResult;
@@ -167,9 +169,15 @@ async fn excute_non_empty_expr(
None,
writer_properties,
false,
false,
SchemaWriteMode::None,
)
.await?;
.await?
.into_iter()
.map(|a| match a {
Action::Add(a) => a,
_ => panic!("Expected Add action"),
aersam (Contributor Author) commented: am I allowed to do that? :)

})
.collect::<Vec<Add>>();

let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
let filter_records = filter.metrics().and_then(|m| m.output_rows());
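Regarding the author's "am I allowed to do that?" question above: the match in the delete path panics if write_execution_plan ever returns anything other than an Add action. The following is a self-contained sketch, using stand-in types rather than the real delta-rs kernel types, that contrasts the panic with a filter_map that quietly drops non-Add actions; the quiet drop is arguably worse here, since a merged-schema Metadata action would vanish without any signal.

```rust
// Stand-in types for illustration only; the real Action/Add live in
// delta-rs' kernel module and carry more fields.
#[derive(Debug, Clone)]
struct Add {
    path: String,
}

#[derive(Debug, Clone)]
enum Action {
    Add(Add),
    Metadata(String), // stand-in for a schema-change action
}

// Mirrors the approach in the diff: panic if anything other than Add shows up.
fn adds_or_panic(actions: Vec<Action>) -> Vec<Add> {
    actions
        .into_iter()
        .map(|a| match a {
            Action::Add(add) => add,
            other => panic!("Expected Add action, got {other:?}"),
        })
        .collect()
}

// Non-panicking alternative: keep only Add actions. Note that this silently
// discards the Metadata action, which is why an explicit panic (or, better, a
// returned error) can be the safer choice in the delete path.
fn adds_only(actions: Vec<Action>) -> Vec<Add> {
    actions
        .into_iter()
        .filter_map(|a| match a {
            Action::Add(add) => Some(add),
            _ => None,
        })
        .collect()
}

fn main() {
    let mixed = vec![
        Action::Add(Add { path: "part-00000.parquet".into() }),
        Action::Metadata("merged schema".into()),
    ];
    // `adds_or_panic(mixed.clone())` would panic because of the Metadata entry.
    println!("{:?}", adds_only(mixed.clone()));

    // With only Add actions, both variants behave the same.
    println!("{:?}", adds_or_panic(vec![Action::Add(Add { path: "part-00001.parquet".into() })]));
}
```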
6 changes: 3 additions & 3 deletions crates/core/src/operations/merge/mod.rs
@@ -74,7 +74,7 @@ use crate::delta_datafusion::{
use crate::kernel::Action;
use crate::logstore::LogStoreRef;
use crate::operations::merge::barrier::find_barrier_node;
use crate::operations::write::write_execution_plan;
use crate::operations::write::{write_execution_plan, SchemaWriteMode};
use crate::protocol::{DeltaOperation, MergePredicate};
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
@@ -1379,13 +1379,13 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
SchemaWriteMode::None,
)
.await?;

metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;

let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
let mut actions: Vec<Action> = add_actions.clone();
metrics.num_target_files_added = actions.len();

let survivors = barrier
6 changes: 3 additions & 3 deletions crates/core/src/operations/update.rs
@@ -43,9 +43,9 @@ use parquet::file::properties::WriterProperties;
use serde::Serialize;
use serde_json::Value;

use super::datafusion_utils::Expression;
use super::transaction::{commit, PROTOCOL};
use super::write::write_execution_plan;
use super::{datafusion_utils::Expression, write::SchemaWriteMode};
use crate::delta_datafusion::{
expr::fmt_expr_to_sql, physical::MetricObserverExec, DeltaColumn, DeltaSessionContext,
};
@@ -357,7 +357,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
SchemaWriteMode::None,
)
.await?;

@@ -377,7 +377,7 @@
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
let mut actions: Vec<Action> = add_actions.clone();

metrics.num_added_files = actions.len();
metrics.num_removed_files = candidates.candidates.len();
112 changes: 87 additions & 25 deletions crates/core/src/operations/write.rs
@@ -25,6 +25,7 @@
//! ````

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

@@ -50,7 +51,7 @@ use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::DeltaDataChecker;
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Add, PartitionsExt, Remove, StructType};
use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType};
use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
@@ -87,6 +88,33 @@ impl From<WriteError> for DeltaTableError {
}
}

///Specifies how to handle schema drifts
#[derive(PartialEq)]
pub enum SchemaWriteMode {
/// Use existing schema and fail if it does not match the new schema
None,
/// Overwrite the schema with the new schema
Overwrite,
/// Append the new schema to the existing schema
Merge,
}
Member commented: There is already some schema evolution code inside of the root writer module, would you be up for consolidating some of this code? It's been on my todo list to bring this write operation and the RecordBatch/JsonWriters into alignment.

aersam (Contributor Author) replied: When I first looked at the code I asked myself why there are so many writer implementations. Actually, I think there should be only one writer, the record_batch writer: the JSON writer should just produce record batches and feed them to the record batch writer, and the write operation should use the record batch writer as well. But maybe I'm overlooking something here, and it's probably a bigger undertaking? It also seems the RecordBatchWriter does not support overwriting the schema yet, right? A lot simpler would be to reuse the existing WriteMode enum to avoid duplication, and maybe move the schema merge logic into a separate file that also handles the partition columns. What level of consolidation did you have in mind?

aersam (Contributor Author) replied: @rtyler I just tried what happens if I delete writer.rs and replace it with RecordBatchWriter in this PR: #2232. It seems to be possible. Would you generally prefer that approach?

aersam (Contributor Author) replied: After trying a few things I came to the conclusion that this is a bigger project and I'm not sure I'm the right person for it, since I don't really see the use case for RecordBatchWriter or why the coupling of writer.rs to DataFusion is a problem. I'd prefer to have this merged first and then discuss further code deduplication.

impl FromStr for SchemaWriteMode {
type Err = DeltaTableError;

fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"none" => Ok(SchemaWriteMode::None),
"overwrite" => Ok(SchemaWriteMode::Overwrite),
"merge" => Ok(SchemaWriteMode::Merge),
_ => Err(DeltaTableError::Generic(format!(
"Invalid schema write mode provided: {}, only these are supported: ['none', 'overwrite', 'merge']",
s
))),
}
}
}

/// Write data into a DeltaTable
pub struct WriteBuilder {
/// A snapshot of the to-be-loaded table's state
@@ -109,8 +137,8 @@ pub struct WriteBuilder {
write_batch_size: Option<usize>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
/// whether to overwrite the schema
overwrite_schema: bool,
/// whether to overwrite the schema or to merge it
schema_write_mode: SchemaWriteMode,
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
safe_cast: bool,
/// Parquet writer properties
@@ -140,7 +168,7 @@ impl WriteBuilder {
write_batch_size: None,
batches: None,
safe_cast: false,
overwrite_schema: false,
schema_write_mode: SchemaWriteMode::None,
writer_properties: None,
app_metadata: None,
name: None,
@@ -155,9 +183,9 @@
self
}

/// Add overwrite_schema
pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self {
self.overwrite_schema = overwrite_schema;
/// Add Schema Write Mode
pub fn with_schema_write_mode(mut self, schema_write_mode: SchemaWriteMode) -> Self {
self.schema_write_mode = schema_write_mode;
self
}

@@ -311,11 +339,37 @@ async fn write_execution_plan_with_predicate(
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
safe_cast: bool,
overwrite_schema: bool,
) -> DeltaResult<Vec<Add>> {
schema_write_mode: SchemaWriteMode,
) -> DeltaResult<Vec<Action>> {
let mut schema_action: Option<Action> = None;
// Use input schema to prevent wrapping partitions columns into a dictionary.
let schema: ArrowSchemaRef = if overwrite_schema {
let schema: ArrowSchemaRef = if schema_write_mode == SchemaWriteMode::Overwrite {
plan.schema()
} else if schema_write_mode == SchemaWriteMode::Merge {
let original_schema = snapshot
.and_then(|s| s.input_schema().ok())
.unwrap_or(plan.schema());
if original_schema == plan.schema() {
original_schema
} else {
let new_schema = Arc::new(arrow_schema::Schema::try_merge(vec![
original_schema.as_ref().clone(),
plan.schema().as_ref().clone(),
])?);
let schema_struct: StructType = new_schema.clone().try_into()?;
schema_action = Some(Action::Metadata(Metadata::try_new(
schema_struct,
match snapshot {
Some(sn) => sn.metadata().partition_columns.clone(),
None => vec![],
},
match snapshot {
Some(sn) => sn.metadata().configuration.clone(),
None => HashMap::new(),
},
)?));
new_schema
}
} else {
snapshot
.and_then(|s| s.input_schema().ok())
@@ -352,7 +406,7 @@
let mut writer = DeltaWriter::new(object_store.clone(), config);
let checker_stream = checker.clone();
let mut stream = inner_plan.execute(i, task_ctx)?;
let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> =
let handle: tokio::task::JoinHandle<DeltaResult<Vec<Action>>> =
tokio::task::spawn(async move {
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
Expand All @@ -361,14 +415,16 @@ async fn write_execution_plan_with_predicate(
super::cast::cast_record_batch(&batch, inner_schema.clone(), safe_cast)?;
writer.write(&arr).await?;
}
writer.close().await
let add_actions = writer.close().await;
match add_actions {
Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::<Vec<_>>()),
Err(err) => Err(err),
}
});

tasks.push(handle);
}

// Collect add actions to add to commit
Ok(futures::future::join_all(tasks)
let mut actions = futures::future::join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
Expand All @@ -377,7 +433,12 @@ async fn write_execution_plan_with_predicate(
.collect::<Result<Vec<_>, _>>()?
.concat()
.into_iter()
.collect::<Vec<_>>())
.collect::<Vec<_>>();
if let Some(schema_action) = schema_action {
actions.push(schema_action);
}
// Collect add actions to add to commit
Ok(actions)
}

#[allow(clippy::too_many_arguments)]
@@ -391,8 +452,8 @@ pub(crate) async fn write_execution_plan(
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
safe_cast: bool,
overwrite_schema: bool,
) -> DeltaResult<Vec<Add>> {
schema_write_mode: SchemaWriteMode,
) -> DeltaResult<Vec<Action>> {
write_execution_plan_with_predicate(
None,
snapshot,
Expand All @@ -404,7 +465,7 @@ pub(crate) async fn write_execution_plan(
write_batch_size,
writer_properties,
safe_cast,
overwrite_schema,
schema_write_mode,
)
.await
}
Expand All @@ -417,7 +478,7 @@ async fn execute_non_empty_expr(
expression: &Expr,
rewrite: &[Add],
writer_properties: Option<WriterProperties>,
) -> DeltaResult<Vec<Add>> {
) -> DeltaResult<Vec<Action>> {
// For each identified file perform a parquet scan + filter + limit (1) + count.
// If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.

@@ -452,7 +513,7 @@
None,
writer_properties,
false,
false,
SchemaWriteMode::None,
)
.await?;

@@ -488,7 +549,7 @@ async fn prepare_predicate_actions(
};
let remove = candidates.candidates;

let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect();
let mut actions: Vec<Action> = add.into_iter().collect();

for action in remove {
actions.push(Action::Remove(Remove {
@@ -563,7 +624,8 @@ impl std::future::IntoFuture for WriteBuilder {
.unwrap_or(schema.clone());

if !can_cast_batch(schema.fields(), table_schema.fields())
&& !(this.overwrite_schema && matches!(this.mode, SaveMode::Overwrite))
&& (this.schema_write_mode == SchemaWriteMode::None
&& !matches!(this.mode, SaveMode::Overwrite))
{
return Err(DeltaTableError::Generic(
"Schema of data does not match table schema".to_string(),
@@ -641,10 +703,10 @@
this.write_batch_size,
this.writer_properties.clone(),
this.safe_cast,
this.overwrite_schema,
this.schema_write_mode,
)
.await?;
actions.extend(add_actions.into_iter().map(Action::Add));
actions.extend(add_actions);

// Collect remove actions if we are overwriting the table
if let Some(snapshot) = &this.snapshot {
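The core of the new Merge mode is the arrow_schema::Schema::try_merge call in write_execution_plan_with_predicate above. Below is a minimal sketch of its behaviour, with invented field names and types (only the try_merge call itself is taken from the diff): matching fields are unified, new fields are appended, and conflicting types for the same field name produce an error instead of a merged schema.

```rust
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // Existing table schema (illustrative).
    let original = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]);
    // Incoming data adds a nullable column.
    let incoming = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("comment", DataType::Utf8, true),
    ]);

    // Same call as in the Merge branch: the union of both field sets.
    let merged = Schema::try_merge(vec![original.clone(), incoming])?;
    let names: Vec<&str> = merged.fields().iter().map(|f| f.name().as_str()).collect();
    assert_eq!(names, ["id", "value", "comment"]);

    // A type conflict on the same field name is rejected rather than merged.
    let conflicting = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
    assert!(Schema::try_merge(vec![original, conflicting]).is_err());

    Ok(())
}
```

In the diff, when the schemas differ, the merged schema is also pushed into the commit as a Metadata action, so the table schema in the log evolves together with the written data.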
2 changes: 1 addition & 1 deletion python/deltalake/_internal.pyi
@@ -174,7 +174,7 @@ def write_to_deltalake(
partition_by: Optional[List[str]],
mode: str,
max_rows_per_group: int,
overwrite_schema: bool,
schema_write_mode: Optional[str],
predicate: Optional[str],
name: Optional[str],
description: Optional[str],
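The stub above shows that the binding now receives schema_write_mode as an optional string; on the Rust side that string is mapped onto the SchemaWriteMode enum via the FromStr impl added in write.rs. A small sketch of that round trip follows; the enum is re-declared locally so the snippet is self-contained, since this PR's types are not part of any released crate.

```rust
use std::str::FromStr;

// Re-declared locally for illustration; mirrors the enum and FromStr impl
// added in crates/core/src/operations/write.rs in this PR.
#[derive(Debug, PartialEq)]
enum SchemaWriteMode {
    None,
    Overwrite,
    Merge,
}

impl FromStr for SchemaWriteMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "none" => Ok(SchemaWriteMode::None),
            "overwrite" => Ok(SchemaWriteMode::Overwrite),
            "merge" => Ok(SchemaWriteMode::Merge),
            other => Err(format!("Invalid schema write mode provided: {other}")),
        }
    }
}

fn main() {
    // This is roughly what the Python keyword argument schema_write_mode="merge"
    // boils down to once it crosses the binding boundary.
    assert_eq!("merge".parse::<SchemaWriteMode>(), Ok(SchemaWriteMode::Merge));
    assert_eq!("MERGE".parse::<SchemaWriteMode>(), Ok(SchemaWriteMode::Merge)); // case-insensitive
    assert!("append".parse::<SchemaWriteMode>().is_err());
}
```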
20 changes: 15 additions & 5 deletions python/deltalake/writer.py
@@ -49,7 +49,7 @@
convert_pyarrow_table,
)
from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable, WriterProperties

import warnings
try:
import pandas as pd # noqa: F811
except ModuleNotFoundError:
@@ -185,6 +185,7 @@ def write_deltalake(
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
overwrite_schema: bool = False,
schema_write_mode: Literal["none", "merge", "overwrite"] = "none",
storage_options: Optional[Dict[str, str]] = None,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
predicate: Optional[str] = None,
@@ -238,7 +239,8 @@
name: User-provided identifier for this table.
description: User-provided description for this table.
configuration: A map containing configuration options for the metadata action.
overwrite_schema: If True, allows updating the schema of the table.
overwrite_schema: Deprecated, use schema_write_mode instead.
schema_write_mode: If set to "overwrite", allows replacing the schema of the table. Set to "merge" to merge with existing schema.
storage_options: options passed to the native delta filesystem.
predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.
partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine.
@@ -256,7 +258,15 @@
table.update_incremental()

__enforce_append_only(table=table, configuration=configuration, mode=mode)

if overwrite_schema:
assert schema_write_mode in ["none", "overwrite"] # none is default, overwrite would at least match
schema_write_mode = "overwrite"

warnings.warn(
"overwrite_schema is deprecated, use schema_write_mode instead. ",
category=DeprecationWarning,
stacklevel=2,
)
if isinstance(partition_by, str):
partition_by = [partition_by]

@@ -302,7 +312,7 @@ def write_deltalake(
partition_by=partition_by,
mode=mode,
max_rows_per_group=max_rows_per_group,
overwrite_schema=overwrite_schema,
schema_write_mode=schema_write_mode,
predicate=predicate,
name=name,
description=description,
@@ -327,7 +337,7 @@ def sort_arrow_schema(schema: pa.schema) -> pa.schema:
if table: # already exists
if sort_arrow_schema(schema) != sort_arrow_schema(
table.schema().to_pyarrow(as_large_types=large_dtypes)
) and not (mode == "overwrite" and overwrite_schema):
) and not (mode == "overwrite" and schema_write_mode == "overwrite"):
raise ValueError(
"Schema of data does not match table schema\n"
f"Data schema:\n{schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=large_dtypes)}"