Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to specify XCom key for operators and task decorator #16823

Closed

Conversation

ephraimbuddy
Copy link
Contributor

Currently, we cannot specify a key for XCom using the task decorator
or any Operator except we explicitly use ti.xcom_push(key=key, value=value).

Whenever XCom is pushed, it implicitly use the key 'return_value' if we don't
explicitly push it and in the case of task decorator, we can't change this
value.

This change adds an operator argument called xcom_key which helps us to change
the xcom_key


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

Currently, we cannot specify a key for XCom using the task decorator
or any Operator except we explicitly use ti.xcom_push(key=key, value=value).

Whenever XCom is pushed, it implicitly use the key 'return_value' if we don't
explicitly push it and in the case of task decorator, we can't change this
value.

This change adds an operator argument called `xcom_key` which helps us to change
the xcom_key
@ephraimbuddy
Copy link
Contributor Author

ephraimbuddy commented Jul 5, 2021

Tested with these two dags:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

dag =  DAG(
    "example_xcom",
    schedule_interval="@once",
    start_date=days_ago(2)
) 

def push_xcom(ti):
    return [3,4,5,6]

def pull_xcom(ti):
    xcom = ti.xcom_pull(task_ids='push_xcom', key='mykey')
    print("Xcom pulled: ", xcom)


with dag:
    task1 = PythonOperator(
        task_id="push_xcom",
        python_callable=push_xcom,
        xcom_key='mykey',
    )
    task2 = PythonOperator(
        task_id="pull_xcom",
        python_callable=pull_xcom
    )
    task1 >> task2

Another one:

from airflow import DAG
from airflow.utils.dates import days_ago

dag = DAG(dag_id="test-xcom-key", schedule_interval="@once", start_date=days_ago(2))

@dag.task(xcom_key="mykey")
def push_xcom():
    return {"mydata":[3,4,5,6]}

@dag.task()
def push_another_xcom(data):
    print(data)
    return {'mydata':"mydata"}


push_another_xcom(push_xcom())

@ashb ashb modified the milestones: Airflow 2.1.3, Airflow 2.2 Jul 6, 2021
@ashb
Copy link
Member

ashb commented Jul 6, 2021

Can you explain why this is desirable behaviour?

Doesn't returning a dictionary (and setting multiple outputs?) already set different xcom keys, or am I making that up?

@@ -161,6 +161,7 @@
"weight_rule": { "type": "string" },
"executor_config": { "$ref": "#/definitions/dict" },
"do_xcom_push": { "type": "boolean" },
"xcom_key": { "type": "string" },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This attribute isn't needed for webserver or scheduler, so we should exclude it from the serialization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we need it since it's an additional field in the base operator. It's actually a test failure that pointed me to it

task_id=task_id,
dag=dag,
python_callable=lambda: value,
xcom_key=key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should a property of BaseOperator -- if we have this feature (and I'm still not sure why we need it) then it should be solely on the TaskFlow decorator, not all operators.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have provided answers to why we may need it in all operators. Check #16823 (comment)

@ephraimbuddy
Copy link
Contributor Author

Can you explain why this is desirable behaviour?

Doesn't returning a dictionary (and setting multiple outputs?) already set different xcom keys, or am I making that up?

It does set different xcom keys for 'each' key in the dictionary and at the end, the whole dictionary is also set with return_value as key.

What I want in this case, is to change this 'return_value' key that's outputted which we don't have control over.
Take, for example, a python operator task that pushes xcom by returning a value.:

def push_xcom(ti):
    return [3,4,5,6]

 task1 = PythonOperator(
        task_id="push_xcom",
        python_callable=push_xcom,
    )

Here, we can't change the key for the xcom pushed, the only way to do it is use

def push_xcom(ti):
    ti.xcom_push(key='mykey', value=[3,4,5,6])

And in task decorator we can't change the xcom key used for this function :

@task()
def push_xcom(ti):
    return [3,4,5,6]

If we use multiple_outputs and do something like:

@task(multiple_outputs=True)
def push_xcom(ti):
    return {'mykey':[3,4,5,6]}

Then Xcoms pushed will be

key=mykey, value=[3,4,5,6]
key=return_value, value={'mykey':[3,4,56]}

With this change, if we have:

@task(xcom_key='mykey')
def push_xcom(ti):
    return [3,4,5,6]

Xcom pushed will just be:

key=mykey, value=[3,4,5,6]

Which is mostly what anyone wants

@ephraimbuddy
Copy link
Contributor Author

One thing that one can do if he's able to control the xcom_key:

In overriding the orm_deserialize_value we can do something like:

def orm_deserialize_value(self):
    if self.key.starts_with('df'):
        return 'Xcom uploaded to GCS'
    return BaseXCom.deserialize_value(self)

@ashb
Copy link
Member

ashb commented Jul 6, 2021

Then Xcoms pushed will be

key=mykey, value=[3,4,5,6]
key=return_value, value={'mykey':[3,4,56]}

FWIW that feels like a bug itself -- it shouldn't show up "twice" in xcom, which is especially important for custom xcom backends and larger data (i.e. DFs)

One thing that one can do if he's able to control the xcom_key:

In overriding the orm_deserialize_value we can do something like:

def orm_deserialize_value(self):
    if self.key.starts_with('df'):
        return 'Xcom uploaded to GCS'
    return BaseXCom.deserialize_value(self)

I need convincing that a) this is the right thing to trigger this behaviour one, and b) that the interface you have proposed is right for users.

For instance:

@task
def my_task():
    return some_large_df()

That is very likely to be error prone to and hard to spot the error for anyone not intimately familiar with XComs -- the opposite direction we want to head in :)

So lets take an big step back: what feature/functionality are you trying to achieve?

@ephraimbuddy
Copy link
Contributor Author

ephraimbuddy commented Jul 6, 2021

FWIW that feels like a bug itself -- it shouldn't show up "twice" in xcom, which is especially important for custom xcom backends and larger data (i.e. DFs)

Above is the current behaviour when using multiple outputs. With this change only this key=return_value, value={'mykey':[3,4,56]} is the output

For instance:

```
@task
def my_task():
      return some_large_df()
```

That is very likely to be error prone to and hard to spot the error for anyone not intimately familiar with XComs -- the opposite direction we want to head in :)

The above code, will output some_large_df as xcom value with key='return_value', if we use this change and say @task(xcom_key='somekey'), the only thing that would change is the xcom key. Xcom value will be some_large_df while key will be `somekey'.

So lets take an big step back: what feature/functionality are you trying to achieve?

I want to be able to control the key used in pushing xcom when a value is returned from a task

@ashb
Copy link
Member

ashb commented Jul 6, 2021

I want to be able to control the key used in pushing xcom when a value is returned from a task

Step back further 😁 Why? Please explain what you are trying to achieve, not the implementation you have already thought about.

@ephraimbuddy
Copy link
Contributor Author

ephraimbuddy commented Jul 6, 2021

I want to be able to control the key used in pushing xcom when a value is returned from a task

Step back further 😁 Why? Please explain what you are trying to achieve, not the implementation you have already thought about.

😀😀
Ok. For this case I described where I want to know in the UI if XCom was uploaded to GCS and if not see the value, by customising the orm_deserialize_value.

Take for instance, I'm building a custom xcom backend where I want to have XComs stored in GCS if the value is large or store in db if the value is small. And I want to know in the UI when I visit the XCom tab that this XCOM is in db or in GCS.

Is there another way to accomplish this other than to prefix the keys for xcoms stored in GCS with say `df' and do something like:

def orm_deserialize_value(self):
    if self.key.starts_with('df'):
        return 'Xcom uploaded to GCS'
    return BaseXCom.deserialize_value(self)

I don't have a strong opinion on this feature though, we can close it if we don't want it

@ephraimbuddy
Copy link
Contributor Author

Also, I feel the way multiple_outputs work is wrong and this can be used instead of it

@ashb
Copy link
Member

ashb commented Jul 7, 2021

Ok. For this case I described where I want to know in the UI if XCom was uploaded to GCS and if not see the value, by customising the orm_deserialize_value.

If that is the goal, then I would suggest we add an extra column to the XCom table (json?) that is usable by xcom backends for storing "whatever they like" -- and then that is what we use to decide what to show.

Because the XCom key should be entirely under user/dag author control -- and while we could put restrictions on it if we wanted to, it could lead to non-obvious behaviour as I mentioned earlier.

@ephraimbuddy
Copy link
Contributor Author

@ashb, I have just discovered that XCom key can actually be changed with the TaskFlow API currently:

from airflow import DAG
from airflow.utils.dates import days_ago

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}

dag = DAG(
    'example_xcom',
    schedule_interval="@once",
    start_date=days_ago(2),
    tags=['example'],
)

@dag.task()
def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


@dag.task()
def push_by_returning():
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


@dag.task()
def puller(data1, data2, **kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    if pulled_value_1 != value_1:
        raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')

    # get value_2
    pulled_value_2 = data2
    if pulled_value_2 != value_2:
        raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
    if pulled_value_1 != value_1:
        raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
    if pulled_value_2 != value_2:
        raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')


puller(push(), push_by_returning())

The decorated function accepts the **kwargs and we can use ti.xcom_push and also ti.xcom_pull

Closing this PR!

@ephraimbuddy ephraimbuddy deleted the add-xcom-key-to-tasks-decorator branch August 11, 2021 10:14
@ldacey
Copy link
Contributor

ldacey commented Nov 28, 2021

@ephraimbuddy Can I overwrite the default "return_value" key in custom operators?

I have some historical custom keys for certain DAGs. For example, in one operator I push a list of files with the key extract_files and I also push the maximum ID from the database table which is a default return_value. Can I have my operator return the list of files (extract_files) as an xcom with the key extract_files? I want to make use of the .output feature.

Not a huge deal because I could potentially just rename all of the old xcom keys in the database, and replace any references to extract_files key in my code.

@ephraimbuddy
Copy link
Contributor Author

@Idacey, for custom operators, if you are pushing XComs with ti.xcom_push, you can specify a key. Then you can wrap your operator instance with XComArg specifying the key to pull. e.g

xcom_arg = XComArg(custom_op, key=`extract_files`)

instead of

xcom_arg = custom_op.output

@ldacey
Copy link
Contributor

ldacey commented Nov 29, 2021

Okay, thanks. So no way to actually return a custom key name (without using ti.xcom_push explicitly)? I tried to override the output function from the BaseOperator, but no luck it seems. It is fine either way, I just assumed I was missing something obvious.

@ldacey
Copy link
Contributor

ldacey commented Nov 29, 2021

This worked for me.

  • extract.output can be used because the key is return_value
  • I added a template_field called input_files in my custom DatasetToDatasetOperator
  • I used XcomArg to overwrite the key from return_value to filter_list
  • The input_files argument reads the XComArg
    transform = PrepareParquetOperator(
        task_id="transform",
        input_files=extract.output,
    )

    transformed_files = XComArg(transform, "filter_list")

    finalize = DatasetToDatasetOperator(
        task_id="finalize",
        input_files=transformed_files,
    
    )
    extract >> transformed_files >> finalize

It seems like I have to use transformed_files as my upstream task instead of transform. Everything works though.

It would be nice if we could just overwrite the return_value key with anything we want to without requiring intermediate tasks or XComArgs. I will most likely just replace all of my custom keys with return_value instead though in the database, and remove any of my custom xcom.push code in favor of just returning results though.

Thanks @ephraimbuddy

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants