Support merge operation for Delta tables #11983
Comments
@ion-elgreco is working on this. We are blocked by delta-io/delta-rs#1602 |
I think this is a different request. @edgBR wants to see support for delta operations inside Polars, but I don't see how this would work. The only thing you need is to convert your Polars df into PyArrow format. Also important to note: MERGE currently does not work with large Arrow types. This should probably work once logical plans are built instead of directly building a physical plan for the MERGE operation, so I think you will always need to manually convert your Polars df to Arrow. Regarding PyArrow 13, I have looked at it but haven't figured out yet how to fix it quickly, so I think we'll have to wait until @wjones127 has some time to look at it. In the meantime, if you want to write Polars dataframes with Arrow large_dtypes, do it like this:
import polars as pl
import polars.selectors as cs
from deltalake import write_deltalake

df = pl.DataFrame()  # your df
table = df.with_columns(cs.datetime().cast(pl.Datetime(time_unit='us'))).to_arrow()
write_deltalake("path/to/delta_table", table, mode='append', large_dtypes=True)  # first argument is the target table URI (placeholder) |
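For reference, a minimal sketch of the current Arrow-based workaround for a merge, assuming an existing Delta table at a placeholder path with placeholder column names (and keeping in mind the large Arrow types caveat above):
import polars as pl
from deltalake import DeltaTable

df = pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})  # source data in Polars

dt = DeltaTable("path/to/delta_table")  # placeholder target table
(
    dt.merge(
        source=df.to_arrow(),  # manual Polars -> PyArrow conversion, as discussed above
        predicate="s.id = t.id",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)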
Hi @ion-elgreco so your suggestion is:
Is this correct? In the end I am basically trying to refactor some PySpark code into Polars + deltalake. I have a library that works for both streaming data and batch files, but in my company I am the only one with basic PySpark expertise, so I have decided to go with an option that has higher chances of adoption. The code is as follows:
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

class DataLoader:
    ...
    def read_data(self):
        df = (self.spark.readStream
              .format(self.format)       # delta format
              .options(**self.options))  # ignoreChanges: true is needed
        df = df.load()
        return df

    def merge_delta_data(self,
                         input_df: DataFrame,
                         batchId: int,
                         catalog_name: str,
                         db_name: str,
                         table_name: str,
                         merge_condition: str,
                         partition_column: str,
                         row_number_partition_by_cols: list,
                         row_number_ordered_column):
        self.spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
        self.spark.conf.set("spark.microsoft.delta.merge.lowShuffle.enabled", "true")
        fqn_table = f"{catalog_name}.{db_name}.{table_name}"
        if self.spark.catalog.tableExists(fqn_table):
            delta_table = DeltaTable.forName(self.spark, fqn_table)
            deduplicated_source = (
                input_df
                .select("*",
                        F.row_number()
                         .over(Window.partitionBy(row_number_partition_by_cols)
                                     .orderBy(row_number_ordered_column))
                         .alias("rownum"))
                .where("rownum == 1")
                .alias("src")
            )
            (
                delta_table.alias("tgt")
                .merge(deduplicated_source, merge_condition)
                .whenMatchedUpdateAll()      # if the condition matches we update
                .whenNotMatchedInsertAll()   # if these are new rows we just insert
                .execute()
            )
        else:
            ### we create the delta table here if it doesn't exist
            ...

    def process_silver_data(self):
        df = self.read_data()
        return (df.writeStream.format("delta")
                  .outputMode("append")
                  .queryName("Incremental Merge")
                  .foreachBatch(lambda dfx, epochId: self.merge_delta_data(...))
                  .trigger(availableNow=True)  # when the process finishes, the cluster shuts down
                  ### this is equivalent to trigger(once=True) in older Spark versions
                  .option("checkpointLocation", self.checkpoint_location))
    ....

merge_condition = """src.processed_date = tgt.processed_date and
                     src.entity_code = tgt.entity_code and
                     src.debtor_id = tgt.debtor_id and
                     src.case_id = tgt.case_id and
                     src.portfolio_id = tgt.portfolio_id and
                     src.snapshot_datetime > tgt.snapshot_datetime"""
row_number_partition_by_cols = ["country", "portfolio_id"]
row_number_ordered_column = col("processed_datetime").desc()  # pyspark.sql.column.Column
df = DataLoader().process_silver_data() |
@edgBR that's correct! |
Got it. Do you think we should keep this open, then? I mean, from my side it would be AWESOME to do this directly from Polars, but I leave it up to you guys. |
We could maybe add this method on the Polars DataFrame: it would return the deltalake TableMerger class, where you can then add all the when clauses. @stinodego, what do you think? |
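A hypothetical sketch of what such a method could boil down to (the name merge_delta and its parameters are illustrative, not an agreed API): it would just forward to deltalake and hand back the TableMerger so the when clauses can be chained.
import polars as pl
from deltalake import DeltaTable
from deltalake.table import TableMerger

def merge_delta(df: pl.DataFrame,
                target: str,
                predicate: str,
                source_alias: str = "s",
                target_alias: str = "t",
                storage_options=None) -> TableMerger:
    # convert the Polars frame to Arrow and return deltalake's TableMerger for chaining
    dt = DeltaTable(target, storage_options=storage_options)
    return dt.merge(source=df.to_arrow(),
                    predicate=predicate,
                    source_alias=source_alias,
                    target_alias=target_alias)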
Hi @ion-elgreco, this might be a good suggestion, provided that the mergeSchema option gets added to the deltalake TableMerger class as well in the future. Do you know if someone is working on this? Maybe we could ask what the plans are. |
I don't know of anyone working on it. I have a PR open to allow for additional when clauses in the Python merge. I would suggest creating an issue for schema evolution in merge in the delta-rs repo. |
Hm, so it seems I misunderstood the original request. I thought you were calling for an update to write_delta. It's still not clear to me exactly what you want. Could you explain what exactly you want to achieve and add a code example of what the Polars syntax would look like? |
Hi @stinodego, pseudocode below, with a new method called merge_deltalake():
from datetime import datetime

import polars as pl

table_target = pl.DataFrame({
    'date': [datetime(2022, 1, 1),
             datetime(2022, 1, 2),
             datetime(2022, 1, 3)],
    'max_temperature': [32.6, 29.3, 30.4]
})
table_target.write_delta(target='./my_delta_table', mode='append')

table_source = pl.DataFrame({
    'date': [datetime(2022, 1, 3),
             datetime(2022, 1, 4),
             datetime(2022, 1, 5)],
    'max_temperature': [33.2, 34, 35]
})

table_target = pl.read_delta(source='my_delta_table')
(
    table_target.merge_deltalake(source=table_source,
                                 merge_condition="src.date == tgt.date",
                                 source_alias="src",
                                 target_alias="tgt",
                                 schema_evolution="autoMerge")
    .when_matched_update_all()
    .when_not_matched_insert_all()
)
Or updating write_delta():
from datetime import datetime

import polars as pl

table_target = pl.DataFrame({
    'date': [datetime(2022, 1, 1),
             datetime(2022, 1, 2),
             datetime(2022, 1, 3)],
    'max_temperature': [32.6, 29.3, 30.4]
})
table_target.write_delta(target='./my_delta_table', mode='append')

table_source = pl.DataFrame({
    'date': [datetime(2022, 1, 3),
             datetime(2022, 1, 4),
             datetime(2022, 1, 5)],
    'max_temperature': [33.2, 34, 35]
})

table_target = pl.read_delta(source='my_delta_table')
(
    table_target.write_delta(source=table_source,
                             mode="merge",
                             delta_write_options={"merge_condition": "src.date == tgt.date",
                                                  "source_alias": "src",
                                                  "target_alias": "tgt",
                                                  "schema_evolution": "autoMerge"})
    .when_matched_update_all()
    .when_not_matched_insert_all()
)
And of course similar functionality for when_not_matched_delete etc. @stinodego, let me know if I managed to explain myself properly, and thanks for the support. |
It would make sense to me to update write_delta. But since it looks like you have both DataFrames in memory, why not do the required computation using the Polars API, and then do a regular write_delta? |
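A sketch of that pure-Polars alternative, with a placeholder path and key column: emulate the upsert with an anti-join plus concat, then overwrite the table. The obvious downside is that this rewrites the whole table instead of doing a transactional MERGE, which is exactly what the request is about.
import polars as pl

target = pl.read_delta("path/to/delta_table")                       # placeholder path
source = pl.DataFrame({"id": [2, 3], "value": ["updated", "new"]})  # placeholder data

# keep the target rows that have no match in the source, then append all source rows
merged = pl.concat([target.join(source, on="id", how="anti"), source])
merged.write_delta("path/to/delta_table", mode="overwrite")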
@stinodego, but it's not just a write operation; if it's a merge you also need to add more inputs, and then we have a function that serves two purposes. With write it doesn't return anything, but with merge it returns a deltalake class. |
What's the TableMerger class? |
TableMerger contains a set of class methods that allow you to define how records should be inserted, updated, or deleted. https://delta-io.github.io/delta-rs/python/api_reference.html#deltalake.table.TableMerger I have some better examples here: https://delta.io/blog/2023-10-22-delta-rs-python-v0.12.0/ |
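For context, a short sketch of the UPDATE and DELETE operations described in that blog post, using a placeholder table path and columns (check the deltalake API reference for the exact signatures):
from deltalake import DeltaTable

dt = DeltaTable("path/to/delta_table")  # placeholder path

# UPDATE: set columns via SQL expressions for rows matching a predicate
dt.update(updates={"max_temperature": "max_temperature + 1"},
          predicate="date = '2022-01-03'")

# DELETE: remove rows matching a predicate
dt.delete(predicate="max_temperature < 0")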
@ion-elgreco Thanks, that blog post really clarifies it for me. For UPDATE and DELETE, I think the existing deltalake API already covers it. For the merge operation, the current workflow clearly isn't ideal, having to go through Arrow. What would make sense to me is to add additional functionality to write_delta. This would not return a TableMerger. Does this make sense? |
@stinodego hmm, I think that could be possible; however, we would have to allow users to provide inputs for multiple when calls. I recently added the ability to do multiple when calls. Let me look into this on Sunday :) Feel free to assign it to me! 😄 |
Great! This was just a suggestion - if you figure out something else that works better, I'm open to it. Curious to see what you come up with. |
After sleeping on it, I see a few ways of doing everything within write_delta.
Option 1: key-value args for the function calls and their inputs
The user needs to provide the function call as a key and then the function parameters as key-value pairs, but in list format, since it is possible to do multiple when calls (introduced in this PR: delta-io/delta-rs#1750).
df = pl.DataFrame({"id": ['1', '2'], "foo": ['hello :)', 'Good bye']})

delta_merge_options = {
    'predicate': 's.id = t.id',
    'source_alias': 's',
    'target_alias': 't',
    'when_matched_update': {
        'predicate': ["s.id = t.id", "s.id_2 = t.id2"],
        'updates': [{"s.id": "t.id", "s.foo": "t.foo"}, {"s.id2": "t.id2", "s.bar": "t.bar"}]
    },
    'when_not_matched_insert_all': {
        'predicate': "s.id > 5"
    }
}
df.write_delta('TEST_TABLE', mode='merge', delta_merge_options=delta_merge_options)
Option 2: positional args
Requires the user to know the order of the inputs.
delta_merge_options = {
    'predicate': 's.id = t.id',
    'source_alias': 's',
    'target_alias': 't',
    'when_matched_update': [({"s.id": "t.id", "s.foo": "t.foo"}, "s.id = t.id"),
                            ({"s.id2": "t.id2", "s.bar": "t.bar"}, "s.id_2 = t.id2")],
    'when_not_matched_insert_all': ("s.id > 5")
}
df.write_delta('TEST_TABLE', mode='merge', delta_merge_options=delta_merge_options)
Option 3: chain method calls
delta_merge_options = {'predicate': 's.id = t.id', 'source_alias': 's', 'target_alias': 't'}
(
    df
    .write_delta(
        'TEST_TABLE',
        mode='merge',
        delta_merge_options=delta_merge_options
    )
    .when_matched_update(updates={"s.id": "t.id", "s.foo": "t.foo"}, predicate="s.id = 1")
    .when_matched_update(updates={"s.id": "t.id", "s.bar": "t.bar"}, predicate="s.id > 1")
    .when_not_matched_insert_all()
    .execute()
)
@stinodego Let me know what you think! 😄 |
Hi @ion-elgreco, of course I am biased towards 3, as I proposed:
(
    table_target.write_delta(source=table_source,
                             mode="merge",
                             delta_write_options={"merge_condition": "src.date == tgt.date",
                                                  "source_alias": "src",
                                                  "target_alias": "tgt",
                                                  "schema_evolution": "autoMerge"})
    .when_matched_update_all()
    .when_not_matched_insert_all()
)
But I think if you are coming from PySpark this would increase adoption. Also, options 1 and 2 seem a bit funky to me and easy to mess up with closing quotes and dicts. Also, as you mentioned returning the TableMerger, isn't Option 3 the easiest one to implement? Please bear in mind that I'm ages away from your knowledge of how deltalake is built internally. BR |
Let's go with option 3. And we should make sure the behavior is well documented with good examples. I initially hesitated on returning a non-Polars object in our API, but then again we do the same elsewhere. Do we need a separate delta_merge_options parameter? |
Great :) I will then also add good examples of the behaviors! I also like the separate one better, otherwise it implies those write parameters could be similar to the merge parameters. |
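To make the agreed direction concrete, a sketch of what option 3 could look like from the caller's side with a separate delta_merge_options parameter (names follow the discussion above; the table path is a placeholder):
import polars as pl

df = pl.DataFrame({"id": ["1", "2"], "foo": ["hello :)", "Good bye"]})

(
    df.write_delta(
        "path/to/TEST_TABLE",
        mode="merge",
        delta_merge_options={
            "predicate": "s.id = t.id",
            "source_alias": "s",
            "target_alias": "t",
        },
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)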
Hi @ion-elgreco and @stinodego, I know that this is closed and I still have not adapted my code to the new version, but I have been able to rewrite mostly everything for batch files. Landing to bronze:
# imports assumed at module level; the methods below live in a pipeline class
from datetime import datetime

import polars as pl
from deltalake import DeltaTable

def landing_to_bronze(self) -> None:
    """
    This method converts parquet files into append-only delta tables.
    """
    try:
        self.logger.info(f"Reading raw landing file {self.args.landing_file_name}")
        df = (
            pl.read_parquet(
                source=f"abfss://{self.landing_container}/{self.args.landing_file_name}",
                storage_options=self.storage_credentials)
            .with_columns(insertion_time=datetime.now())
        )
        ## schema enforcement and data quality checks here
        self.logger.info(f"Appending landing file {self.args.landing_file_name} to bronze")
        df.write_delta(
            target=f'abfss://{self.bronze_container}/{self.bronze_table}',
            mode='append',
            storage_options=self.storage_credentials
        )
    except Exception as e:
        self.logger.error(f"Failed to append file {self.args.landing_file_name} to the append layer")
        raise e

def bronze_to_silver(self) -> None:
    """
    This method incrementally merges the data into the silver table; it also
    performs deduplication.

    Raises
    ------
    e
        A generic error explaining why the merge failed.
    """
    try:
        bronze_df = pl.read_delta(
            source=f'abfss://{self.bronze_container}/{self.bronze_table}',
            storage_options=self.storage_credentials)
        bronze_df_no_duplicates = (
            bronze_df
            .with_columns(pl.col("insertion_time")
                            .rank("ordinal", descending=True)
                            .over('payment_id')
                            .alias('rownumber'))
            .sort(pl.col('rownumber'))
            .filter(pl.col('rownumber') == 1)
        )
        silver_check = self._table_checker(container=self.silver_container,
                                           table_name=self.silver_table)
        if silver_check:
            self.logger.info("Merging new data into silver")
            silver_df = DeltaTable(
                table_uri=f'abfss://{self.silver_container}/{self.silver_table}',
                storage_options=self.storage_credentials
            )
            (
                silver_df.merge(
                    source=bronze_df_no_duplicates.to_arrow(),
                    predicate="s.primary_key = t.primary_key and s.insertion_time > t.insertion_time",
                    source_alias="s",
                    target_alias="t",
                )
                .when_matched_update_all()
                .when_not_matched_insert_all()
                .execute()
            )
            self.logger.info("Optimizing by Z-order for silver table")
            silver_df.optimize.z_order(['primary_key'])
            self.logger.info(f'History of operations: {silver_df.get_add_actions().to_pandas()}')
        else:
            self.logger.info("Because the silver table is empty we save the first bronze file as silver")
            bronze_df.write_delta(
                target=f'abfss://{self.silver_container}/{self.silver_table}',
                mode='append',
                storage_options=self.storage_credentials
            )
    except Exception as e:
        self.logger.error(f"Failed to merge {self.args.landing_file_name} to the silver table")
        raise e |
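If it helps, a sketch of how the bronze_to_silver merge above could be rewritten once the new write_delta merge mode is available (it reuses the variables from the method above, so it assumes the same class context):
(
    bronze_df_no_duplicates.write_delta(
        target=f'abfss://{self.silver_container}/{self.silver_table}',
        mode="merge",
        storage_options=self.storage_credentials,
        delta_merge_options={
            "predicate": "s.primary_key = t.primary_key and s.insertion_time > t.insertion_time",
            "source_alias": "s",
            "target_alias": "t",
        },
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)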
Description
Hi,
Let's ditch Spark, once and for all. Just kidding; now seriously, the new delta-rs release allows for update, delete, and merge operations in the Python bindings.
I have checked that polars.DataFrame.write_delta() supports different write modes (append, overwrite, etc.), but there is no way to merge data incrementally. It would be amazing to have this.