
Only save data from one preprocessing task at a time #2610

Open
wants to merge 6 commits into main

Conversation

bouweandela
Member

@bouweandela bouweandela commented Dec 5, 2024

Description

Only save the files from one preprocessing task at a time to avoid running out of memory.

Further optimizations could be made by preparing all the task graphs for saving data and then running everything with a single call to dask.compute, as e.g. done in #2316. Such an approach would allow re-using loaded data if multiple preprocessing tasks load the same file and compute similar things. However, this would lead to an even larger Dask task graph, so we may want to make sure we do not create needlessly large task graphs anywhere in the preprocessor (see e.g. SciTools/iris#5455) before implementing that.
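For illustration, a rough sketch of that further optimization, assuming Iris's delayed saving (`compute=False`); `iter_output` is a hypothetical helper, not actual ESMValCore API:

```python
import dask
import iris


def compute_all_saves(tasks):
    """Sketch: build all delayed saves first, then compute them together."""
    delayeds = []
    for task in tasks:
        for cube, filename in task.iter_output():  # hypothetical helper
            # Build the save graph without writing any data yet.
            delayeds.append(iris.save(cube, filename, compute=False))
    # A single compute lets Dask reuse data loaded by multiple tasks,
    # at the cost of one large task graph.
    dask.compute(*delayeds)
```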

Closes #2609


Before you get started

Checklist

It is the responsibility of the author to make sure the pull request is ready to review. The icons indicate whether the item will be subject to the 🛠 Technical or 🧪 Scientific review.


To help with the number of pull requests:

@bouweandela bouweandela added the dask related to improvements using Dask label Dec 5, 2024

codecov bot commented Dec 5, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 94.97%. Comparing base (90e12d4) to head (bff5e77).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2610   +/-   ##
=======================================
  Coverage   94.96%   94.97%           
=======================================
  Files         253      253           
  Lines       14701    14713   +12     
=======================================
+ Hits        13961    13973   +12     
  Misses        740      740           

☔ View full report in Codecov by Sentry.

@bouweandela bouweandela requested a review from schlunma December 5, 2024 11:33
@bouweandela bouweandela marked this pull request as ready for review December 5, 2024 11:33
@bouweandela
Member Author

This could really help to avoid issues with large memory use, but it would require some benchmarking to see how it affects the runtime of large recipes before merging.

@bouweandela bouweandela changed the title Only save one preprocessing task at a time Only save data from one preprocessing task at a time Dec 5, 2024
Contributor

@schlunma schlunma left a comment


Code looks very good! I am running some tests at the moment, but it looks very promising. Thanks Bouwe 🚀

@schlunma
Contributor

schlunma commented Dec 6, 2024

Here are some first test results with recipe_schlund20esd.yml on one compute node of Levante. The distributed scheduler cases use 32 workers à 8 GiB (4 threads per worker).

| `--max-parallel-tasks` | Dask scheduler | run time before #2522 (11a4787) (H:MM) | run time current main (65c7b28) (H:MM) | run time this PR (H:MM) |
| --- | --- | --- | --- | --- |
| 1 | default | 3:04 | 2:23 | 3:34 |
| 16 | default | 0:51 | fails | 1:30 |
| 1 | distributed | 3:04 | 1:28 | 2:20 |
| 16 | distributed | 0:35 | 0:32 | 0:32 |

During the v2.11.0 testing, the recipe ran in 0:38:01.

@bouweandela
Member Author

Thanks for all the testing @schlunma!

The NetCDF library used to read the data can only be used by a single thread at a time. I suspect that severely limits the amount of parallelism that can be achieved by the default, thread-based scheduler. On a compute node with many cores, like the Levante compute nodes, this effect will be very noticeable. Observing CPU usage with htop during a run of this recipe on the current main branch suggests that in the best case almost 10 CPU cores are busy, while typically only about 1.2 are. Therefore we can set num_workers: 10 without any loss of performance. With max_parallel_tasks: 16, that results in a runtime of 34 minutes with the current main branch.
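For illustration only (the exact ESMValCore configuration key for this may differ), capping the default threaded scheduler with plain Dask would look something like:

```python
import dask

# Assumption for illustration: ~10 threads is all the threaded scheduler can
# usefully exploit here, because of the HDF5/NetCDF lock mentioned above.
dask.config.set(scheduler="threads", num_workers=10)
```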

Therefore this pull request only offers a performance advantage with the distributed scheduler. For the default scheduler, its advantage is that it prevents the tool from crashing if the number of Dask workers is missing or misconfigured, but that comes at a considerable performance cost, so it is probably not worth it. Better documentation seems a more attractive solution.

@schlunma
Contributor

schlunma commented Dec 9, 2024

Thanks Bouwe, I didn't know the number of workers for the threaded scheduler had such a big impact! But considering that each process spawns its own workers, this probably makes sense. Thus, I agree that for the threaded scheduler the approach here is not a good solution, since we actually want the different schedulers to run in parallel.

Therefore this pull request only offers a performance advantage with the distributed scheduler.

This is also not the case (see my latest numbers; comparison of main vs. this PR).

What I really wonder here is why the run times from main and this PR are so vastly different when only 1 process is used. The lock shouldn't be active in this case, should it? I actually quite like the idea of a lock, at least for the distributed scheduler.

@valeriupredoi
Contributor

Manu, by "vastly different" you mean this row?

1 distributed 3:04 1:28 2:20

@schlunma
Contributor

Yeah, basically the difference in the columns main vs. this PR with 1 process, i.e., 2:23 -> 3:34 for the threaded scheduler and 1:28 -> 2:20 for the distributed one.

@valeriupredoi
Contributor

Threaded is Dreaded: as much as possible we shouldn't use the threaded scheduler because it is heavily dependent on the lock; you'd also expect this PR to make things wait a wee while longer because it's just one process/task at a time vs a pool of processes all writing concurrently (to different files, of course). The best bet to test would be a recipe that outputs a lot of preprocessed files - there you should see memory use going down, while the run times may be longer, and by quite a bit

@valeriupredoi
Contributor

...which I think you've already found and are using (ie your tests here) since I just noticed those are HH:MM and not MM:SS as I initially thought 😁

@schlunma
Contributor

The thing is - the lock should only be active for parallel processing. For serial processing, the corresponding variable is just None. So if I understand the code correctly, there shouldn't be ANY difference in run times for --max-parallel-tasks=1 between the current main and this PR...
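A minimal sketch of the pattern being discussed, not ESMValCore's actual code (`compute_and_save` is a hypothetical helper):

```python
import multiprocessing


def make_save_lock(max_parallel_tasks):
    """Only create a lock for parallel runs; serial runs get None."""
    if max_parallel_tasks == 1:
        return None  # serial: no lock, so no extra waiting expected
    return multiprocessing.Manager().Lock()


def save_task_output(delayeds, lock):
    if lock is None:
        compute_and_save(delayeds)  # hypothetical helper
    else:
        with lock:  # only one task computes and saves at a time
            compute_and_save(delayeds)
```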

@valeriupredoi
Contributor

No, because the process (when you use the threaded scheduler) uses threading even if max parallel tasks = 1, i.e. it's one process/task that is distributed over the threads of the core where it's running

@schlunma
Contributor

But that's already the case for the current main, right? This PR doesn't change anything in the _run_sequential() code, and because of this the new lock is None and in this case not used. So I really don't understand why there is a difference in run time of more than an hour between main and this PR.

@valeriupredoi
Contributor

hang on let me actually look at the code here 😁

But by default, it's what Bouwe says here

The NetCDF library used to read the data can only be used by a single thread at a time. I suspect that severely limits the amount of parallelism that can be achieved by the default, thread-based scheduler.

Note that the same applies to writing as well: it's limited by the lock because HDF5 is not thread-safe. That's why we should turn off threading everywhere, irrespective of how many processes we have; it simply doesn't work with HDF5

@bouweandela
Member Author

For serial processing, the corresponding variable is just None. So if I understand the code correctly, there shouldn't be ANY difference in run times for --max-parallel-tasks=1 between the current main and this PR...

Indeed, I would not expect any difference. Is the difference you're seeing reproducible? Maybe it was just a fluke

@schlunma
Contributor

schlunma commented Dec 11, 2024

Is the difference you're seeing reproducible? Maybe it was just a fluke

Yes, I ran the recipes twice. So unless I screwed up big time by looking at the wrong logs, I'm pretty sure it's there.

@valeriupredoi
Contributor

it'd be good to measure how long it takes this bit to run:

        try:
            logger.info(
                "Computing and saving data for preprocessing task %s",
                self.name,
            )
            _compute_with_progress(delayeds, description=self.name)

i.e. from lock acquisition to lock release. The problem you are seeing here is that there are two different locks: a scheduler lock that Bouwe turns on, lets the task run and finish, then turns off; and a second lock, inside netCDF4/HDF5, that stops a process spread over multiple threads from doing I/O concurrently, so it basically waits until one thread has finished its I/O before letting the next thread do I/O. Bouwe's lock could be None while the threads' lock is still active, since Bouwe's is a per-process lock. Now, the question is: why is this implementation slower for nprocs=1, i.e. when Bouwe's lock is indeed None and the work is done by one process that threads as before?
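For example, a minimal way to time that section (names are illustrative; `_compute_with_progress` is the function quoted above):

```python
import logging
import time

logger = logging.getLogger(__name__)


def timed_compute(delayeds, description):
    """Log how long computing and saving one task's output takes."""
    start = time.perf_counter()
    _compute_with_progress(delayeds, description=description)
    logger.info(
        "COMPUTE_WITH_PROGRESS %s: %.2f s",
        description,
        time.perf_counter() - start,
    )
```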

@valeriupredoi
Contributor

I'd make 100% sure Bouwe's lock is indeed None for the nprocs=1 case, otherwise the scheduler is doing something funky; print that out and measure the times between acquire/release. I can't think of any other reason for the difference in time, unless you ran on a shared node and not on your own machine

@valeriupredoi
Contributor

Turns out I was waffling about the scheduler lock: it is only set in _run_parallel, which is not called unless max parallel tasks > 1. So for a small recipe with only three sequential tasks, i.e. the Python example, all looks good:

with Bouwe's changes (this PR):

COMPUTE_WITH_PROGRESS 0.30625486373901367
COMPUTE_WITH_PROGRESS 0.10400819778442383
COMPUTE_WITH_PROGRESS 0.7586956024169922
Time for running the recipe was: 0:00:16.538996

COMPUTE_WITH_PROGRESS 0.305743932723999
COMPUTE_WITH_PROGRESS 0.10391664505004883
COMPUTE_WITH_PROGRESS 0.8586757183074951
Time for running the recipe was: 0:00:17.099344

without Bouwe's changes (current main):

COMPUTE_WITH_PROGRESS 0.30536580085754395
COMPUTE_WITH_PROGRESS 0.10322737693786621
COMPUTE_WITH_PROGRESS 0.764275312423706
Time for running the recipe was: 0:00:15.638896

COMPUTE_WITH_PROGRESS 0.30529236793518066
COMPUTE_WITH_PROGRESS 0.10352373123168945
COMPUTE_WITH_PROGRESS 0.7534520626068115
Time for running the recipe was: 0:00:16.489703

It's also crazy to see that the actual 3 preproc tasks take 1.2s but the entire preproc takes about 11s (crazy overheads on IO)

Manu, I am wondering about those times you posted; I am thinking something funny is going on in I/O land, for some bizarre reason. Could you maybe measure the time it takes _compute_with_progress to run, like I did here?

@bouweandela am loving that progress bar, man! 🍺

@schlunma
Contributor

schlunma commented Dec 12, 2024

Okay, I retested this with --max-parallel-tasks=1. Turns out that the run times vary A LOT. All of my tests use the same dask distributed settings and an entire compute node on Levante (which is just used by me, no other users). Here are the results:

  • current main:
    • run 1: 2:18:42
    • run 2: 2:56:50
    • run mentioned above: 1:28:00
  • this PR:
    • run 1: 2:05:45
    • run mentioned above: 2:20:00

I am starting to believe that this is actually a Levante issue...

@valeriupredoi
Contributor

yeh that's why I'd measure just the compute times, Manu - all sorts of issues with IO via a busy FS can happen, even if you are the sole user of the system 🍺

@bouweandela
Member Author

Okay, I retested this with --max-parallel-tasks=1. Turns out that the run times vary A LOT.

Great, that means this pull request is good to go for the distributed scheduler, right? In my experience, Levante is most responsive after 9 PM or on weekends, but usually the variation is not this big.

I've disabled the lock for the default scheduler in 1d4e23f and will open a new pull request to try to improve on the issue reported in #2609 with the default scheduler.
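A hypothetical sketch of that change (not the actual commit): only create the save lock when a Dask distributed client is available, so the default scheduler runs unlocked:

```python
import distributed


def get_save_lock():
    """Return a lock for the distributed scheduler, or None otherwise."""
    try:
        distributed.get_client()
    except ValueError:
        return None  # default (threaded) scheduler: no client, no lock
    return distributed.Lock(name="save")
```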

@valeriupredoi
Contributor

after 9 PM or on weekends

maybe, but those are the best times to watch Top Gun Maverick too, or sleep, if one had a few too many pints on Friday 😁

Contributor

@valeriupredoi valeriupredoi left a comment


based on my limited testing, and reading through code changes, all looks spiffy to me (at least) 🍺

Labels
dask related to improvements using Dask
Development

Successfully merging this pull request may close these issues.

max_parallel_tasks and Dask
3 participants