
Writing DeltaTable to Azure with partitioning fails and goes into endless loop w/o error message #1770

Closed
stefnba opened this issue Oct 25, 2023 · 22 comments
Labels: bug (Something isn't working)


stefnba commented Oct 25, 2023

Environment

Delta-rs version: 0.12.0

Binding: Python

Environment:

  • Cloud provider: Azure Storage Gen2
  • OS: macOS Sonoma (14.0) on an Apple M1 Pro
  • Other: Python 3.10.11

Bug

What happened:
I have an Arrow dataset (~120M records, ~2.8 GB as Parquet files) with the following schema that I want to write to ADLS Gen2, partitioned by the columns exchange_code and year:

schema = pa.schema(
    [
        pa.field("date", pa.date32()),
        pa.field("open", pa.float64()),
        pa.field("high", pa.float64()),
        pa.field("low", pa.float64()),
        pa.field("close", pa.float64()),
        pa.field("adjusted_close", pa.float64()),
        pa.field("volume", pa.int64()),
        pa.field("exchange_code", pa.string()),
        pa.field("security_code", pa.string()),
        pa.field("year", pa.int32()),
        pa.field("created_at", pa.timestamp("us")),
    ]
)

It works successfully when:

  • writing to the local filesystem (takes ~1 min) using deltalake.write_deltalake
  • writing to Azure without any partitioning using deltalake.write_deltalake and the storage_options arg
  • using pyarrow.dataset.write_dataset with partitions and adlfs.AzureBlobFileSystem as the filesystem arg
  • writing only the first 1-5M records to Azure using deltalake.write_deltalake and the above-mentioned partitioning

But not when:

  • writing the full dataset to Azure using deltalake.write_deltalake with the above-mentioned partitioning.

There is no error message; the write just runs forever without creating any directories or files. In some cases it created 2-3 partition directories at the beginning, but then nothing happened thereafter.

Column exchange_code has 15 distinct values and year ~10-80 distinct values depending on the exchange.

What you expected to happen:
DeltaTable to be created successfully.

How to reproduce it:

Here is my code:

import os
import polars as pl
import pyarrow.dataset as ds
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

account_name = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
client = os.getenv("AZURE_CLIENT_ID")
tenant = os.getenv("AZURE_TENANT_ID")
secret = os.getenv("AZURE_CLIENT_SECRET")

storage_options = {"account_name": account_name, "client_id": client, "client_secret": secret, "tenant_id": tenant}

schema = pa.schema(
    [
        pa.field("date", pa.date32()),
        pa.field("open", pa.float64()),
        pa.field("high", pa.float64()),
        pa.field("low", pa.float64()),
        pa.field("close", pa.float64()),
        pa.field("adjusted_close", pa.float64()),
        pa.field("volume", pa.int64()),
        pa.field("exchange_code", pa.string()),
        pa.field("security_code", pa.string()),
        pa.field("year", pa.int32()),
        pa.field("created_at", pa.timestamp("us")),
    ]
)

dataset = ds.dataset(source="arrow_dataset", format="parquet", schema=schema)

write_deltalake(
    data=dataset.to_batches(),
    table_or_uri="abfs://temp/testDelta",
    storage_options=storage_options,
    schema=schema,
    partition_by=["exchange_code", "year"],
)

If the dataset is needed, please let me know.

More details:
I have tried different options for the args max_rows_per_file, max_rows_per_group, min_rows_per_group, and max_open_files, but had no success.
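For illustration, a call with these tuning args looks roughly like this (the values shown are placeholders, not the exact ones I tried):

write_deltalake(
    data=dataset.to_batches(),
    table_or_uri="abfs://temp/testDelta",
    storage_options=storage_options,
    schema=schema,
    partition_by=["exchange_code", "year"],
    # Placeholder values - these were varied across runs without success
    max_open_files=128,           # cap on concurrently open output files
    max_rows_per_file=5_000_000,  # split large partitions into smaller files
    min_rows_per_group=64_000,    # row-group sizing passed through to the
    max_rows_per_group=128_000,   # underlying pyarrow dataset writer
)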

stefnba added the bug label on Oct 25, 2023
@ion-elgreco (Collaborator)

@stefnba which PyArrow version are you using?

@stefnba (Author)

stefnba commented Nov 2, 2023

PyArrow version 12, and I just tried version 14 but get the same issue. I cannot install version 13 for some reason (I need to investigate why when I have more time).

FYI, when I use PyArrow directly to write a partitioned dataset to Azure, it works fine.

@ion-elgreco (Collaborator)

@roeap it seems the issue was already flagged by @stefnba.

Since it works fine with PyArrow directly, I would guess it's the Delta filesystem handler. @stefnba, can you confirm how you use PyArrow directly? Did you create your own filesystem handler with the PyArrow filesystem?

@wjones127 (Collaborator)

Hmm, I'm guessing there's some sort of GIL deadlock. If there's a way to reliably repro it, it wouldn't be hard to attach lldb and confirm where it's happening. (We'd have to run with a development build of the library to get the symbols, though.)

Though what I don't get is that creating a directory is a no-op here:

fn create_dir(&self, _path: String, _recursive: bool) -> PyResult<()> {

But you are seeing partition directories being made?

@ion-elgreco (Collaborator)

ion-elgreco commented Nov 10, 2023

@wjones127 maybe we can have a chat about how I could run that library build for debugging. I have one pipeline where this is occurring; I could probably recreate the issue with dummy data.

Also check this discussion on slack: https://delta-users.slack.com/archives/C013LCAEB98/p1699482282569879?thread_ts=1699481918.955769&cid=C013LCAEB98

@stefnba (Author)

stefnba commented Nov 10, 2023

@roeap it seems the issue was already flagged by @stefnba.

Since it works fine with PyArrow directly, I would guess it's the Delta filesystem handler. @stefnba, can you confirm how you use PyArrow directly? Did you create your own filesystem handler with the PyArrow filesystem?

@ion-elgreco Thanks for following up on this issue.

I'm on vacation and don't have access to our repo, so I can't copy-paste the exact code that's working, but it should be close to the following.

I'm using adlfs as an fsspec-compatible filesystem interface.

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

filesystem = AzureBlobFileSystem(account_name="my_account_name", anon=False)

# read local
input_ds = ds.dataset("path/to/local/ds")

# write to Azure using adlfs filesystem
ds.write_dataset(
	data=input_ds,
	base_dir="container/blob",
	filesystem=filesystem,
	partitioning_flavor="hive",
	partitioning=["col1", "col2"]
)

Hope that helps. I'm back in two weeks and then able to provide the working code.

@stefnba (Author)

stefnba commented Nov 10, 2023

But you are seeing partition directories being made?

@wjones127 In some cases 1-2 partition dirs and a couple of sub-dirs are created; in other cases none are created. Files are never created, and all dirs stay empty.

@ion-elgreco (Collaborator)

(quoting @stefnba's reply above in full)

OK, then two temporary workarounds are to pass an AzureBlobFileSystem to write_deltalake, since that is then passed to the PyArrow dataset writer, or to limit max_open_files.

@stefnba (Author)

stefnba commented Nov 10, 2023

OK, then two temporary workarounds are to pass an AzureBlobFileSystem to write_deltalake, since that is then passed to the PyArrow dataset writer, or to limit max_open_files.

Thanks for the suggestion. I remember trying that, but passing the filesystem arg to write_deltalake throws a NotImplementedError.

I guess it's because of this line in python/deltalake/writer.py:

if filesystem is not None:
    raise NotImplementedError("Filesystem support is not yet implemented.  #570")

Is there another way? FYI, I had not fully read or tried the suggestions in #570 back then.

@ion-elgreco (Collaborator)

ion-elgreco commented Nov 10, 2023

Hmm, I missed that; I thought it was implemented. Then I'll need to look a bit deeper at the code there.

In the linked issue, Will mentions that it is possible to use object store file systems though.

@ion-elgreco (Collaborator)

ion-elgreco commented Nov 14, 2023

@stefnba for me it consistently happens more often on PyArrow v8 than on PyArrow v14. I need to find some time to debug it. Will gave some guidance on how to start, but at least I can get it to occur more quickly with PyArrow v8.

@r3stl355 (Contributor)

I wanted to re-create this issue on my side, but so far I have been unable to.

I tried matching @stefnba's setup as closely as I could (on a Mac M2, Python 3.10.7, with a generated dataset using a narrower schema: only 3 columns, but 800 million rows and about 900 total partitions over 2 columns).

I tried different versions of PyArrow (from v14 down to v9; v8 causes a segmentation fault), and also tried both the maturin develop and maturin build options.

A single write takes about 5 minutes; I'm running it in a loop now with PyArrow v9 and it's on attempt 9, still going. What else can I try to cause the failure?

@ion-elgreco (Collaborator)

@r3stl355 can you try writing this from a Databricks notebook to ADLS?

I'm mainly running into the issue with Azure<->Azure connections.

@r3stl355 (Contributor)

Running it now in an Azure Databricks notebook, still the same, @ion-elgreco: I started a loop with PyArrow v8 (deltalake is the latest though, 0.13). A bit faster than on my laptop and no issues 🤷

Running command...
- Attempt 0: 2023-11-21 13:32:36.625640
- Attempt 1: 2023-11-21 13:35:17.566879
- Attempt 2: 2023-11-21 13:37:58.235321
- Attempt 3: 2023-11-21 13:40:42.188006
- Attempt 4: 2023-11-21 13:43:24.954625
- Attempt 5: 2023-11-21 13:46:07.584661
- Attempt 6: 2023-11-21 13:48:47.521191

@ion-elgreco (Collaborator)

@r3stl355 can you share the exact code you're running?

Is the Databricks instance behind a VNet?

@r3stl355 (Contributor)

Yes, it is a VNet-injected workspace.

Also, it looks like @stefnba is having a similar problem running from his local machine, so the issue may not be unique to Azure -> Azure.

@r3stl355 (Contributor)

I'm running this (on Databricks; my source Parquet files are in the DBFS root):

%pip install deltalake
import os
import datetime as dt
import pyarrow.dataset as ds
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

SCOPE = "delta-rs"
account_name = dbutils.secrets.get(SCOPE, "AZURE_STORAGE_ACCOUNT_NAME")
client = dbutils.secrets.get(SCOPE, "AZURE_CLIENT_ID")
tenant = dbutils.secrets.get(SCOPE, "AZURE_TENANT_ID")
secret = dbutils.secrets.get(SCOPE, "AZURE_CLIENT_SECRET")

storage_options = {"account_name": account_name, "client_id": client, "client_secret": secret, "tenant_id": tenant}
schema = pa.schema(
    [
        pa.field("date", pa.timestamp("us")),
        pa.field("code", pa.string()),
        pa.field("year", pa.int64()),
    ]
)

# Read local dataset (Parquet)
SOURCE_DIR = "/dbfs/parquet"
dataset = ds.dataset(SOURCE_DIR)

target_uri = "abfs://temp/from_databricks"
test_count = 10
for i in range(0, test_count):
    print(f"- Attempt {i}: {dt.datetime.now()}")
    write_deltalake(
        data=dataset.to_batches(),
        table_or_uri=target_uri,
        mode="overwrite",
        storage_options=storage_options,
        schema=schema,
        partition_by=["code", "year"],
    )

@ion-elgreco (Collaborator)

ion-elgreco commented Nov 21, 2023

@r3stl355 can you try this:

Also, I am doing this with PyArrow v8; it already hangs on the first/second iteration.

import pyarrow as pa
import polars as pl
import os
from azureml.opendatasets import NycTlcGreen
from datetime import datetime
from dateutil import parser

start_date = parser.parse('2018-01-01')
end_date = parser.parse('2019-12-31')
nyc_tlc = NycTlcGreen(start_date=start_date, end_date=end_date)
nyc_tlc_df = pl.from_pandas(nyc_tlc.to_pandas_dataframe())

nyc_tlc_df = nyc_tlc_df.with_columns(
    pl.col('lpepPickupDatetime').dt.strftime("%G%m").alias('year_month'),
    pl.all().cast(pl.Utf8)
)

# root_dir and storage_options are assumed to be defined elsewhere
# (the target ADLS path and the Azure credentials, as in the snippets above)
for i in range(250):
    nyc_tlc_df.write_delta(
        os.path.join(root_dir, '_TEMP_WRITE_DEBUG'),
        storage_options=storage_options,
        mode='overwrite',
        delta_write_options={"partition_by": ['year_month']},
    )
    print(i)

@r3stl355 (Contributor)

Is this on Databricks? I get

/local_disk0/.ephemeral_nfs/envs/pythonEnv-2e004da5-e00b-47ff-b78a-3bcba4398610/lib/python3.10/site-packages/azureml/opendatasets/dataaccess/_blob_accessor.py:519: Warning: Please install azureml-dataset-runtimeusing pip install azureml-dataset-runtime
  warnings.warn(
NotImplementedError: Linux distribution ubuntu 22.04 does not have automatic support. 
Missing packages: {'liblttng-ust.so.0'}
.NET Core 3.1 can still be used via `dotnetcore2` if the required dependencies are installed.
Visit https://aka.ms/dotnet-install-linux for Linux distro specific .NET Core install instructions.
Follow your distro specific instructions to install `dotnet-runtime-*` and replace `*` with `3.1.23`.

@ion-elgreco (Collaborator)

(quoting the error output above)

No, actually in an AzureML VM; Databricks was giving me the same issue, even though the Azure docs said it would work on Databricks :S

@r3stl355 (Contributor)

Quick update in case someone wants to follow up (I don't know how much time I'll have in the next few days). After lots of back and forth between different environments, we now have a single Parquet file which consistently gets stuck when writing to Azure storage.
The file is not large, about 200 MB, and I'm only creating 23 partition folders, with each file in a single partition being about 15 MB. On the first run it created one file and one partition folder and then got stuck. This is using PyArrow v9; I'll try other versions.

The Parquet file is created from NYC taxi data as suggested by @ion-elgreco, so I can put it up for download somewhere, together with the code, if anyone is interested in trying it out.

When it's stuck, the terminal becomes non-responsive to Ctrl+C; is that a sign that the main thread is blocked waiting for a lock or I/O?
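A minimal sketch of one way to see where all the threads are blocked when it hangs, using Python's built-in faulthandler (the 60-second interval is arbitrary):

import sys
import faulthandler

# Dump the tracebacks of every Python thread to stderr every 60 seconds;
# exit=False keeps the process alive so a debugger such as lldb can still
# be attached afterwards.
faulthandler.dump_traceback_later(60, repeat=True, file=sys.stderr, exit=False)

# ...then run the hanging write_deltalake(...) call as usual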

@r3stl355 (Contributor)

The latest:

  • looks like the issue happens here on the attempt to close the stream; once one of the threads is blocked, everything stops:
    py.allow_threads(|| match self.rt.block_on(self.writer.shutdown()) {
  • usually it is intermittent, but today it happens on every run; I am not sure if it is related to Azure resources or my laptop's resources
  • I see that all writes to the stream in that struct succeed - I put logs at the start and end of the writes and can see every call to write succeed

ion-elgreco pushed a commit that referenced this issue Nov 29, 2023
# Description
The current implementation of `ObjectOutputStream` does not invoke flush
when writing out files to Azure storage, which seems to cause intermittent
issues where `write_deltalake` hangs with no progress and no error.

I'm adding a periodic flush to the write process, based on the written
buffer size, which can be parameterized via the `storage_options` parameter
(I could not find another way without changing the interface). I don't
know if this is an acceptable approach (also, it requires string values).

Setting `"max_buffer_size": f"{100 * 1024}"` in the `storage_options`
passed to `write_deltalake` helps me resolve the issue with writing a
dataset to Azure which was otherwise failing constantly.

The default max buffer size is set to 4 MB, which looks reasonable and is
used by other implementations I've seen (e.g.
https://github.com/fsspec/filesystem_spec/blob/3c247f56d4a4b22fc9ffec9ad4882a76ee47237d/fsspec/spec.py#L1577)

# Related Issue(s)
Can help with resolving #1770

# Documentation
If the approach is accepted, then I need to find the best way of adding
this to the docs.

---------

Signed-off-by: Nikolay Ulmasov <[email protected]>
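
Based on the description above, using the new option from Python would presumably look roughly like this; only the `max_buffer_size` key and its string-valued format are taken from the PR text, and the surrounding variables (credentials, dataset, schema) are the ones from the repro snippet earlier in this issue:

from deltalake import write_deltalake

storage_options = {
    "account_name": account_name,
    "client_id": client,
    "client_secret": secret,
    "tenant_id": tenant,
    # Per the PR description: flush the output stream every 100 KiB of
    # buffered data instead of only on close; the value must be a string.
    "max_buffer_size": f"{100 * 1024}",
}

write_deltalake(
    data=dataset.to_batches(),
    table_or_uri="abfs://temp/testDelta",
    storage_options=storage_options,
    schema=schema,
    partition_by=["exchange_code", "year"],
)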
ion-elgreco pushed a commit to ion-elgreco/delta-rs that referenced this issue Dec 1, 2023
(same commit message as above, with the related issue referenced as delta-io#1770)