Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Serialization to support more objects in XCom #16786

Closed
wants to merge 1 commit into from

Conversation

kaxil
Copy link
Member

@kaxil kaxil commented Jul 2, 2021

closes #8703

This way we will support Set, Tuple (and others like DAG,
BaseOperator, relativedelta, datetime) in XCom in serialization and
de-serialization.


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

closes apache#8703

This way we will support Set, Tuple (and others like DAG,
BaseOperator, relativedelta, datetime) in XCom in serialization and
de-serialization.
@kaxil kaxil requested a review from turbaszek July 2, 2021 23:36
@kaxil kaxil requested review from ashb and XD-DENG as code owners July 2, 2021 23:36
@kaxil kaxil added this to the Airflow 2.2 milestone Jul 2, 2021
Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. But I wonder whether it will support old serialized data with full compatibility? Maybe worth to add a test where we serialize complex-ish structure with json and de-serialize with BaseSerializer (unless it is already done?).

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jul 3, 2021
@github-actions
Copy link

github-actions bot commented Jul 3, 2021

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

return json.dumps(value).encode('UTF-8')
from airflow.serialization.serialized_objects import BaseSerialization

return json.dumps(BaseSerialization._serialize(value, fail_on_error=True)).encode('UTF-8')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should add a BaseSerialization.to_json() method for symmetry

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already exists.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, excetp:

    @classmethod
    def to_json(cls, var: Union[DAG, BaseOperator, dict, list, set, tuple]) -> str:
        """Stringifies DAGs and operators contained by var and returns a JSON string of var."""
        return json.dumps(cls.to_dict(var), ensure_ascii=True)

    @classmethod
    def to_dict(cls, var: Union[DAG, BaseOperator, dict, list, set, tuple]) -> dict:
        """Stringifies DAGs and operators contained by var and returns a dict of var."""
        # Don't call on this class directly - only SerializedDAG or
        # SerializedBaseOperator should be used as the "entrypoint"
        raise NotImplementedError()

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this entirely break loading current XCom values?

And "BaseSerialization" has all sorts of behaviour for registering OperatorLinks etc that we don't want here.

So 🤔 not really sure this is right

@ashb
Copy link
Member

ashb commented Jul 7, 2021

Yeah, this toally breaks all existing JSON XComs:

In [1]: from airflow.serialization.serialized_objects import BaseSerialization

In [2]: BaseSerialization.from_dict({"x": "y"})
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-2-5a1376cdcc27> in <module>
----> 1 BaseSerialization.from_dict({"x": "y"})

~/code/airflow/airflow/airflow/serialization/serialized_objects.py in from_dict(cls, serialized_obj)
    136         reconstructs all DAGs and operators it contains.
    137         """
--> 138         return cls._deserialize(serialized_obj)
    139 
    140     @classmethod

~/code/airflow/airflow/airflow/serialization/serialized_objects.py in _deserialize(cls, encoded_var)
    269         if not isinstance(encoded_var, dict):
    270             raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
--> 271         var = encoded_var[Encoding.VAR]
    272         type_ = encoded_var[Encoding.TYPE]
    273 

KeyError: <Encoding.VAR: '__var'>

In [3]: BaseSerialization.from_dict({"x": "y"})

@ashb
Copy link
Member

ashb commented Jul 7, 2021

Maybe we need to start adding info against XCom rows, as I mentioned in #16823 (comment) -- that way this could be used for new XComs but old ones would stay as the were.

@kaxil kaxil marked this pull request as draft July 7, 2021 11:21
@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 22, 2021
@github-actions github-actions bot closed this Aug 27, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:serialization full tests needed We need to run full set of tests for this PR to merge stale Stale PRs per the .github/workflows/stale.yml policy file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for set in XCom serialization
4 participants