Improve control over dynamic requirements #3179
Conversation
The failing test reports ModuleNotFoundError: No module named 'openapi_spec_validator' and, connected to that, _pytest.nodes.Collector.CollectError: ImportError while importing test module '/home/runner/work/luigi/luigi/test/contrib/ecs_test.py'. Is it possible there's something wrong with
Seems like a temporary glitch, all tests pass now 🎉
This could likely improve many of our jobs. Thanks!

I have considered implementing something similar, but in the storage interface layer, e.g. a GCSClient that batches lookups, potentially backed by a cache shared by workers. We already have a GCSClient with an in-memory cache, which improves things, but doesn't go all the way. Our cache implementation is very simple and makes some assumptions, so we haven't shared it.

What would the pros and cons be of DynamicRequirements vs similar functionality in the storage layer? DynamicRequirements can be used for multiple types of storage. A solution in the storage layer would be opaque to the worker core, and might arguably better separate concerns by keeping storage interface complexity within the storage layer. I am not sure how valuable a shared or persistent cache would be.

I do not doubt that this is a good solution. Just taking the opportunity to reason. Open source architecture is often accidental. :-)
Thanks for the feedback @lallea!

Interesting, we also work with in-memory / local caching on SSDs (where we can) plus batched lookups. However, I think having these DynamicRequirements on top can be beneficial to reduce the load even further.

Some more background on our case (physics research): we sometimes have dynamic tasks with O(k) yielded tasks, and on some of our file systems, saving O(k) cache lookups can speed things up a lot. Depending on where we process things (we can't always choose that), we cannot control the worker infrastructure, so we often end up on machines that only have shared, slow NFS mounts where caching via local disks isn't an option. Low-hanging fruit such as defining DynamicRequirements could be really helpful there.
Yes, I agree it looks useful even in the presence of other caching. Could it make the bulk complete code in the Range classes
@dlstadther Kind ping :)
Thanks for the ping @riga!
Changes LGTM; only left 2 small documentation questions and 1 optional code comment.
LGTM! Thank you for your hard work and contribution!
This PR is meant as a continuation of #3178 and further improves the control over dynamic task requirements handled by the worker.
Description
I added a shallow class DynamicRequirements which is intended to wrap a batch of tasks being yielded as dynamic requirements and, optionally, to define a custom, probably optimized completeness check for that batch. The new class is understood by the TaskProcess as a valid yield value of run() methods, which required changes to only a few lines.
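A minimal sketch of what this enables (DataTask and Collector are hypothetical example tasks, and the exact DynamicRequirements signature may differ slightly from what is finally merged):

```python
import luigi


class DataTask(luigi.Task):
    branch = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget("data/part_{}.txt".format(self.branch))

    def run(self):
        with self.output().open("w") as f:
            f.write("payload {}\n".format(self.branch))


class Collector(luigi.Task):

    def output(self):
        return luigi.LocalTarget("data/collected.txt")

    def run(self):
        # the batch of requirements is only known at run time
        tasks = [DataTask(branch=b) for b in range(1000)]

        # wrap the batch instead of yielding a plain list; without a custom
        # completeness check the worker behaves exactly as before
        yield luigi.DynamicRequirements(tasks)

        # at this point all wrapped tasks are complete
        with self.output().open("w") as f:
            f.write("done\n")
```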
Motivation and Context
As already mentioned in #3178, we are sometimes dealing with thousands of tasks yielded as dynamic requirements in some wrapper task. These tasks store their outputs in a remote location, and we can assume that they are located in the same directory (presumably not unusual). For that reason we don't want to "stat" all output files separately, but rather do a single "listdir" on the common base directory, followed by a local comparison of basenames, saving us thousands of interactions with remote resources. Although we make use of the caching implemented in the referenced PR, we would like to further reduce remote API calls, but the workers (or rather the TaskProcess) internally flatten all requirements and perform separate completeness checks:
https://github.com/spotify/luigi/blob/master/luigi/worker.py#L154
Which requirements exist and which don't is not of interest at this point, so batched completeness checks such as the one suggested above would be fully compatible with the current logic :)
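To sketch what such a batched check could look like (DataTask is the hypothetical task from the sketch above, all outputs are assumed to live in the same directory, and the exact custom-complete hook may differ from what this PR finally implements):

```python
import os

import luigi


class Collector(luigi.Task):

    def output(self):
        return luigi.LocalTarget("data/collected.txt")

    def run(self):
        # hypothetical upstream tasks whose outputs all live in the same directory
        tasks = [DataTask(branch=b) for b in range(1000)]

        def custom_complete(complete_fn):
            # complete_fn is the per-task check the worker would normally use;
            # instead of calling it once per output ("stat"), list the common
            # base directory once and compare basenames locally
            first_target = tasks[0].output()
            base_dir = os.path.dirname(first_target.path)
            if not first_target.fs.exists(base_dir):
                return False
            basenames = {
                os.path.basename(path)
                for path in first_target.fs.listdir(base_dir)
            }
            return all(
                os.path.basename(task.output().path) in basenames
                for task in tasks
            )

        # the worker invokes custom_complete once for the whole batch
        yield luigi.DynamicRequirements(tasks, custom_complete)

        with self.output().open("w") as f:
            f.write("done\n")
```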
Have you tested this? If so, how?
Yep, I added a new test case, updated the docs, and amended the dynamic_requirements.py example.