-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to specify XCom key for operators and task decorator #16823
Add ability to specify XCom key for operators and task decorator #16823
Conversation
Currently, we cannot specify a key for XCom using the task decorator or any Operator except we explicitly use ti.xcom_push(key=key, value=value). Whenever XCom is pushed, it implicitly use the key 'return_value' if we don't explicitly push it and in the case of task decorator, we can't change this value. This change adds an operator argument called `xcom_key` which helps us to change the xcom_key
Tested with these two dags:
Another one:
|
Can you explain why this is desirable behaviour? Doesn't returning a dictionary (and setting multiple outputs?) already set different xcom keys, or am I making that up? |
@@ -161,6 +161,7 @@ | |||
"weight_rule": { "type": "string" }, | |||
"executor_config": { "$ref": "#/definitions/dict" }, | |||
"do_xcom_push": { "type": "boolean" }, | |||
"xcom_key": { "type": "string" }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This attribute isn't needed for webserver or scheduler, so we should exclude it from the serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we need it since it's an additional field in the base operator. It's actually a test failure that pointed me to it
task_id=task_id, | ||
dag=dag, | ||
python_callable=lambda: value, | ||
xcom_key=key, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should a property of BaseOperator -- if we have this feature (and I'm still not sure why we need it) then it should be solely on the TaskFlow decorator, not all operators.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have provided answers to why we may need it in all operators. Check #16823 (comment)
It does set different xcom keys for 'each' key in the dictionary and at the end, the whole dictionary is also set with What I want in this case, is to change this 'return_value' key that's outputted which we don't have control over.
Here, we can't change the key for the xcom pushed, the only way to do it is use
And in task decorator we can't change the xcom key used for this function :
If we use multiple_outputs and do something like:
Then Xcoms pushed will be
With this change, if we have:
Xcom pushed will just be:
Which is mostly what anyone wants |
One thing that one can do if he's able to control the xcom_key: In overriding the
|
FWIW that feels like a bug itself -- it shouldn't show up "twice" in xcom, which is especially important for custom xcom backends and larger data (i.e. DFs)
I need convincing that a) this is the right thing to trigger this behaviour one, and b) that the interface you have proposed is right for users. For instance: @task
def my_task():
return some_large_df() That is very likely to be error prone to and hard to spot the error for anyone not intimately familiar with XComs -- the opposite direction we want to head in :) So lets take an big step back: what feature/functionality are you trying to achieve? |
Above is the current behaviour when using multiple outputs. With this change only this
The above code, will output
I want to be able to control the key used in pushing xcom when a value is returned from a task |
Step back further 😁 Why? Please explain what you are trying to achieve, not the implementation you have already thought about. |
😀😀 Take for instance, I'm building a custom xcom backend where I want to have XComs stored in GCS if the value is large or store in db if the value is small. And I want to know in the UI when I visit the XCom tab that this XCOM is in db or in GCS. Is there another way to accomplish this other than to prefix the keys for xcoms stored in GCS with say `df' and do something like:
I don't have a strong opinion on this feature though, we can close it if we don't want it |
Also, I feel the way multiple_outputs work is wrong and this can be used instead of it |
If that is the goal, then I would suggest we add an extra column to the XCom table (json?) that is usable by xcom backends for storing "whatever they like" -- and then that is what we use to decide what to show. Because the XCom key should be entirely under user/dag author control -- and while we could put restrictions on it if we wanted to, it could lead to non-obvious behaviour as I mentioned earlier. |
@ashb, I have just discovered that XCom key can actually be changed with the TaskFlow API currently: from airflow import DAG
from airflow.utils.dates import days_ago
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
dag = DAG(
'example_xcom',
schedule_interval="@once",
start_date=days_ago(2),
tags=['example'],
)
@dag.task()
def push(**kwargs):
"""Pushes an XCom without a specific target"""
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
@dag.task()
def push_by_returning():
"""Pushes an XCom without a specific target, just by returning it"""
return value_2
@dag.task()
def puller(data1, data2, **kwargs):
"""Pull all previously pushed XComs and check if the pushed values match the pulled values."""
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
if pulled_value_1 != value_1:
raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
# get value_2
pulled_value_2 = data2
if pulled_value_2 != value_2:
raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
# get both value_1 and value_2
pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
if pulled_value_1 != value_1:
raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
if pulled_value_2 != value_2:
raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
puller(push(), push_by_returning()) The decorated function accepts the Closing this PR! |
@ephraimbuddy Can I overwrite the default "return_value" key in custom operators? I have some historical custom keys for certain DAGs. For example, in one operator I push a list of files with the key Not a huge deal because I could potentially just rename all of the old xcom keys in the database, and replace any references to |
@Idacey, for custom operators, if you are pushing XComs with xcom_arg = XComArg(custom_op, key=`extract_files`) instead of xcom_arg = custom_op.output |
Okay, thanks. So no way to actually return a custom key name (without using |
This worked for me.
It seems like I have to use It would be nice if we could just overwrite the Thanks @ephraimbuddy |
Currently, we cannot specify a key for XCom using the task decorator
or any Operator except we explicitly use ti.xcom_push(key=key, value=value).
Whenever XCom is pushed, it implicitly use the key 'return_value' if we don't
explicitly push it and in the case of task decorator, we can't change this
value.
This change adds an operator argument called
xcom_key
which helps us to changethe xcom_key
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.