Use Serialization to support more objects in XCom #16786
closes apache#8703. This change lets XCom serialize and deserialize Set and Tuple values, along with other types such as DAG, BaseOperator, relativedelta, and datetime.
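For context, a minimal standard-library sketch of the limitation this PR targets: plain `json` rejects sets and silently turns tuples into lists. It uses no Airflow code, and the variable names are illustrative only.

```python
import json

# Plain JSON has no set type, so encoding a set fails outright.
try:
    json.dumps({1, 2, 3})
    set_error = None
except TypeError as exc:
    set_error = exc

# Tuples do encode, but they come back as lists, so the type is lost.
round_tripped = json.loads(json.dumps((1, 2)))
```

Here `set_error` holds the `TypeError` raised for the set, and `round_tripped` is a list rather than a tuple.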
LGTM. But I wonder whether it will stay fully compatible with old serialized data. It may be worth adding a test that serializes a complex-ish structure with json and deserializes it with BaseSerialization (unless that is already done?).
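A rough sketch of the kind of compatibility check suggested here. The `encode` and `strict_decode` helpers and the `__type`/`__var` tag keys are hypothetical stand-ins written for this example; they are not Airflow's `BaseSerialization`, and only illustrate how a decoder that expects type tags can behave on data written as plain JSON.

```python
import json

# Hypothetical tag keys for this sketch; not taken from the PR.
TYPE_KEY, VALUE_KEY = "__type", "__var"


def encode(value):
    """Wrap sets, tuples and dicts in a type tag; pass other values through."""
    if isinstance(value, set):
        return {TYPE_KEY: "set", VALUE_KEY: [encode(v) for v in value]}
    if isinstance(value, tuple):
        return {TYPE_KEY: "tuple", VALUE_KEY: [encode(v) for v in value]}
    if isinstance(value, dict):
        return {TYPE_KEY: "dict", VALUE_KEY: {k: encode(v) for k, v in value.items()}}
    if isinstance(value, list):
        return [encode(v) for v in value]
    return value


def strict_decode(data):
    """Decode tagged data, assuming every dict carries a type tag."""
    if isinstance(data, list):
        return [strict_decode(v) for v in data]
    if not isinstance(data, dict):
        return data
    kind, inner = data[TYPE_KEY], data[VALUE_KEY]  # KeyError on untagged dicts
    if kind == "set":
        return {strict_decode(v) for v in inner}
    if kind == "tuple":
        return tuple(strict_decode(v) for v in inner)
    if kind == "dict":
        return {k: strict_decode(v) for k, v in inner.items()}
    raise ValueError(f"unknown type tag: {kind!r}")


# New-style data round-trips with its types intact.
new_value = {"ids": {1, 2}, "pair": (1, "a")}
restored = strict_decode(json.loads(json.dumps(encode(new_value))))

# Legacy data written with plain json.dumps has no tags.
legacy_payload = json.dumps({"ids": [1, 2]})
```

In this sketch, feeding `legacy_payload` to `strict_decode` raises `KeyError`, which is the sort of regression such a test is meant to catch.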
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
- return json.dumps(value).encode('UTF-8')
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ return json.dumps(BaseSerialization._serialize(value, fail_on_error=True)).encode('UTF-8')
Maybe we should add a BaseSerialization.to_json() method for symmetry.
It already exists.
Oh, except:
    @classmethod
    def to_json(cls, var: Union[DAG, BaseOperator, dict, list, set, tuple]) -> str:
        """Stringifies DAGs and operators contained by var and returns a JSON string of var."""
        return json.dumps(cls.to_dict(var), ensure_ascii=True)

    @classmethod
    def to_dict(cls, var: Union[DAG, BaseOperator, dict, list, set, tuple]) -> dict:
        """Stringifies DAGs and operators contained by var and returns a dict of var."""
        # Don't call on this class directly - only SerializedDAG or
        # SerializedBaseOperator should be used as the "entrypoint"
        raise NotImplementedError()
Wouldn't this entirely break loading current XCom values?
And "BaseSerialization" has all sorts of behaviour for registering OperatorLinks etc. that we don't want here.
So 🤔 I'm not really sure this is right.
Yeah, this totally breaks all existing JSON XComs:
Maybe we need to start adding info against XCom rows, as I mentioned in #16823 (comment) -- that way this could be used for new XComs but old ones would stay as they were.
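One way to picture the per-row marker idea is sketched below. Everything here is an assumption made for illustration: the `StoredXCom` class, its `fmt` field, the `tagged-v1` marker value, and the small tagged encoder are hypothetical and do not reflect Airflow's actual XCom model or schema.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional

TAGGED_V1 = "tagged-v1"  # hypothetical marker value for the new encoding


@dataclass
class StoredXCom:
    """Stand-in for a stored XCom row; `fmt` is a hypothetical marker column."""
    value: bytes
    fmt: Optional[str] = None  # None means a legacy row written as plain JSON


def _encode(value: Any) -> Any:
    """Tag sets and dicts so they can be told apart after decoding."""
    if isinstance(value, set):
        return {"__type": "set", "__var": [_encode(v) for v in value]}
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: _encode(v) for k, v in value.items()}}
    if isinstance(value, list):
        return [_encode(v) for v in value]
    return value


def _decode(data: Any) -> Any:
    """Reverse _encode; only called on rows marked as tagged."""
    if isinstance(data, list):
        return [_decode(v) for v in data]
    if isinstance(data, dict):
        inner = data["__var"]
        if data["__type"] == "set":
            return {_decode(v) for v in inner}
        return {k: _decode(v) for k, v in inner.items()}
    return data


def write_xcom(value: Any) -> StoredXCom:
    """New rows always record which encoding produced them."""
    return StoredXCom(json.dumps(_encode(value)).encode("UTF-8"), fmt=TAGGED_V1)


def read_xcom(row: StoredXCom) -> Any:
    """Dispatch on the marker so legacy rows keep their old meaning."""
    raw = json.loads(row.value.decode("UTF-8"))
    if row.fmt is None:
        return raw  # legacy plain JSON, returned unchanged
    if row.fmt == TAGGED_V1:
        return _decode(raw)
    raise ValueError(f"unknown XCom format: {row.fmt!r}")
```

With a marker like this, a row written before the change (no `fmt`) is read back as ordinary JSON, while a row written by `write_xcom` recovers its set values.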
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.