Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUGFIX: Make sure XComs work correctly in MSGraphAsyncOperator with paged results and dynamic task mapping #40301

Merged
merged 14 commits into from
Jun 20, 2024

Conversation

dabla
Copy link
Contributor

@dabla dabla commented Jun 18, 2024

Now I fully understand why we had a "concurrency" issue with the XCom's when using the MSGraphAsyncOperator which has a paged result with a dynamic task mapping.

The issue is not a concurrency issue at all, but rather a misusage of the Xcoms when handling paged results within dynamic mapped tasks.

You have to be aware that when the MSGraphAsyncOperator detects that the result will be composed of multiple pages, it will append the consequent pages to the already fetched ones within the current XCom.
This worked fine until we start using is with the partial and expand functionally, e.g. applying dynamic task mapping on the MSGraphAsyncOperator.

This issue there was that we didn't take into account the map_index of the TaskInstance, and by default when you use the xcom_pull method of the BaseOperator, it will automatically used all available map_indices to retrieve the XCom, and thus return all fetched page results for all mapped tasks, which is wrong in this case as we need to only append the paged result to the already existing one for the current task instance in which it is being run and NOT to all the xcoms, which is the default behaviour when you do a xcom_pull.

Unfortunately, the xcom_pull method of the BaseOperator doesn't allow you to specify the map_index, so it taks all indices by default. So in order to fix that, I need to call the xcom_pull method directly on the TaskInstance from the context, as there the method allows you to specifiy map_index, which solves then our problem but has nothing to do with concurrency.

I think it would be good to adapt the xcom_pull method of the BaseOperator so it can also take into account the map_index if needed (and also add default while we're at it), but leave it to None as it's done in the TaskInstance.
Of course to be able to used it, the provider will need at least the version of Airflow that has that new implementation. In the meantime I do it directly on the TaskInstance but I think it's a good idea to still adapt the method on BaseOperator,
especially once Airflow 3 will be there, the code will be cleaner.

    @staticmethod
    @provide_session
    def xcom_pull(
        self,
        task_ids: str | Iterable[str] | None = None,
        dag_id: str | None = None,
        key: str = XCOM_RETURN_KEY,
        include_prior_dates: bool = False,
        session: Session = NEW_SESSION,
        *,
        map_indexes: int | Iterable[int] | None = None,  # add those 2 extra params like in TaskInstance
        default: Any = None,  # add those 2 extra params like in TaskInstance
    ) -> Any:
        """
        Pull XComs that optionally meet certain criteria.

        The default value for `key` limits the search to XComs
        that were returned by other tasks (as opposed to those that were pushed
        manually). To remove this filter, pass key=None (or any desired value).

        If a single task_id string is provided, the result is the value of the
        most recent matching XCom from that task_id. If multiple task_ids are
        provided, a tuple of matching values is returned. None is returned
        whenever no matches are found.

        :param context: Execution Context Dictionary
        :param key: A key for the XCom. If provided, only XComs with matching
            keys will be returned. The default key is 'return_value', also
            available as a constant XCOM_RETURN_KEY. This key is automatically
            given to XComs returned by tasks (as opposed to being pushed
            manually). To remove the filter, pass key=None.
        :param task_ids: Only XComs from tasks with matching ids will be
            pulled. Can pass None to remove the filter.
        :param dag_id: If provided, only pulls XComs from this DAG.
            If None (default), the DAG of the calling task is used.
        :param include_prior_dates: If False, only XComs from the current
            execution_date are returned. If True, XComs from previous dates
            are returned as well.
		:param map_indexes: If provided, only pull XComs with matching indexes.
            If *None* (default), this is inferred from the task(s) being pulled
            (see below for details).
        """
        return context["ti"].xcom_pull(
            key=key,
            task_ids=task_ids,
            dag_id=dag_id,
            include_prior_dates=include_prior_dates,
            session=session,
	    map_indexes=map_indexes,
            default=default,
        )


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@dabla dabla marked this pull request as draft June 18, 2024 14:41
@dabla
Copy link
Contributor Author

dabla commented Jun 18, 2024

@potiuk Will try to write a unit test which tests the case with map_index, but not in a multi-threaded way but at least then this case will be documented and verified for regression through a test. Also my apologies for the revert, should have put previous PR in a draft my bad

@potiuk
Copy link
Member

potiuk commented Jun 19, 2024

@potiuk Will try to write a unit test which tests the case with map_index, but not in a multi-threaded way but at least then this case will be documented and verified for regression through a test. Also my apologies for the revert, should have put previous PR in a draft my bad

Good idea.

@potiuk
Copy link
Member

potiuk commented Jun 19, 2024

And no worries about revert. Happens :). I was also too fast.

@dabla
Copy link
Contributor Author

dabla commented Jun 19, 2024

And no worries about revert. Happens :). I was also too fast.

Hello @potiuk , I've updated the description of the issue, now we fully understand the issue we had, it had nothing to do with concurrency but a misuseage of the XCom's, which is also a bit due to the fact that the xcom_pull method of the BaseOperator doesn't allow you to specify the map_indices, while the delegated one on TaskInstance does, which helped me solve the issue, which at the end is quite simple once you know it. So I also suggested in description above if it wouldn't be beter to also adapt the xcom_pull method on the BaseOperator, we won't be able to use it directly as is with the providers due to the backward compatiblilty issue, but at least, we would have a clean solution already in place for the future and maybe Airflow 3.0. What do you think? If you agree, I'll create a new PR for that one.

@dabla dabla changed the title refactor: Make sure xcoms work correctly in MSGraphAsyncOperator with concurrent workers refactor: Make sure XComs work correctly in MSGraphAsyncOperator with paged results and dynamic task mapping Jun 19, 2024
@dabla dabla changed the title refactor: Make sure XComs work correctly in MSGraphAsyncOperator with paged results and dynamic task mapping BUGFIX: Make sure XComs work correctly in MSGraphAsyncOperator with paged results and dynamic task mapping Jun 19, 2024
@dabla
Copy link
Contributor Author

dabla commented Jun 19, 2024

Test have been adapted to take into account the map_index

@dabla dabla marked this pull request as ready for review June 20, 2024 07:48
@potiuk potiuk merged commit d9d0963 into apache:main Jun 20, 2024
51 checks passed
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Jul 26, 2024
…aged results and dynamic task mapping (apache#40301)



---------

Co-authored-by: David Blain <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants