-
Notifications
You must be signed in to change notification settings - Fork 458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: merge schema #2229
feat: merge schema #2229
Changes from 3 commits
cd11c76
3ce861a
36d3463
3c21940
3c9ff11
4d99d99
d159020
fd457d8
a8a711c
f515f31
6182cff
ca761a2
35027ed
d95889a
36fa567
1602333
563bf30
0f97fd7
0f7fba5
950cd23
449007c
3292de0
dfec2ac
4a09921
46c084a
e629f4c
360c43b
f86d069
d14b4b0
dc71771
4c7a9e1
9fbb9bb
d70b716
e816061
b07f219
a7ee463
6a9012b
06eb8b3
9b81041
e3f8b8b
e26add2
15d4be3
2d36999
82a0233
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
//! ```` | ||
|
||
use std::collections::HashMap; | ||
use std::str::FromStr; | ||
use std::sync::Arc; | ||
use std::time::{SystemTime, UNIX_EPOCH}; | ||
|
||
|
@@ -50,7 +51,7 @@ use crate::delta_datafusion::expr::parse_predicate_expression; | |
use crate::delta_datafusion::DeltaDataChecker; | ||
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; | ||
use crate::errors::{DeltaResult, DeltaTableError}; | ||
use crate::kernel::{Action, Add, PartitionsExt, Remove, StructType}; | ||
use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType}; | ||
use crate::logstore::LogStoreRef; | ||
use crate::protocol::{DeltaOperation, SaveMode}; | ||
use crate::storage::ObjectStoreRef; | ||
|
@@ -87,6 +88,33 @@ impl From<WriteError> for DeltaTableError { | |
} | ||
} | ||
|
||
///Specifies how to handle schema drifts | ||
#[derive(PartialEq)] | ||
pub enum SchemaWriteMode { | ||
/// Use existing schema and fail if it does not match the new schema | ||
None, | ||
aersam marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// Overwrite the schema with the new schema | ||
Overwrite, | ||
/// Append the new schema to the existing schema | ||
Merge, | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There is already some schema evolution code inside of the root writer module, would you be up for consolidating some of this code? It's been on my todo list to bring this write operation and the RecordBatch/JsonWriters into alignment There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When I first looked at the code I asked myself why there are so many writer implementations. Actually I think there should be only one writer, which is the record_batch writer. The Json Writer should just be producing record batches and use the record batch writer, and the writer operation should also use the record batch writer. But maybe I'm overlooking something here, and it's probably a bigger undertaking? And also it seems the RecordBatch Writer does not have Overwrite schema yet, right? A lot simpler would be to use the existing WriteMode enum to avoid duplication there. And maybe make a separate file with the schema merge logic, which then also handles the partition columns. What level of consolidating did you have in mind? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After having tried some stuff I came to the conclusion that this is a bigger project I'm not sure I'm the right one to do, since I don't really see the use case for RecordBatchWriter and how the connection of writer.rs to DataFusion is an issue. I'd prefer having this merged first and then discuss further code deduplication |
||
|
||
impl FromStr for SchemaWriteMode { | ||
type Err = DeltaTableError; | ||
|
||
fn from_str(s: &str) -> DeltaResult<Self> { | ||
match s.to_ascii_lowercase().as_str() { | ||
"none" => Ok(SchemaWriteMode::None), | ||
"overwrite" => Ok(SchemaWriteMode::Overwrite), | ||
"merge" => Ok(SchemaWriteMode::Merge), | ||
_ => Err(DeltaTableError::Generic(format!( | ||
"Invalid schema write mode provided: {}, only these are supported: ['none', 'overwrite', 'merge']", | ||
aersam marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s | ||
))), | ||
} | ||
} | ||
} | ||
|
||
/// Write data into a DeltaTable | ||
pub struct WriteBuilder { | ||
/// A snapshot of the to-be-loaded table's state | ||
|
@@ -109,8 +137,8 @@ pub struct WriteBuilder { | |
write_batch_size: Option<usize>, | ||
/// RecordBatches to be written into the table | ||
batches: Option<Vec<RecordBatch>>, | ||
/// whether to overwrite the schema | ||
overwrite_schema: bool, | ||
/// whether to overwrite the schema or to merge it | ||
schema_write_mode: SchemaWriteMode, | ||
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false) | ||
safe_cast: bool, | ||
/// Parquet writer properties | ||
|
@@ -140,7 +168,7 @@ impl WriteBuilder { | |
write_batch_size: None, | ||
batches: None, | ||
safe_cast: false, | ||
overwrite_schema: false, | ||
schema_write_mode: SchemaWriteMode::None, | ||
writer_properties: None, | ||
app_metadata: None, | ||
name: None, | ||
|
@@ -155,9 +183,9 @@ impl WriteBuilder { | |
self | ||
} | ||
|
||
/// Add overwrite_schema | ||
pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self { | ||
self.overwrite_schema = overwrite_schema; | ||
/// Add Schema Write Mode | ||
pub fn with_schema_write_mode(mut self, schema_write_mode: SchemaWriteMode) -> Self { | ||
self.schema_write_mode = schema_write_mode; | ||
self | ||
} | ||
|
||
|
@@ -311,11 +339,37 @@ async fn write_execution_plan_with_predicate( | |
write_batch_size: Option<usize>, | ||
writer_properties: Option<WriterProperties>, | ||
safe_cast: bool, | ||
overwrite_schema: bool, | ||
) -> DeltaResult<Vec<Add>> { | ||
schema_write_mode: SchemaWriteMode, | ||
) -> DeltaResult<Vec<Action>> { | ||
let mut schema_action: Option<Action> = None; | ||
// Use input schema to prevent wrapping partitions columns into a dictionary. | ||
let schema: ArrowSchemaRef = if overwrite_schema { | ||
let schema: ArrowSchemaRef = if schema_write_mode == SchemaWriteMode::Overwrite { | ||
plan.schema() | ||
} else if schema_write_mode == SchemaWriteMode::Merge { | ||
let original_schema = snapshot | ||
.and_then(|s| s.input_schema().ok()) | ||
.unwrap_or(plan.schema()); | ||
if original_schema == plan.schema() { | ||
original_schema | ||
} else { | ||
let new_schema = Arc::new(arrow_schema::Schema::try_merge(vec![ | ||
original_schema.as_ref().clone(), | ||
plan.schema().as_ref().clone(), | ||
])?); | ||
let schema_struct: StructType = new_schema.clone().try_into()?; | ||
schema_action = Some(Action::Metadata(Metadata::try_new( | ||
schema_struct, | ||
match snapshot { | ||
Some(sn) => sn.metadata().partition_columns.clone(), | ||
None => vec![], | ||
}, | ||
match snapshot { | ||
Some(sn) => sn.metadata().configuration.clone(), | ||
None => HashMap::new(), | ||
}, | ||
)?)); | ||
new_schema | ||
} | ||
} else { | ||
snapshot | ||
.and_then(|s| s.input_schema().ok()) | ||
|
@@ -352,7 +406,7 @@ async fn write_execution_plan_with_predicate( | |
let mut writer = DeltaWriter::new(object_store.clone(), config); | ||
let checker_stream = checker.clone(); | ||
let mut stream = inner_plan.execute(i, task_ctx)?; | ||
let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> = | ||
let handle: tokio::task::JoinHandle<DeltaResult<Vec<Action>>> = | ||
tokio::task::spawn(async move { | ||
while let Some(maybe_batch) = stream.next().await { | ||
let batch = maybe_batch?; | ||
|
@@ -361,14 +415,16 @@ async fn write_execution_plan_with_predicate( | |
super::cast::cast_record_batch(&batch, inner_schema.clone(), safe_cast)?; | ||
writer.write(&arr).await?; | ||
} | ||
writer.close().await | ||
let add_actions = writer.close().await; | ||
match add_actions { | ||
Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::<Vec<_>>()), | ||
Err(err) => Err(err), | ||
} | ||
}); | ||
|
||
tasks.push(handle); | ||
} | ||
|
||
// Collect add actions to add to commit | ||
Ok(futures::future::join_all(tasks) | ||
let mut actions = futures::future::join_all(tasks) | ||
.await | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>() | ||
|
@@ -377,7 +433,12 @@ async fn write_execution_plan_with_predicate( | |
.collect::<Result<Vec<_>, _>>()? | ||
.concat() | ||
.into_iter() | ||
.collect::<Vec<_>>()) | ||
.collect::<Vec<_>>(); | ||
if let Some(schema_action) = schema_action { | ||
actions.push(schema_action); | ||
} | ||
// Collect add actions to add to commit | ||
Ok(actions) | ||
} | ||
|
||
#[allow(clippy::too_many_arguments)] | ||
|
@@ -391,8 +452,8 @@ pub(crate) async fn write_execution_plan( | |
write_batch_size: Option<usize>, | ||
writer_properties: Option<WriterProperties>, | ||
safe_cast: bool, | ||
overwrite_schema: bool, | ||
) -> DeltaResult<Vec<Add>> { | ||
schema_write_mode: SchemaWriteMode, | ||
aersam marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) -> DeltaResult<Vec<Action>> { | ||
write_execution_plan_with_predicate( | ||
None, | ||
snapshot, | ||
|
@@ -404,7 +465,7 @@ pub(crate) async fn write_execution_plan( | |
write_batch_size, | ||
writer_properties, | ||
safe_cast, | ||
overwrite_schema, | ||
schema_write_mode, | ||
) | ||
.await | ||
} | ||
|
@@ -417,7 +478,7 @@ async fn execute_non_empty_expr( | |
expression: &Expr, | ||
rewrite: &[Add], | ||
writer_properties: Option<WriterProperties>, | ||
) -> DeltaResult<Vec<Add>> { | ||
) -> DeltaResult<Vec<Action>> { | ||
// For each identified file perform a parquet scan + filter + limit (1) + count. | ||
// If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file. | ||
|
||
|
@@ -452,7 +513,7 @@ async fn execute_non_empty_expr( | |
None, | ||
writer_properties, | ||
false, | ||
false, | ||
SchemaWriteMode::None, | ||
) | ||
.await?; | ||
|
||
|
@@ -488,7 +549,7 @@ async fn prepare_predicate_actions( | |
}; | ||
let remove = candidates.candidates; | ||
|
||
let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect(); | ||
let mut actions: Vec<Action> = add.into_iter().collect(); | ||
|
||
for action in remove { | ||
actions.push(Action::Remove(Remove { | ||
|
@@ -563,7 +624,8 @@ impl std::future::IntoFuture for WriteBuilder { | |
.unwrap_or(schema.clone()); | ||
|
||
if !can_cast_batch(schema.fields(), table_schema.fields()) | ||
&& !(this.overwrite_schema && matches!(this.mode, SaveMode::Overwrite)) | ||
&& (this.schema_write_mode == SchemaWriteMode::None | ||
&& !matches!(this.mode, SaveMode::Overwrite)) | ||
{ | ||
return Err(DeltaTableError::Generic( | ||
"Schema of data does not match table schema".to_string(), | ||
|
@@ -641,10 +703,10 @@ impl std::future::IntoFuture for WriteBuilder { | |
this.write_batch_size, | ||
this.writer_properties.clone(), | ||
this.safe_cast, | ||
this.overwrite_schema, | ||
this.schema_write_mode, | ||
) | ||
.await?; | ||
actions.extend(add_actions.into_iter().map(Action::Add)); | ||
actions.extend(add_actions); | ||
|
||
// Collect remove actions if we are overwriting the table | ||
if let Some(snapshot) = &this.snapshot { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
am I allowed to do that? :)