BUGFIX: Make sure XComs work correctly in MSGraphAsyncOperator with paged results and dynamic task mapping #40301
…t by taking the map_index into account
…ise we will have false positive as map_index is an integer
@potiuk I will try to write a unit test that covers the case with map_index. It won't be multi-threaded, but at least the case will then be documented and protected against regression by a test. Also, my apologies for the revert; I should have put the previous PR in draft, my bad.
Good idea.
And no worries about the revert. Happens :). I was also too fast.
Hello @potiuk, I've updated the description of the issue now that we fully understand it. It had nothing to do with concurrency; it was a misuse of XComs. This is partly because the xcom_pull method of BaseOperator doesn't allow you to specify the map_indices, while the one on TaskInstance that it delegates to does. That helped me solve the issue, which in the end is quite simple once you know it. I also suggested in the PR description that it might be better to adapt the xcom_pull method on BaseOperator as well. We won't be able to use it directly in the providers because of backward compatibility, but at least we would have a clean solution already in place for the future and maybe Airflow 3.0. What do you think? If you agree, I'll create a new PR for that one.
… can specify the map_index from which you want to pull
Tests have been adapted to take the map_index into account.
…aged results and dynamic task mapping (apache#40301) --------- Co-authored-by: David Blain <[email protected]>
I now fully understand why we had a "concurrency" issue with XComs when using the MSGraphAsyncOperator with a paged result and dynamic task mapping.
The issue is not a concurrency issue at all, but rather a misuse of XComs when handling paged results within dynamically mapped tasks.
Be aware that when the MSGraphAsyncOperator detects that the result is composed of multiple pages, it appends the subsequent pages to the already fetched ones within the current XCom.
This worked fine until we started using it with the partial and expand functionality, i.e. applying dynamic task mapping to the MSGraphAsyncOperator.
The issue was that we didn't take the map_index of the TaskInstance into account. By default, the xcom_pull method of BaseOperator uses all available map_indices to retrieve the XCom, and thus returns the fetched page results of all mapped tasks. That is wrong in this case: we need to append the paged result only to the existing one for the task instance that is currently running, and NOT to all the XComs.
Unfortunately, the xcom_pull method of BaseOperator doesn't allow you to specify the map_index, so it takes all indices by default. To fix that, I call the xcom_pull method directly on the TaskInstance from the context, since that method does allow you to specify the map_index. This solves our problem, which has nothing to do with concurrency.
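To illustrate the difference between a pull scoped to one map_index and an unscoped pull, here is a minimal, self-contained sketch. It is a toy in-memory model, not Airflow's XCom implementation: `ToyXComStore`, `append_page`, the task id `fetch_users` and the page labels are all hypothetical names made up for this example.

```python
from typing import Any, Dict, List, Optional, Tuple

# (task_id, map_index, key) -> value; a toy stand-in for the XCom table.
XComKey = Tuple[str, int, str]


class ToyXComStore:
    """Hypothetical in-memory XCom store, only for illustration."""

    def __init__(self) -> None:
        self._rows: Dict[XComKey, Any] = {}

    def push(self, task_id: str, map_index: int, key: str, value: Any) -> None:
        self._rows[(task_id, map_index, key)] = value

    def pull(self, task_id: str, key: str, map_index: Optional[int] = None) -> Any:
        if map_index is not None:
            # Scoped pull: only the value of one mapped task instance.
            return self._rows.get((task_id, map_index, key))
        # Unscoped pull: the values of ALL mapped instances, ordered by map_index.
        return [
            self._rows[k]
            for k in sorted(self._rows)
            if k[0] == task_id and k[2] == key
        ]


def append_page(store: ToyXComStore, task_id: str, map_index: int, page: str) -> None:
    """Append a page to the pages already stored for THIS mapped instance only."""
    previous: List[str] = store.pull(task_id, "return_value", map_index=map_index) or []
    store.push(task_id, map_index, "return_value", previous + [page])


store = ToyXComStore()
append_page(store, "fetch_users", 0, "A-page-1")
append_page(store, "fetch_users", 1, "B-page-1")
append_page(store, "fetch_users", 0, "A-page-2")

scoped = store.pull("fetch_users", "return_value", map_index=0)
unscoped = store.pull("fetch_users", "return_value")
```

In this toy model, `scoped` contains only the pages of mapped instance 0, while `unscoped` contains the page lists of every mapped instance. Appending to the unscoped result is the kind of mix-up described above.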
I think it would be good to adapt the xcom_pull method of BaseOperator so that it can also take the map_index into account if needed (and also add default while we're at it), leaving it as None by default, as is done in TaskInstance.
Of course, to be able to use it, the provider will need at least the version of Airflow that has the new implementation. In the meantime I do it directly on the TaskInstance, but I think it's still a good idea to adapt the method on BaseOperator;
once Airflow 3 is there, the code will be cleaner.
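The proposed adaptation could look roughly like the sketch below. This is only an illustration built on stub classes, not the actual Airflow code: `StubTaskInstance` and `SketchOperator` are hypothetical, and the keyword names `map_indexes` and `default` are assumed to mirror the ones accepted by the TaskInstance method.

```python
from typing import Any, Dict, Optional, Tuple


class StubTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's class."""

    def __init__(self, map_index: int, rows: Dict[Tuple[str, int, str], Any]) -> None:
        self.map_index = map_index
        self._rows = rows  # (task_id, map_index, key) -> value

    def xcom_pull(
        self,
        task_ids: str,
        key: str = "return_value",
        *,
        map_indexes: Optional[int] = None,
        default: Any = None,
    ) -> Any:
        if map_indexes is None:
            # Toy behaviour: no index given means "values of all mapped instances".
            found = [
                self._rows[k]
                for k in sorted(self._rows)
                if k[0] == task_ids and k[2] == key
            ]
            return found or default
        return self._rows.get((task_ids, map_indexes, key), default)


class SketchOperator:
    """Hypothetical operator whose xcom_pull forwards the extra arguments."""

    @staticmethod
    def xcom_pull(
        context: Dict[str, Any],
        task_ids: str,
        key: str = "return_value",
        *,
        map_indexes: Optional[int] = None,  # None keeps the old "all indices" behaviour
        default: Any = None,
    ) -> Any:
        # Delegate to the task instance from the context, passing everything through.
        return context["ti"].xcom_pull(
            task_ids=task_ids, key=key, map_indexes=map_indexes, default=default
        )


rows = {
    ("fetch_pages", 0, "return_value"): ["a1"],
    ("fetch_pages", 1, "return_value"): ["b1"],
}
ti = StubTaskInstance(map_index=1, rows=rows)
context = {"ti": ti}

own = SketchOperator.xcom_pull(context, "fetch_pages", map_indexes=ti.map_index)
everything = SketchOperator.xcom_pull(context, "fetch_pages")
missing = SketchOperator.xcom_pull(
    context, "fetch_pages", key="other", map_indexes=1, default=[]
)
```

Keeping both new arguments keyword-only with a default of None means existing callers would see no change in behaviour, which is the backward-compatibility concern raised above.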