[Exploration]: How Dask works and how it is utilized in xarray #376
Replies: 5 comments
-
Here's a small intro from what I can recall from when I was working with Xarray and Dask a lot. Most of my experience with these libraries came from working on the ESGF Compute Service. This service translated WPS requests into an Xarray task graph (DAG) and then executed it on a Dask cluster allocated through Dask Gateway. The service also used Xarray's ability to read Zarr-formatted datasets from S3 stores to improve throughput for parallelized operations.

Here's a quick intro to Dask. Anything built with a Dask array, bag, dataframe, delayed, or future is turned into a task graph; the scheduler can optimize the graph and finally assigns the tasks to workers.

To answer the first question, the communication depends on the scheduler. There's either a single-machine or a distributed scheduler. For single-machine you have single-threaded, multi-threaded, or process-based execution. Multi-threaded is pretty straightforward since it can use shared variables in its thread pool, but the process-based scheduler uses cloudpickle to serialize/deserialize the messages and data passed between processes. The same serialize/deserialize pattern is used when using distributed for local/remote clusters.

In my experience, chunking is recommended when dealing with out-of-core operations. I remember losing performance with small datasets and chunking on a LocalCluster due to the communication overhead. Chunking works best when you have an independent variable, e.g. if you're averaging over time you could chunk by lat, lon, lev, or some combination. You can still benefit from chunking even if some of the tasks are not operating on an independent variable, e.g. when building large task graphs. An issue I ran into when working on the Compute service was using
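To make the above concrete, here is a minimal sketch (array sizes, chunk shapes, and the choice of scheduler are arbitrary illustrations, not anything from the Compute service) of how a chunked Dask computation builds a task graph that only runs on `.compute()`:

```python
import dask.array as da

# A 3D "time x lat x lon" style array, chunked along lat/lon so a
# time mean can be computed independently for each spatial chunk.
x = da.random.random((365, 180, 360), chunks=(365, 45, 90))

# This only builds a task graph; nothing is computed yet.
time_mean = x.mean(axis=0)

# Run it with the threaded single-machine scheduler (shared memory).
# Using scheduler="processes" or a distributed cluster instead means
# tasks and data are serialized with (cloud)pickle rather than shared.
result = time_mean.compute(scheduler="threads")
print(result.shape)  # (180, 360)
```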
-
Here are some related links.
-
Exploring whether the Xarray Groupby API is sequential or parallel

Overview
We use the

Background
Good to know: I found the related xarray issue, pydata/xarray#2852. Comments from that issue:
Action Items:
Conclusion
All the steps are sequential. In Xarray < 2022.06.0, the groupby and resampling operations are sequential (refer to notes below).
Other workarounds:
Next steps:
Xarray >= v2022.06.0 includes the flox-based groupby/resample implementation (pydata/xarray#5734, referenced below). We should experiment with it (see the sketch below).
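As a hedged sketch of that experiment (the dataset, the variable name `tas`, and the chunk sizes are made up for illustration), a grouped reduction on a chunked dataset looks like this; with xarray >= 2022.06.0 and flox installed, the groupby is vectorized rather than looping over groups sequentially:

```python
import numpy as np
import pandas as pd
import xarray as xr

# A small, made-up monthly dataset, chunked along time.
time = pd.date_range("2000-01-01", periods=120, freq="MS")
ds = xr.Dataset(
    {"tas": (("time", "lat", "lon"), np.random.rand(120, 4, 8))},
    coords={"time": time, "lat": np.arange(4), "lon": np.arange(8)},
).chunk({"time": 12})

# Lazy grouped reduction; flox (if installed) replaces the sequential
# per-group loop with a single vectorized reduction.
seasonal_mean = ds["tas"].groupby("time.season").mean()

# Nothing is computed until we explicitly ask for it.
print(seasonal_mean.compute())
```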
-
Explore where we explicitly load Dask arrays into memory for sequential operations

Background
In xarray, Dask arrays are not loaded into memory unexpectedly (an exception is raised instead). In xCDAT, we load Dask arrays into memory in specific spots.
Action Items
Conclusion
xCDAT loads Dask arrays into memory when performing operations or computations on multi-dimensional arrays, specifically coordinate bounds. xCDAT loads coordinate bounds into memory in the following APIs in specific situations:
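As a hedged sketch (the file name, chunking, and `lat_bnds` variable are placeholders, not xCDAT's actual internals), explicitly loading coordinate bounds into memory while leaving the data variables lazy looks like this:

```python
import xarray as xr

# Open a dataset lazily with Dask chunks; "demo.nc" is a placeholder.
ds = xr.open_dataset("demo.nc", chunks={"time": 100})

# Coordinate bounds are tiny compared to the data variables, so
# loading them eagerly avoids re-executing their task graph every
# time a sequential operation (e.g. weight generation) reuses them.
if "lat_bnds" in ds:
    ds["lat_bnds"] = ds["lat_bnds"].load()

# The data variables remain lazy Dask arrays until .load()/.compute().
print(ds)
```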
-
Experiment with bounds and edges:
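One possible shape for that experiment (a sketch only; `bounds_to_edges` is a hypothetical helper, not an existing xCDAT or xarray function) is converting CF-style bounds with shape (n, 2) into a 1-D array of n + 1 edges:

```python
import numpy as np

# Hypothetical helper: assumes the bounds are contiguous, i.e. the
# upper bound of one cell equals the lower bound of the next.
def bounds_to_edges(bounds: np.ndarray) -> np.ndarray:
    return np.concatenate([bounds[:, 0], bounds[-1:, 1]])

# Example: three latitude cells spanning [-90, -30], [-30, 30], [30, 90].
lat_bnds = np.array([[-90.0, -30.0], [-30.0, 30.0], [30.0, 90.0]])
print(bounds_to_edges(lat_bnds))  # [-90. -30.  30.  90.]
```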
-
This post explores the internals of Dask so we can get a better understanding of how it works. We will also explore how Dask is utilized in xarray, and when to chunk xarray Datasets using Dask.

Dask Array Best Practices
https://docs.dask.org/en/stable/array-best-practices.html#best-practices
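For instance (a hedged illustration of the chunk-sizing advice on that page, not a quote from it), you can let Dask pick chunk sizes automatically and then inspect them:

```python
import dask.array as da

# chunks="auto" targets roughly the configured "array.chunk-size"
# (about 128 MiB by default), in line with the advice to prefer a
# modest number of reasonably large chunks.
x = da.random.random((20_000, 20_000), chunks="auto")
print(x.chunksize, x.numblocks)
```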
Chunking Best Practices
Other performance factors:
How do chunks communicate with one another?
Chunks communicate using indexing.
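For example (a minimal sketch with arbitrary shapes), a reduction along a chunked axis combines partial results from multiple chunks, so the chunks interact through the task graph via their block indices:

```python
import dask.array as da

# Four 500 x 500 chunks.
x = da.ones((1000, 1000), chunks=(500, 500))

# Reducing along axis 0 combines partial results from the chunks
# stacked along that axis.
y = x.mean(axis=0)

# Materialize the graph to count the tasks, including the
# inter-chunk aggregation steps.
print(len(dict(y.__dask_graph__())))
```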
NOTE: The xarray docs say that Dask communication by index is not yet implemented, so grouping and resampling are not optimized.
This documentation is partially outdated because pydata/xarray#5734 is now merged, which addresses the performance issues with groupby() and multi-file datasets.

Additional Questions:
Resources: