Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): expose rust writer as additional engine v2 #1891

Merged
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ca2acb8
first version
ion-elgreco Nov 15, 2023
3f55470
add try from uri with storage options
ion-elgreco Nov 15, 2023
714cc56
Start to enable overwrite_schema
ion-elgreco Nov 15, 2023
52f0d6a
add tests to check rust py03 writer
ion-elgreco Nov 16, 2023
c067d4d
remove comment
ion-elgreco Nov 16, 2023
5c5f247
rename and clean up
ion-elgreco Nov 17, 2023
717b7c7
add float type support in partition cols
ion-elgreco Nov 18, 2023
dae361e
check for pandas
ion-elgreco Nov 18, 2023
3052f12
add support for name and desc
ion-elgreco Nov 18, 2023
01f0194
fmt
ion-elgreco Nov 18, 2023
9911574
improve tests and add config support
ion-elgreco Nov 18, 2023
a410dfa
parametrize write benchmark
ion-elgreco Nov 18, 2023
8c976b6
add LargeUtf8 support in partition stringify
ion-elgreco Nov 18, 2023
57565b5
refactor: express log schema in delta types
roeap Nov 18, 2023
49a298b
feat(python): expose `convert_to_deltalake` (#1842)
ion-elgreco Nov 19, 2023
635313f
refactor: merge to use logical plans (#1720)
Blajda Nov 19, 2023
633fd7f
feat: create benchmarks for merge (#1857)
Blajda Nov 20, 2023
07113c6
Revert "refactor: express log schema in delta types"
ion-elgreco Nov 20, 2023
3a8c026
Merge branch 'main' into feat/expose_rust_writer_as_optional_engine
ion-elgreco Nov 20, 2023
3e25561
formatting
ion-elgreco Nov 20, 2023
d9a4ce0
use fromstr
ion-elgreco Nov 20, 2023
e3c7189
add overwrite_schema support
ion-elgreco Nov 20, 2023
5af1251
fix clippy of unrelated code ?
ion-elgreco Nov 20, 2023
3744384
cargo fmt
ion-elgreco Nov 20, 2023
2e1f0c9
resolve #1860
r3stl355 Nov 19, 2023
fee4d77
docs: on append, overwrite, delete and z-ordering (#1897)
MrPowers Nov 22, 2023
3173ad7
docs: update python docs link in readme.md (#1899)
thomasfrederikhoeck Nov 22, 2023
fe4fe51
fix: use physical name for column name lookup in partitions (#1836)
aersam Nov 24, 2023
c38b518
ci: run doctest in CI for Python API examples (#1840)
marijncv Nov 24, 2023
3ed7df0
feat(python): add pyarrow to delta compatible schema conversion in wr…
ion-elgreco Nov 24, 2023
573e8fe
resolve merge
ion-elgreco Nov 25, 2023
bb5815a
fix bug from conflict
ion-elgreco Nov 25, 2023
8c56194
Merge branch 'main' into fix/expose_writer_rust_to_python_v2
ion-elgreco Nov 25, 2023
381df0d
remove commented code
ion-elgreco Nov 25, 2023
3ab7687
expose predicate, handle error better, and add overloads
ion-elgreco Nov 25, 2023
e73eea3
fmt
ion-elgreco Nov 25, 2023
b4f9695
use ? instead of unwrap
ion-elgreco Nov 25, 2023
50e7257
Merge branch 'main' into fix/expose_writer_rust_to_python_v2
ion-elgreco Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#![deny(warnings)]
#![deny(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]
#![allow(clippy::nonminimal_bool)]

#[cfg(all(feature = "parquet", feature = "parquet2"))]
compile_error!(
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async fn excute_non_empty_expr(
None,
writer_properties,
false,
false,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
)
.await?;

Expand Down
17 changes: 17 additions & 0 deletions crates/deltalake-core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use self::vacuum::VacuumBuilder;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::table::builder::DeltaTableBuilder;
use crate::DeltaTable;
use std::collections::HashMap;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod convert_to_delta;
Expand Down Expand Up @@ -73,6 +74,22 @@ impl DeltaOps {
}
}

/// try from uri with storage options
pub async fn try_from_uri_with_storage_options(
uri: impl AsRef<str>,
storage_options: HashMap<String, String>,
) -> DeltaResult<Self> {
let mut table = DeltaTableBuilder::from_uri(uri)
.with_storage_options(storage_options)
.build()?;
// We allow for uninitialized locations, since we may want to create the table
match table.load().await {
Ok(_) => Ok(table.into()),
Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
Err(err) => Err(err),
}
}

/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
/// Using this will not persist any changes beyond the lifetime of the table object.
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
)
.await?;

Expand Down
99 changes: 91 additions & 8 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::writer::{DeltaWriter, WriterConfig};
use super::{transaction::commit, CreateBuilder};
use crate::delta_datafusion::DeltaDataChecker;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Add, Remove, StructType};
use crate::kernel::{Action, Add, Metadata, Remove, StructType};
use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
Expand Down Expand Up @@ -103,12 +103,20 @@ pub struct WriteBuilder {
write_batch_size: Option<usize>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
/// whether to overwrite the schema
overwrite_schema: bool,
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
safe_cast: bool,
/// Parquet writer properties
writer_properties: Option<WriterProperties>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
/// Name of the table, only used when table doesn't exist yet
name: Option<String>,
/// Description of the table, only used when table doesn't exist yet
description: Option<String>,
/// Configurations of the delta table, only used when table doesn't exist
configuration: HashMap<String, Option<String>>,
}

impl WriteBuilder {
Expand All @@ -126,8 +134,12 @@ impl WriteBuilder {
write_batch_size: None,
batches: None,
safe_cast: false,
overwrite_schema: false,
writer_properties: None,
app_metadata: None,
name: None,
description: None,
configuration: Default::default(),
}
}

Expand All @@ -137,6 +149,12 @@ impl WriteBuilder {
self
}

/// Add overwrite_schema
pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self {
self.overwrite_schema = overwrite_schema;
self
}

/// When using `Overwrite` mode, replace data that matches a predicate
pub fn with_replace_where(mut self, predicate: impl Into<String>) -> Self {
self.predicate = Some(predicate.into());
Expand Down Expand Up @@ -205,6 +223,31 @@ impl WriteBuilder {
self
}

/// Specify the table name. Optionally qualified with
/// a database name [database_name.] table_name.
pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}

/// Comment to describe the table.
pub fn with_description(mut self, description: impl Into<String>) -> Self {
self.description = Some(description.into());
self
}

/// Set configuration on created table
pub fn with_configuration(
mut self,
configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
) -> Self {
self.configuration = configuration
.into_iter()
.map(|(k, v)| (k.into(), v.map(|s| s.into())))
.collect();
self
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
match self.log_store.is_delta_table_location().await? {
true => {
Expand All @@ -229,10 +272,20 @@ impl WriteBuilder {
}?;
let mut builder = CreateBuilder::new()
.with_log_store(self.log_store.clone())
.with_columns(schema.fields().clone());
.with_columns(schema.fields().clone())
.with_configuration(self.configuration.clone());
if let Some(partition_columns) = self.partition_columns.as_ref() {
builder = builder.with_partition_columns(partition_columns.clone())
}

if let Some(name) = self.name.as_ref() {
builder = builder.with_table_name(name.clone());
};

if let Some(desc) = self.description.as_ref() {
builder = builder.with_comment(desc.clone());
};

let (_, actions, _) = builder.into_table_and_actions()?;
Ok(actions)
}
Expand All @@ -251,14 +304,19 @@ pub(crate) async fn write_execution_plan(
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
safe_cast: bool,
overwrite_schema: bool,
) -> DeltaResult<Vec<Add>> {
let invariants = snapshot
.current_metadata()
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default();

// Use input schema to prevent wrapping partitions columns into a dictionary.
let schema = snapshot.input_schema().unwrap_or(plan.schema());
let schema: ArrowSchemaRef = if overwrite_schema {
plan.schema()
} else {
snapshot.input_schema().unwrap_or(plan.schema())
};

let checker = DeltaDataChecker::new(invariants);

Expand Down Expand Up @@ -339,23 +397,26 @@ impl std::future::IntoFuture for WriteBuilder {
Ok(this.partition_columns.unwrap_or_default())
}?;

let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into();
let plan = if let Some(plan) = this.input {
Ok(plan)
} else if let Some(batches) = this.batches {
if batches.is_empty() {
Err(WriteError::MissingData)
} else {
let schema = batches[0].schema();
schema = batches[0].schema();
let table_schema = this
.snapshot
.physical_arrow_schema(this.log_store.object_store().clone())
.await
.or_else(|_| this.snapshot.arrow_schema())
.unwrap_or(schema.clone());

if !can_cast_batch(schema.fields(), table_schema.fields()) {
if !can_cast_batch(schema.fields(), table_schema.fields())
&& !(this.overwrite_schema && matches!(this.mode, SaveMode::Overwrite))
{
return Err(DeltaTableError::Generic(
"Updating table schema not yet implemented".to_string(),
"Schema of data does not match table schema".to_string(),
));
};

Expand Down Expand Up @@ -390,7 +451,7 @@ impl std::future::IntoFuture for WriteBuilder {
vec![batches]
};

Ok(Arc::new(MemoryExec::try_new(&data, schema, None)?)
Ok(Arc::new(MemoryExec::try_new(&data, schema.clone(), None)?)
as Arc<dyn ExecutionPlan>)
}
} else {
Expand All @@ -415,12 +476,31 @@ impl std::future::IntoFuture for WriteBuilder {
this.write_batch_size,
this.writer_properties,
this.safe_cast,
this.overwrite_schema,
)
.await?;
actions.extend(add_actions.into_iter().map(Action::Add));

// Collect remove actions if we are overwriting the table
if matches!(this.mode, SaveMode::Overwrite) {
// Update metadata with new schema
let table_schema = this
.snapshot
.physical_arrow_schema(this.log_store.object_store().clone())
.await
.or_else(|_| this.snapshot.arrow_schema())
.unwrap_or(schema.clone());

if schema != table_schema {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the schemas here have Eq or PartialEq on them? This might not be as straight forward as this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has PartialEq on them. This piece was also reused from src/lib.rs in the python module

let mut metadata = this
.snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?
.clone();
metadata.schema = schema.clone().try_into().unwrap();
let metadata_action = Metadata::try_from(metadata).unwrap();
actions.push(Action::Metadata(metadata_action));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's handle these unwraps.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roeap Can you check if the change is ok? I think you want that it returns the error, so I used the question mark operator

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, exactly right :)

}
// This should never error, since now() will always be larger than UNIX_EPOCH
let deletion_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand All @@ -445,7 +525,10 @@ impl std::future::IntoFuture for WriteBuilder {

match this.predicate {
Some(_pred) => {
todo!("Overwriting data based on predicate is not yet implemented")
return Err(DeltaTableError::Generic(
"Overwriting data based on predicate is not yet implemented"
.to_string(),
));
}
_ => {
let remove_actions = this
Expand Down
25 changes: 11 additions & 14 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,22 +468,19 @@ fn apply_stats_conversion(
data_type: &DataType,
) {
if path.len() == 1 {
match data_type {
DataType::Primitive(PrimitiveType::Timestamp) => {
let v = context.get_mut(&path[0]);

if let Some(v) = v {
let ts = v
.as_str()
.and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok())
.map(|n| Value::Number(serde_json::Number::from(n)));

if let Some(ts) = ts {
*v = ts;
}
if let DataType::Primitive(PrimitiveType::Timestamp) = data_type {
let v = context.get_mut(&path[0]);

if let Some(v) = v {
let ts = v
.as_str()
.and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok())
.map(|n| Value::Number(serde_json::Number::from(n)));

if let Some(ts) = ts {
*v = ts;
}
}
_ => { /* noop */ }
}
} else {
let next_context = context.get_mut(&path[0]).and_then(|v| v.as_object_mut());
Expand Down
14 changes: 9 additions & 5 deletions crates/deltalake-core/src/writer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use std::io::Write;
use std::sync::Arc;

use arrow::array::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array,
as_boolean_array, as_generic_binary_array, as_largestring_array, as_primitive_array,
as_string_array, Array,
};
use arrow::datatypes::{
DataType, Date32Type, Date64Type, Int16Type, Int32Type, Int64Type, Int8Type,
Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Int8Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::json::ReaderBuilder;
use arrow::record_batch::*;
Expand Down Expand Up @@ -184,7 +185,10 @@ pub(crate) fn stringified_partition_value(
DataType::UInt16 => as_primitive_array::<UInt16Type>(arr).value(0).to_string(),
DataType::UInt32 => as_primitive_array::<UInt32Type>(arr).value(0).to_string(),
DataType::UInt64 => as_primitive_array::<UInt64Type>(arr).value(0).to_string(),
DataType::Float32 => as_primitive_array::<Float32Type>(arr).value(0).to_string(),
DataType::Float64 => as_primitive_array::<Float64Type>(arr).value(0).to_string(),
DataType::Utf8 => as_string_array(arr).value(0).to_string(),
DataType::LargeUtf8 => as_largestring_array(arr).value(0).to_string(),
DataType::Boolean => as_boolean_array(arr).value(0).to_string(),
DataType::Date32 => as_primitive_array::<Date32Type>(arr)
.value_as_date(0)
Expand Down
13 changes: 13 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ def write_new_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def write_to_deltalake(
table_uri: str,
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
max_rows_per_group: int,
overwrite_schema: bool,
predicate: Optional[str],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
partition_by: Optional[pyarrow.Schema],
Expand Down
Loading
Loading