Allow IO manager keys to be StrEnums, not just strs #18623

Open
ShootingStarD opened this issue Dec 10, 2023 · 9 comments

Comments

@ShootingStarD

ShootingStarD commented Dec 10, 2023

Can only serialize whitelisted Enums, received JobKeysEnum -> whitelist enums from the StrEnum library

Dagster version

1.5.9

What's the issue?

Hi,
I want to be able to provide StrEnums as keys for various Dagster elements such as IO managers, jobs, and assets. It works well for assets but not for IO managers and jobs. In fact, I managed to reproduce the error for jobs but not for IO managers, even though I use the same Dagster version.

Nevertheless, when I try to execute a job whose name was defined via a subclass of StrEnum (from the StrEnum library), I get the following error: dagster._serdes.errors.SerializationError:

Can only serialize whitelisted Enums, received JobKeysEnum.
               Descent path: <root:JobSnapshot>.name

For me it is important to use StrEnum because :

  • it prevents me from making typos in the key/name definitions
  • it enables autocompletion
  • StrEnum also extends str, so I do not need to call the .value attribute of the enum, which removes boilerplate

As I said, in another project I get the same error with IO managers when defining the keys for the resources of a Definitions object.

What did you expect to happen?

From what I understand, the error is "just" that StrEnums are not whitelisted. But since they can be treated as strings, I don't see why we could not whitelist them. Would it be possible to do so?
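
To illustrate what "can be treated as strings" means here, the following is a minimal sketch that uses a standard-library `str`-mixin `Enum` as a stand-in for the `strenum` classes. That substitution is an assumption made so the snippet runs without third-party packages, and `JobKeys` is a hypothetical name.

```python
from enum import Enum


class JobKeys(str, Enum):
    # Hypothetical stand-in for the SnakeCaseStrEnum subclasses in this issue.
    SIMPLE_JOB = "simple_job"


key = JobKeys.SIMPLE_JOB

# The member is a genuine str instance, so string APIs accept it directly
# without going through the .value attribute.
is_str = isinstance(key, str)
equals_plain = key == "simple_job"
joined = "/".join(["jobs", key])
```

The member compares equal to the plain string and can be passed wherever a `str` is expected, which is the property the request relies on.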

How to reproduce?

assets.py

from dagster import asset
import pandas as pd

@asset
def upstream_asset():
    return pd.DataFrame([1, 2, 3])


@asset
def downstream_asset(upstream_asset):
    return upstream_asset * 4

dagster_keys.py

from enum import auto
from strenum import SnakeCaseStrEnum

class IOManagerKeysSTR(SnakeCaseStrEnum): # SnakeCaseStrEnum inherits from StrEnum to convert the name into snake case
    IO_MANAGER_STR_ENUM = auto()

class JobKeysEnum(SnakeCaseStrEnum):
    SIMPLE_JOB = auto()

jobs.py

from dagster import AssetSelection, define_asset_job
from my_dagster_project.assets import upstream_asset, downstream_asset
from my_dagster_project.dagster_keys import JobKeysEnum

dummy_job = define_asset_job(
    name=JobKeysEnum.SIMPLE_JOB,
    selection=AssetSelection.assets(upstream_asset, downstream_asset),
)

io_manager.py

from dagster import ConfigurableIOManager, InputContext, OutputContext
from typing import List
import pandas as pd


class MyIOManager(ConfigurableIOManager):
    # specifies an optional string list input, via config system
    path_prefix: List[str] = []

    def _get_path(self, context) -> str:
        return "/".join(self.path_prefix + context.asset_key.path)

    def handle_output(self, context: OutputContext, obj: pd.DataFrame):
        obj.to_csv(self._get_path(context))

    def load_input(self, context: InputContext):
        return pd.read_csv(self._get_path(context))

definitions.py

from dagster import Definitions
from .io_manager import MyIOManager
from my_dagster_project.dagster_keys import IOManagerKeysSTR
from my_dagster_project.assets import upstream_asset, downstream_asset
from my_dagster_project.jobs import dummy_job


defs_enum_str = Definitions(
    assets=[upstream_asset, downstream_asset],
    resources={
        IOManagerKeysSTR.IO_MANAGER_STR_ENUM: MyIOManager(),
    },
    jobs=[dummy_job],
)

tests.py

from my_dagster_project.definitions import (
    defs_enum_str
)
from my_dagster_project.dagster_keys import JobKeysEnum


def test_str_enum_io_manager():
    job_job = defs_enum_str.get_job_def(
        JobKeysEnum.SIMPLE_JOB
    )
    job_job_results = job_job.execute_in_process()

    assert job_job_results.success

As I said previously, I am not able to reproduce the error for the IO manager keys (and I cannot share the project where it occurs, since it belongs to my company).

Deployment type

None

Deployment details

No response

Additional information

I haven't tried Python 3.11's built-in StrEnum, but even if it works, our production environment is still on Python 3.9 and a migration is not possible at the moment.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@ShootingStarD ShootingStarD added the type: bug Something isn't working label Dec 10, 2023
@sryza
Contributor

sryza commented Dec 11, 2023

Hey @ShootingStarD - are you able to share your full stack trace?

@sryza sryza changed the title Can only serialize whitelisted Enums, received JobKeysEnum -> whitelist enums from the StrEnum library Allow IO manager keys to be subclasses of str Dec 11, 2023
@sryza sryza changed the title Allow IO manager keys to be subclasses of str Allow IO manager keys to be StrEnums, not just strs Dec 11, 2023
@sryza sryza added type: idea and removed type: bug Something isn't working labels Dec 11, 2023
@ShootingStarD
Author

Hi @sryza, here it is:

___________________________________ test_str_enum_io_manager ___________________________________

    def test_str_enum_io_manager(
    
    ):
    
    
        job_job = defs_enum_str.get_job_def(
            JobKeysEnum.SIMPLE_JOB
        )
>       job_job_results = job_job.execute_in_process()

my-dagster-project/my_dagster_project_tests/test_enum.py:34: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.10/site-packages/dagster/_core/definitions/job_definition.py:702: in execute_in_process
    return core_execute_in_process(
.venv/lib/python3.10/site-packages/dagster/_core/execution/execute_in_process.py:51: in core_execute_in_process
    run = execute_instance.create_run_for_job(
.venv/lib/python3.10/site-packages/dagster/_core/instance/__init__.py:1161: in create_run_for_job
    job_def.get_job_snapshot_id(),
.venv/lib/python3.10/site-packages/dagster/_core/definitions/job_definition.py:953: in get_job_snapshot_id
    return self.get_job_index().job_snapshot_id
.venv/lib/python3.10/site-packages/dagster/_core/host_representation/job_index.py:83: in job_snapshot_id
    self._job_snapshot_id = create_job_snapshot_id(self.job_snapshot)
.venv/lib/python3.10/site-packages/dagster/_core/snap/job_snapshot.py:60: in create_job_snapshot_id
    return create_snapshot_id(snapshot)
.venv/lib/python3.10/site-packages/dagster/_serdes/utils.py:9: in create_snapshot_id
    json_rep = serialize_value(snapshot, **kwargs)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:616: in serialize_value
    packed_value = pack_value(val, whitelist_map=whitelist_map)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:663: in pack_value
    return _pack_value(val, whitelist_map=whitelist_map, descent_path=descent_path)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:695: in _pack_value
    return serializer.pack(cast(NamedTuple, val), whitelist_map, descent_path)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:531: in pack
    packed[storage_key] = _pack_value(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

val = <JobKeysEnum.SIMPLE_JOB: 'simple_job'>
whitelist_map = WhitelistMap(tuple_serializers={'ConfigurableClassData': <dagster._serdes.config_class.ConfigurableClassDataSerializer...rializer object at 0x7fd7f49b7bb0>, 'JobTickStatus': <dagster._serdes.serdes.EnumSerializer object at 0x7fd7f49b7bb0>})
descent_path = '<root:JobSnapshot>.name'

    def _pack_value(
        val: PackableValue,
        whitelist_map: WhitelistMap,
        descent_path: str,
    ) -> JsonSerializableValue:
        # this is a hot code path so we handle the common base cases without isinstance
        tval = type(val)
        if tval in (int, float, str, bool) or val is None:
            return cast(JsonSerializableValue, val)
        if tval is list:
            return [
                _pack_value(item, whitelist_map, f"{descent_path}[{idx}]")
                for idx, item in enumerate(cast(list, val))
            ]
        if tval is dict:
            return {
                key: _pack_value(value, whitelist_map, f"{descent_path}.{key}")
                for key, value in cast(dict, val).items()
            }
    
        # inlined is_named_tuple_instance
        if isinstance(val, tuple) and hasattr(val, "_fields"):
            klass_name = val.__class__.__name__
            if not whitelist_map.has_tuple_serializer(klass_name):
                raise SerializationError(
                    "Can only serialize whitelisted namedtuples, received"
                    f" {val}.\nDescent path: {descent_path}",
                )
            serializer = whitelist_map.get_tuple_serializer(klass_name)
            return serializer.pack(cast(NamedTuple, val), whitelist_map, descent_path)
        if isinstance(val, Enum):
            klass_name = val.__class__.__name__
            if not whitelist_map.has_enum_entry(klass_name):
>               raise SerializationError(
                    "Can only serialize whitelisted Enums, received"
                    f" {klass_name}.\nDescent path: {descent_path}",
                )
E               dagster._serdes.errors.SerializationError: Can only serialize whitelisted Enums, received JobKeysEnum.
E               Descent path: <root:JobSnapshot>.name

.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:699: SerializationError
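
Reading the `_pack_value` source shown in the traceback above, the fast path compares `type(val)` against the exact built-in types, so a `str` subclass does not match it and falls through to the `isinstance(val, Enum)` branch. A small sketch of that distinction follows. It uses a standard-library `str`-mixin `Enum` as a stand-in for `JobKeysEnum`, which is an assumption so the snippet runs without the `strenum` package.

```python
from enum import Enum


class JobKeysEnum(str, Enum):
    # Stand-in for the SnakeCaseStrEnum subclass from dagster_keys.py.
    SIMPLE_JOB = "simple_job"


val = JobKeysEnum.SIMPLE_JOB

# Exact-type check, as in the fast path: a subclass of str is not str itself.
hits_fast_path = type(val) in (int, float, str, bool)

# Subclass-aware checks: the member is both a str and an Enum.
is_str_instance = isinstance(val, str)
is_enum_instance = isinstance(val, Enum)

print(hits_fast_path, is_str_instance, is_enum_instance)  # False True True
```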

@sryza
Contributor

sryza commented Dec 12, 2023

🙏

@ShootingStarD
Author

I am open to trying to add the StrEnum classes to the whitelist over the weekend; I will keep you posted.
If you have any advice other than reading the contributing guide, let me know!

@sryza
Contributor

sryza commented Dec 14, 2023

@alangenfeld do you have thoughts on whitelisting StrEnums for serdes so they can be used in place of strings?

I'm not an expert on StrEnums, but a potential challenge here is that enum classes defined in user code processes will not be present in host processes, which means we wouldn't be able to deserialize them there.
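
The concern above can be sketched with a toy, name-based encoding. The encoding is hypothetical and does not reproduce Dagster's actual serialization format; `encode_by_name`, `decode_by_name`, and the two registries are illustrative names. Restoring the enum member requires the reading process to know the class, whereas a plain string needs nothing.

```python
from enum import Enum
from typing import Dict, Type


class JobKeysEnum(str, Enum):
    # Defined only in the "user code" process in this sketch.
    SIMPLE_JOB = "simple_job"


def encode_by_name(member: Enum) -> Dict[str, str]:
    # Hypothetical wire format: class name plus member name.
    return {"enum_class": type(member).__name__, "member": member.name}


def decode_by_name(payload: Dict[str, str], registry: Dict[str, Type[Enum]]) -> Enum:
    # Raises KeyError when the reading process has never seen the class.
    return registry[payload["enum_class"]][payload["member"]]


payload = encode_by_name(JobKeysEnum.SIMPLE_JOB)

user_process_registry: Dict[str, Type[Enum]] = {"JobKeysEnum": JobKeysEnum}
host_process_registry: Dict[str, Type[Enum]] = {}  # class not importable here

restored = decode_by_name(payload, user_process_registry)

try:
    decode_by_name(payload, host_process_registry)
    host_can_decode = True
except KeyError:
    host_can_decode = False
```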

@alangenfeld
Member

alangenfeld commented Dec 14, 2023

I think it may be as easy as reordering the cases in pack_value and moving https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_serdes/serdes.py#L772-L774 above https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_serdes/serdes.py#L747-L755, assuming it's OK to get a regular string back out if it ever goes across a serialization boundary. Based on my reading of the issue, I think that would be OK. We will want to put various conditions under test in the PR that adds it to make the behavior clear.

edit: well, that naive idea would risk changing the behavior of existing whitelisted Enums that inherit from str, so we may want to have it be a fallback only if there isn't a whitelist entry for the Enum
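
A rough sketch of the fallback ordering described in the edit above, written as a toy packer rather than Dagster's actual `_pack_value`. The `pack` function, the `whitelisted_enums` set, the example enum classes, and the `__enum__` output shape are all assumptions for illustration. Whitelisted enums keep their existing handling, and only a non-whitelisted enum that is also a `str` degrades to a plain string.

```python
from enum import Enum
from typing import Any, Set


class WhitelistedStatus(str, Enum):
    # Plays the role of an existing whitelisted Enum that inherits from str.
    STARTED = "STARTED"


class JobKeysEnum(str, Enum):
    # Plays the role of a user-defined key enum that is not whitelisted.
    SIMPLE_JOB = "simple_job"


class PlainEnum(Enum):
    # Not whitelisted and not a str: still rejected.
    A = 1


def pack(val: Any, whitelisted_enums: Set[str]) -> Any:
    if type(val) in (int, float, str, bool) or val is None:
        return val
    if isinstance(val, Enum):
        klass_name = type(val).__name__
        if klass_name in whitelisted_enums:
            # Existing behavior is untouched for whitelisted enums.
            return {"__enum__": f"{klass_name}.{val.name}"}
        if isinstance(val, str):
            # Fallback only: for a str-mixin Enum, .value is the plain str.
            return val.value
        raise ValueError(f"Can only serialize whitelisted Enums, received {klass_name}")
    raise TypeError(f"Unsupported type: {type(val).__name__}")


whitelist = {"WhitelistedStatus"}
packed_key = pack(JobKeysEnum.SIMPLE_JOB, whitelist)
packed_status = pack(WhitelistedStatus.STARTED, whitelist)
```

With this ordering, a value that crosses a serialization boundary comes back as a regular string, which matches the trade-off stated above.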

@ShootingStarD
Author

ShootingStarD commented Dec 14, 2023

@alangenfeld Indeed, deserializing into a regular string is fine for me, as it is just an easy way to define strings that will be used throughout my projects. In fact, I will never read the deserialized values except maybe in the debugger, but I don't mind.

@ShootingStarD
Author

@alangenfeld I tested your solution and it passed my tests.

I will try to make a PR this week. Thanks for all the help!

@ShootingStarD
Author

ShootingStarD commented Dec 16, 2023

Pull request: #18778
@alangenfeld
