Writing DeltaTable to Azure with partitioning fails and goes into endless loop w/o error message #1770
Comments
@stefnba which PyArrow version are you using?
PyArrow version 12, and I just tried version 14, but same issue. Somehow I cannot install version 13 for some reason (need to investigate why when I have more time). FYI, when I use PyArrow directly to write a partitioned dataset to Azure, it works fine.
Hmm, I'm guessing there's some sort of GIL deadlock. If there's some way to reliably repro, then it wouldn't be hard to attach. Though what I don't get is that creating a directory is a no-op here: delta-rs/python/src/filesystem.rs, line 86 in 9b93830
But you are seeing partition directories being made?
@wjones127 maybe we can have a chat on how I could execute that debugging library. I have one pipeline this is occurring on; I could probably recreate the issue with dummy data. Also check this discussion on Slack: https://delta-users.slack.com/archives/C013LCAEB98/p1699482282569879?thread_ts=1699481918.955769&cid=C013LCAEB98
@ion-elgreco Thanks for following up on this issue. I'm on vacation and don't have access to our repo, so I can't copy-paste the exact code that's working, but it should be close to the following:

```python
import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

filesystem = AzureBlobFileSystem(account_name="my_account_name", anon=False)

# read local
input_ds = ds.dataset("path/to/local/ds")

# write to Azure using adlfs filesystem
ds.write_dataset(
    data=input_ds,
    base_dir="container/blob",
    filesystem=filesystem,
    partitioning_flavor="hive",
    partitioning=["col1", "col2"],
)
```

Hope that helps. I'm back in two weeks and then able to provide the working code.
@wjones127 In some cases 1-2 partition dirs are created and a couple of sub-dirs; in other cases none are created. Files are never created, all dirs are empty.
OK, then two temp workarounds are to pass an AzureBlobFileSystem to write_deltalake, since that is then passed to the pyarrow dataset, or to limit max_open_files.
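For the second workaround, here is a minimal sketch, assuming this deltalake version exposes the pyarrow writer's `max_open_files` parameter on `write_deltalake` (the URI, columns, and credentials below are placeholders, not values from this thread):

```python
import pyarrow as pa
from deltalake import write_deltalake

# Tiny stand-in table; in the real case this would be the large partitioned dataset.
table = pa.table({
    "col1": ["a", "a", "b"],
    "col2": ["2021", "2022", "2022"],
    "value": [1.0, 2.0, 3.0],
})

write_deltalake(
    "abfss://container@account.dfs.core.windows.net/table",  # placeholder ADLS Gen2 URI
    table,
    partition_by=["col1", "col2"],
    storage_options={"account_name": "my_account_name"},  # plus whatever credentials you use
    max_open_files=64,  # cap concurrently open files/blobs, per the suggested workaround
    mode="overwrite",
)
```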
Thanks for the suggestion. I remember that I tried to do that, but passing the arg fails. I guess it's because of this line:

```python
if filesystem is not None:
    raise NotImplementedError("Filesystem support is not yet implemented. #570")
```

Is there another way? FYI, I've not fully read and tried some of the suggestions in #570 back then.
Hmm, I missed that, I thought it was implemented. Then I'll need to look a bit deeper at the code there. In the linked issue, Will mentions that it is possible to use object store filesystems though.
@stefnba for me it consistently happens more often on pyarrow v8 than on pyarrow v14. I need to find some time to debug it. Will gave some guidance to start doing that, but at least I can get it to occur more quickly with pyarrow v8.
I wanted to re-create this issue on my side but so far I have been unable to. I tried matching @stefnba's setup as closely as I could (on Mac M2, Python 3.10.7, generated a dataset with a narrower schema - only 3 columns but with 800 million rows and about 900 total partitions over 2 columns). Tried different versions of pyarrow (from v14 down to v9; v8 causes a segmentation error), also tried with both. A single write takes about 5 minutes; I'm running it in a loop now with pyarrow v9 and it's on attempt 9 - still going. What else can I try to cause the failure?
@r3stl355 can you try writing this from a Databricks notebook to ADLS? I'm mainly running into the issue with Azure<->Azure connections.
Running it now in an Azure Databricks notebook - still the same @ion-elgreco - started a loop with pyarrow v8 (deltalake is the latest though, 0.13). A bit faster than on my laptop and no issues 🤷
@r3stl355 can you share the exact code you're running? Is the Databricks instance behind a VNet?
Yes, it is a VNet-injected workspace. Also, it looks like @stefnba is having a similar problem running from his local machine, so the issue may not be unique to Azure -> Azure.
I'm running this (on Databricks, my source parquet files are in DBFS root):
@r3stl355 can you try this? Also, I am doing this with pyarrow v8; it already hangs on the first or second iteration.

```python
import os

import polars as pl
import pyarrow as pa
from azureml.opendatasets import NycTlcGreen
from datetime import datetime
from dateutil import parser

# root_dir and storage_options (ADLS path and credentials) are defined elsewhere
start_date = parser.parse('2018-01-01')
end_date = parser.parse('2019-12-31')

nyc_tlc = NycTlcGreen(start_date=start_date, end_date=end_date)
nyc_tlc_df = pl.from_pandas(nyc_tlc.to_pandas_dataframe())

nyc_tlc_df = nyc_tlc_df.with_columns(
    pl.col('lpepPickupDatetime').dt.strftime("%G%m").alias('year_month'),
    pl.all().cast(pl.Utf8)
)

for i in range(250):
    nyc_tlc_df.write_delta(
        os.path.join(root_dir, '_TEMP_WRITE_DEBUG'),
        storage_options=storage_options,
        mode='overwrite',
        delta_write_options={"partition_by": ['year_month']},
    )
    print(i)
```
Is this on Databricks? I get:
No, actually in an Azure ML VM; Databricks was giving me the same issue even though Azure docs said it would work on Databricks :S
Quick update in case someone wants to follow up (I don't know how much time I'll have in the next few days). After lots of back and forth between different environments, we now have a single parquet file that consistently gets stuck on an attempt to write to Azure storage. The parquet file is created from NYC taxi data as suggested by @ion-elgreco, so I can put it up for download somewhere if anyone is interested in trying it out, together with the code. When it's stuck, the terminal becomes non-responsive to Ctrl+C - is that a sign that the main thread is stuck waiting for a lock or IO?
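One way to check where the process is stuck, sketched with the standard library's `faulthandler`; its watchdog thread should be able to dump the Python stacks of all threads even if the main thread is blocked inside native code (the 600-second timeout is an arbitrary example value):

```python
import faulthandler
import sys

# If the write hasn't finished after 10 minutes, dump every thread's Python
# traceback to stderr so we can see where the main thread is blocked.
faulthandler.dump_traceback_later(600, exit=False, file=sys.stderr)

# ... run the hanging write_deltalake / write_delta call here ...

# Cancel the watchdog if the write completes normally.
faulthandler.cancel_dump_traceback_later()
```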
The latest:
# Description

The current implementation of `ObjectOutputStream` does not invoke flush when writing out files to Azure storage, which seems to cause intermittent issues where `write_deltalake` hangs with no progress and no error. I'm adding a periodic flush to the write process, based on the written buffer size, which can be parameterized via the `storage_options` parameter (I could not find another way without changing the interface). I don't know if this is an acceptable approach (also, it requires string values).

Setting `"max_buffer_size": f"{100 * 1024}"` in the `storage_options` passed to `write_deltalake` resolves the issue for a dataset that was otherwise failing consistently to write to Azure. The default max buffer size is set to 4 MB, which looks reasonable and is used by other implementations I've seen (e.g. https://github.com/fsspec/filesystem_spec/blob/3c247f56d4a4b22fc9ffec9ad4882a76ee47237d/fsspec/spec.py#L1577).

# Related Issue(s)

Can help with resolving #1770

# Documentation

If the approach is accepted then I need to find the best way of adding this to docs.

---------

Signed-off-by: Nikolay Ulmasov <[email protected]>
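As a usage sketch of the option described above (the URI, table, and credentials are placeholders; the buffer size is passed as a string via `storage_options`, as the PR description notes):

```python
import pyarrow as pa
from deltalake import write_deltalake

# Small stand-in table for illustration.
table = pa.table({"year_month": ["202301", "202302"], "value": [1.0, 2.0]})

write_deltalake(
    "abfss://container@account.dfs.core.windows.net/table",  # placeholder ADLS Gen2 URI
    table,
    partition_by=["year_month"],
    mode="overwrite",
    storage_options={
        "account_name": "my_account_name",   # plus credentials
        "max_buffer_size": f"{100 * 1024}",  # flush roughly every 100 KiB instead of the 4 MB default
    },
)
```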
Environment
Delta-rs version: 0.12.0
Binding: Python
Environment:
Bug
What happened:
I have an arrow dataset (~120m records, ~2.8 GB as parquet files) with the following schema that I want to write to ADLS Gen2, partitioned by cols `exchange_code` and `year`:

It works successfully when:

- `deltalake.write_deltalake`
- `deltalake.write_deltalake` with arg `storage_options`
- `pyarrow.dataset.write_dataset` with partitions and `adlfs.AzureBlobFileSystem` as filesystem arg
- `deltalake.write_deltalake` and the above-mentioned partitioning

But not when:

- `deltalake.write_deltalake` with the above-mentioned partitioning.

There is no error message - it just goes on and on forever without creating any dirs or files. In some cases it created 2-3 partition directories at the beginning, but then nothing happens thereafter.
Column `exchange_code` has 15 distinct values and `year` ~10-80 distinct values depending on the exchange.

What you expected to happen:
DeltaTable to be created successfully.
How to reproduce it:
Here is my code:
If the dataset is needed, please let me know.
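A minimal sketch of a call along those lines, assuming the partitioning described above (the paths, URI, and credentials are placeholders, not the original snippet):

```python
import pyarrow.dataset as ds
from deltalake import write_deltalake

# Placeholder local dataset; the real one has ~120m records across many parquet files.
input_ds = ds.dataset("path/to/local/ds")

write_deltalake(
    "abfss://container@account.dfs.core.windows.net/table",  # placeholder ADLS Gen2 URI
    input_ds.to_table(),  # materialized here for the sketch; the real code may stream batches instead
    partition_by=["exchange_code", "year"],
    storage_options={"account_name": "my_account_name"},  # plus credentials
)
```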
More details:
I have tried different options for the args `max_rows_per_file`, `max_rows_per_group`, `min_rows_per_group`, and `max_open_files`, but also with no success.