Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: Automatically convert Pandas types to valid Delta Lake types in write_deltalake() #686

Closed
wjones127 opened this issue Jul 11, 2022 · 10 comments · Fixed by #1820
Closed
Labels
binding/python Issues for the Python package enhancement New feature or request good first issue Good for newcomers

Comments

@wjones127
Copy link
Collaborator

Description

Many Pandas types aren't automatically converted into valid Delta Lake types when converted into Arrow tables. For example, Pandas Timestamps are converted into timestamps with nanosecond precision by default, but Delta Lake only supports microsecond precision. This makes write_deltalake() difficult to use for Pandas users.

We should write a test that validates all Pandas types can be written with write_deltalake() without manual conversion.

I'm not sure yet how to configure the conversion here:

if _has_pandas and isinstance(data, pd.DataFrame):
data = pa.Table.from_pandas(data)

It's possible that we can pass in an adjusted schema to the schema parameter of pyarrow.Table.from_pandas() and that will make the correct conversion.

Use Case

Related Issue(s)

Based on #685

@wjones127 wjones127 added enhancement New feature or request good first issue Good for newcomers labels Jul 11, 2022
wjones127 added a commit that referenced this issue Dec 1, 2022
# Description
As described in #686 some pandas datatypes are not converted to a format
that is compatible with delta lake. This handles the instance of
timestamps, which are stored with `ns` resolution in Pandas. Here, if is
a schema is not provided, we specify converting the timestamps to `us`
resolution.

We also update `python/tests/test_writer.py::test_write_pandas` to
reflect this change.

# Related Issue(s)
#685

Co-authored-by: Will Jones <[email protected]>
@blaze225
Copy link

Would appreciate if this can be prioritized. Right now this is forcing us to use spark over delta-rs.

@ion-elgreco
Copy link
Collaborator

This also happens when you write delta from Polars with columns with nano precision datetime. However it's slightly more easy to circumvent you just have to do the casting first to micro precision.

@ion-elgreco
Copy link
Collaborator

ion-elgreco commented Sep 24, 2023

Would appreciate if this can be prioritized. Right now this is forcing us to use spark over delta-rs.

@blaze225 You can also switch to polars, which casts the dtypes correctly to a delta compatible schema: https://github.com/pola-rs/polars/pull/10165/files#diff-843e4fa7334b1cfcdf4ebe039377c0d724d0abb51bcde68c9aaae1b93868e20b

@thehappycheese
Copy link

I made this as a stopgap solution. Its a dumb solution but it helped me actually get it to write and test out the library.

import deltalake as dl
from deltalake import DeltaTable
from typing import Union

def strip_categorical(df:pd.DataFrame):
    """convert categorical columns back into integer types,
    and return a dataframe of the categories
    
    Example:

    ```python
    (original_df, categories) = strip_categorical(df)
    ```"""
    categories = {}
    df=df.copy()
    for col in df.columns:
        if pd.api.types.is_categorical_dtype(df[col]):
            print(f"Converting categorical column to integer: '{col}' - {dict(enumerate(df[col].cat.categories))}")
            categories[col] = df[col].cat.categories
            df[col] = df[col].cat.codes
    return df, pd.DataFrame(categories)

def strip_duration_to_int(df:pd.DataFrame, to_int_unit:Union[str,dict[str,str]]="ms"):
    """convert Timedelta columns to integer types with the given unit
    to_int_unit should be a string or a dictionary of column names to units
    
    Example:
    
    ```python
    df, time_delta_cols = strip_duration_to_int(df, to_int_unit="ms")
    ```"""
    df=df.copy()
    time_delta_cols = {}
    for col in df.columns:
        if pd.api.types.is_timedelta64_dtype(df[col].dtype):
            col_to_int_unit = to_int_unit
            if isinstance(to_int_unit, dict):
                col_to_int_unit = to_int_unit[col]
            print(f"Converting Timedelta column to integer using units '{col_to_int_unit}': '{col}'")
            time_delta_cols[col] = col_to_int_unit
            df[col] = df[col] // pd.Timedelta(1, unit=col_to_int_unit)
    return df, time_delta_cols

def write_delta(path, data, timedelta_to_int_unit:Union[str,dict[str,str]]="ms", **kwargs):
    data, categories = strip_categorical(data)
    data, time_delta_cols = strip_duration_to_int(data, timedelta_to_int_unit)
    dl.write_deltalake(path, data, **kwargs)
    if len(categories) > 0:
        dl.write_deltalake(path+"_categories", categories,**kwargs)
    if len(time_delta_cols) > 0:
        dl.write_deltalake(path+"_time_delta_cols", time_delta_cols,**kwargs)

@kangshung
Copy link

Are there any plans to implement this?

@ion-elgreco
Copy link
Collaborator

Are there any plans to implement this?

You can use polars.io.delta import _convert_pa_schema_to_delta

@kangshung
Copy link

Are there any plans to implement this?

You can use polars.io.delta import _convert_pa_schema_to_delta

What about the _check_for_unsupported_types() method that lists Categorical as an unsupported type? Why would it work without polars if it doesn't with polars?

@ion-elgreco
Copy link
Collaborator

Are there any plans to implement this?

You can use polars.io.delta import _convert_pa_schema_to_delta

What about the _check_for_unsupported_types() method that lists Categorical as an unsupported type? Why would it work without polars if it doesn't with polars?

I don't see any categorical primitive types in here: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types

@kangshung
Copy link

kangshung commented Oct 23, 2023

Are there any plans to implement this?

You can use polars.io.delta import _convert_pa_schema_to_delta

What about the _check_for_unsupported_types() method that lists Categorical as an unsupported type? Why would it work without polars if it doesn't with polars?

I don't see any categorical primitive types in here: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types

And that's the issue. Delta returns deltalake.PyDeltaTableError: Schema error: Invalid data type for Delta Lake: Dictionary(Int8, Utf8) for Categorical fields.

Here you have a method that raises an exception on Categorical fields in polars: https://github.com/pola-rs/polars/blob/main/py-polars/polars/io/delta.py#L323-L329

@ion-elgreco
Copy link
Collaborator

Are there any plans to implement this?

You can use polars.io.delta import _convert_pa_schema_to_delta

What about the _check_for_unsupported_types() method that lists Categorical as an unsupported type? Why would it work without polars if it doesn't with polars?

I don't see any categorical primitive types in here: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types

And that's the issue. Delta returns deltalake.PyDeltaTableError: Schema error: Invalid data type for Delta Lake: Dictionary(Int8, Utf8) for Categorical fields.

Here you have a method that raises an exception on Categorical fields in polars: https://github.com/pola-rs/polars/blob/main/py-polars/polars/io/delta.py#L323-L329

I see, we could possibly port these things from Polars into delta-rs, I'll check with the polars contributors. Not super familiar with licenses and all

@ion-elgreco ion-elgreco added the binding/python Issues for the Python package label Nov 22, 2023
ion-elgreco added a commit that referenced this issue Nov 24, 2023
…iter/merge (#1820)

# Description
This ports some functionality that @stinodego and I had worked on in
Polars. Where we converted a pyarrow schema to a compatible delta
schema. It converts the following:

- uint -> int
- timestamp(any timeunit) -> timestamp(us) 

I adjusted the functionality to do schema conversion from large to
normal when necessary, which is still needed in MERGE as workaround
#1753.

Additional things I've added:

- Schema conversion for every input in write_deltalake/merge
- Add Pandas dataframe conversion
- Add Pandas dataframe as input in merge


# Related Issue(s)
- closes #686
- closes #1467

---------

Co-authored-by: Will Jones <[email protected]>
ion-elgreco added a commit to ion-elgreco/delta-rs that referenced this issue Nov 25, 2023
…iter/merge (delta-io#1820)

This ports some functionality that @stinodego and I had worked on in
Polars. Where we converted a pyarrow schema to a compatible delta
schema. It converts the following:

- uint -> int
- timestamp(any timeunit) -> timestamp(us)

I adjusted the functionality to do schema conversion from large to
normal when necessary, which is still needed in MERGE as workaround
delta-io#1753.

Additional things I've added:

- Schema conversion for every input in write_deltalake/merge
- Add Pandas dataframe conversion
- Add Pandas dataframe as input in merge

- closes delta-io#686
- closes delta-io#1467

---------

Co-authored-by: Will Jones <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/python Issues for the Python package enhancement New feature or request good first issue Good for newcomers
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants