-
Notifications
You must be signed in to change notification settings - Fork 433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(python): expose rust writer as additional engine v2 #1891
Changes from 36 commits
ca2acb8
3f55470
714cc56
52f0d6a
c067d4d
5c5f247
717b7c7
dae361e
3052f12
01f0194
9911574
a410dfa
8c976b6
57565b5
49a298b
635313f
633fd7f
07113c6
3a8c026
3e25561
d9a4ce0
e3c7189
5af1251
3744384
2e1f0c9
fee4d77
3173ad7
fe4fe51
c38b518
3ed7df0
573e8fe
bb5815a
8c56194
381df0d
3ab7687
e73eea3
b4f9695
50e7257
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -172,6 +172,7 @@ async fn excute_non_empty_expr( | |
None, | ||
writer_properties, | ||
false, | ||
false, | ||
) | ||
.await?; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1013,6 +1013,7 @@ async fn execute( | |
None, | ||
writer_properties, | ||
safe_cast, | ||
false, | ||
) | ||
.await?; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -363,6 +363,7 @@ async fn execute( | |
None, | ||
writer_properties, | ||
safe_cast, | ||
false, | ||
) | ||
.await?; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,7 @@ use super::writer::{DeltaWriter, WriterConfig}; | |
use super::{transaction::commit, CreateBuilder}; | ||
use crate::delta_datafusion::DeltaDataChecker; | ||
use crate::errors::{DeltaResult, DeltaTableError}; | ||
use crate::kernel::{Action, Add, Remove, StructType}; | ||
use crate::kernel::{Action, Add, Metadata, Remove, StructType}; | ||
use crate::logstore::LogStoreRef; | ||
use crate::protocol::{DeltaOperation, SaveMode}; | ||
use crate::storage::ObjectStoreRef; | ||
|
@@ -103,12 +103,20 @@ pub struct WriteBuilder { | |
write_batch_size: Option<usize>, | ||
/// RecordBatches to be written into the table | ||
batches: Option<Vec<RecordBatch>>, | ||
/// whether to overwrite the schema | ||
overwrite_schema: bool, | ||
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false) | ||
safe_cast: bool, | ||
/// Parquet writer properties | ||
writer_properties: Option<WriterProperties>, | ||
/// Additional metadata to be added to commit | ||
app_metadata: Option<HashMap<String, serde_json::Value>>, | ||
/// Name of the table, only used when table doesn't exist yet | ||
name: Option<String>, | ||
/// Description of the table, only used when table doesn't exist yet | ||
description: Option<String>, | ||
/// Configurations of the delta table, only used when table doesn't exist | ||
configuration: HashMap<String, Option<String>>, | ||
} | ||
|
||
impl WriteBuilder { | ||
|
@@ -126,8 +134,12 @@ impl WriteBuilder { | |
write_batch_size: None, | ||
batches: None, | ||
safe_cast: false, | ||
overwrite_schema: false, | ||
writer_properties: None, | ||
app_metadata: None, | ||
name: None, | ||
description: None, | ||
configuration: Default::default(), | ||
} | ||
} | ||
|
||
|
@@ -137,6 +149,12 @@ impl WriteBuilder { | |
self | ||
} | ||
|
||
/// Add overwrite_schema | ||
pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self { | ||
self.overwrite_schema = overwrite_schema; | ||
self | ||
} | ||
|
||
/// When using `Overwrite` mode, replace data that matches a predicate | ||
pub fn with_replace_where(mut self, predicate: impl Into<String>) -> Self { | ||
self.predicate = Some(predicate.into()); | ||
|
@@ -205,6 +223,31 @@ impl WriteBuilder { | |
self | ||
} | ||
|
||
/// Specify the table name. Optionally qualified with | ||
/// a database name [database_name.] table_name. | ||
pub fn with_table_name(mut self, name: impl Into<String>) -> Self { | ||
self.name = Some(name.into()); | ||
self | ||
} | ||
|
||
/// Comment to describe the table. | ||
pub fn with_description(mut self, description: impl Into<String>) -> Self { | ||
self.description = Some(description.into()); | ||
self | ||
} | ||
|
||
/// Set configuration on created table | ||
pub fn with_configuration( | ||
mut self, | ||
configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>, | ||
) -> Self { | ||
self.configuration = configuration | ||
.into_iter() | ||
.map(|(k, v)| (k.into(), v.map(|s| s.into()))) | ||
.collect(); | ||
self | ||
} | ||
|
||
async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> { | ||
match self.log_store.is_delta_table_location().await? { | ||
true => { | ||
|
@@ -229,10 +272,20 @@ impl WriteBuilder { | |
}?; | ||
let mut builder = CreateBuilder::new() | ||
.with_log_store(self.log_store.clone()) | ||
.with_columns(schema.fields().clone()); | ||
.with_columns(schema.fields().clone()) | ||
.with_configuration(self.configuration.clone()); | ||
if let Some(partition_columns) = self.partition_columns.as_ref() { | ||
builder = builder.with_partition_columns(partition_columns.clone()) | ||
} | ||
|
||
if let Some(name) = self.name.as_ref() { | ||
builder = builder.with_table_name(name.clone()); | ||
}; | ||
|
||
if let Some(desc) = self.description.as_ref() { | ||
builder = builder.with_comment(desc.clone()); | ||
}; | ||
|
||
let (_, actions, _) = builder.into_table_and_actions()?; | ||
Ok(actions) | ||
} | ||
|
@@ -251,14 +304,19 @@ pub(crate) async fn write_execution_plan( | |
write_batch_size: Option<usize>, | ||
writer_properties: Option<WriterProperties>, | ||
safe_cast: bool, | ||
overwrite_schema: bool, | ||
) -> DeltaResult<Vec<Add>> { | ||
let invariants = snapshot | ||
.current_metadata() | ||
.and_then(|meta| meta.schema.get_invariants().ok()) | ||
.unwrap_or_default(); | ||
|
||
// Use input schema to prevent wrapping partitions columns into a dictionary. | ||
let schema = snapshot.input_schema().unwrap_or(plan.schema()); | ||
let schema: ArrowSchemaRef = if overwrite_schema { | ||
plan.schema() | ||
} else { | ||
snapshot.input_schema().unwrap_or(plan.schema()) | ||
}; | ||
|
||
let checker = DeltaDataChecker::new(invariants); | ||
|
||
|
@@ -339,23 +397,26 @@ impl std::future::IntoFuture for WriteBuilder { | |
Ok(this.partition_columns.unwrap_or_default()) | ||
}?; | ||
|
||
let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into(); | ||
let plan = if let Some(plan) = this.input { | ||
Ok(plan) | ||
} else if let Some(batches) = this.batches { | ||
if batches.is_empty() { | ||
Err(WriteError::MissingData) | ||
} else { | ||
let schema = batches[0].schema(); | ||
schema = batches[0].schema(); | ||
let table_schema = this | ||
.snapshot | ||
.physical_arrow_schema(this.log_store.object_store().clone()) | ||
.await | ||
.or_else(|_| this.snapshot.arrow_schema()) | ||
.unwrap_or(schema.clone()); | ||
|
||
if !can_cast_batch(schema.fields(), table_schema.fields()) { | ||
if !can_cast_batch(schema.fields(), table_schema.fields()) | ||
&& !(this.overwrite_schema && matches!(this.mode, SaveMode::Overwrite)) | ||
{ | ||
return Err(DeltaTableError::Generic( | ||
"Updating table schema not yet implemented".to_string(), | ||
"Schema of data does not match table schema".to_string(), | ||
)); | ||
}; | ||
|
||
|
@@ -390,7 +451,7 @@ impl std::future::IntoFuture for WriteBuilder { | |
vec![batches] | ||
}; | ||
|
||
Ok(Arc::new(MemoryExec::try_new(&data, schema, None)?) | ||
Ok(Arc::new(MemoryExec::try_new(&data, schema.clone(), None)?) | ||
as Arc<dyn ExecutionPlan>) | ||
} | ||
} else { | ||
|
@@ -415,12 +476,31 @@ impl std::future::IntoFuture for WriteBuilder { | |
this.write_batch_size, | ||
this.writer_properties, | ||
this.safe_cast, | ||
this.overwrite_schema, | ||
) | ||
.await?; | ||
actions.extend(add_actions.into_iter().map(Action::Add)); | ||
|
||
// Collect remove actions if we are overwriting the table | ||
if matches!(this.mode, SaveMode::Overwrite) { | ||
// Update metadata with new schema | ||
let table_schema = this | ||
.snapshot | ||
.physical_arrow_schema(this.log_store.object_store().clone()) | ||
.await | ||
.or_else(|_| this.snapshot.arrow_schema()) | ||
.unwrap_or(schema.clone()); | ||
|
||
if schema != table_schema { | ||
let mut metadata = this | ||
.snapshot | ||
.current_metadata() | ||
.ok_or(DeltaTableError::NoMetadata)? | ||
.clone(); | ||
metadata.schema = schema.clone().try_into().unwrap(); | ||
let metadata_action = Metadata::try_from(metadata).unwrap(); | ||
actions.push(Action::Metadata(metadata_action)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's handle these unwraps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @roeap Can you check if the change is ok? I think you want that it returns the error, so I used the question mark operator There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, exactly right :) |
||
} | ||
// This should never error, since now() will always be larger than UNIX_EPOCH | ||
let deletion_timestamp = SystemTime::now() | ||
.duration_since(UNIX_EPOCH) | ||
|
@@ -445,7 +525,10 @@ impl std::future::IntoFuture for WriteBuilder { | |
|
||
match this.predicate { | ||
Some(_pred) => { | ||
todo!("Overwriting data based on predicate is not yet implemented") | ||
return Err(DeltaTableError::Generic( | ||
"Overwriting data based on predicate is not yet implemented" | ||
.to_string(), | ||
)); | ||
} | ||
_ => { | ||
let remove_actions = this | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the schemas here have Eq or PartialEq on them? This might not be as straight forward as this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has PartialEq on them. This piece was also reused from src/lib.rs in the python module