Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pangeo / earth science workflows #770

Open
jrbourbeau opened this issue Apr 5, 2023 · 23 comments
Open

Pangeo / earth science workflows #770

jrbourbeau opened this issue Apr 5, 2023 · 23 comments
Labels
workflows Related to representative Dask user workflows

Comments

@jrbourbeau
Copy link
Member

There is a large portion of dask.array users who use Dask in geospatial / climatology / earth science / etc. use cases. We should make sure these use cases are represented here in our benchmarks.

@TomNicholas @dcherian we're trying to establish a set of benchmarks for common workloads faced by Dask users. Ideally these are more realistic and less toy examples. These get used day-to-day by engineers as they run AB tests on various changes that they want to make. Workloads in this suite will naturally be optimized over time. What are 1-3 workflows you'd most like to see run regularly as part of Dask's benchmark suite?

xref #725

@jrbourbeau jrbourbeau added the workflows Related to representative Dask user workflows label Apr 5, 2023
@jrbourbeau jrbourbeau mentioned this issue Apr 5, 2023
10 tasks
@dcherian
Copy link
Contributor

dcherian commented Apr 7, 2023

How would you like the code to be written/contributed?

  1. Idealized dataset?
  2. Presumably some kind of small/big version of the problem, controlled by a small number of parameters?
  3. Can xarray be a dependency?

I think some combo of groupby/groupby-anomaly (exercise shuffling, reductions); then rolling/rolling anomaly (to exercise map_overlap) is probably what we want.

@aulemahal / @huard do you have a prototype xclim workflow that would be good to test?

@jrbourbeau
Copy link
Member Author

How would you like the code to be written/contributed?

Ideally there's some Jupyter notebook somewhere that someone could point us to. Maybe you already have a notebook or two in mind?

Ultimately we want to add the workflow in this repo as a pytest-style test (here's an example of an existing embarrassingly parallel workflow). These tests are run regularly and used in AB tests as we're developing dask and distributed. Eventually these workflows will probably graduate into an example repo with notebooks that people could checkout, play with, and adapt to their own use cases.

Idealized dataset?
Presumably some kind of small/big version of the problem, controlled by a small number of parameters?

We're able to use large-scale clusters on the cloud here, so large, realistic datasets are welcome. For example, that embarrassingly parallel workflow processes ~3 TB of data on AWS.

In terms of complexity we'd love something in between 20-200 lines of code. We're trying to strike a balance between these workflows being representative of what large Dask user groups actual do, while also making them approachable (i.e. we don't want people's eyes to glaze over when they see 1000 lines of code).

Can xarray be a dependency?

Yeah, definitely. Workflows using dask or xarray (backed by dask) are totally welcome.

@TomNicholas
Copy link

Hi @jrbourbeau - this sounds like a good idea.

We're able to use large-scale clusters on the cloud here, so large, realistic datasets are welcome.

What about requester-pays data?

Can xarray be a dependency?

Yeah, definitely. Workflows using dask or xarray (backed by dask) are totally welcome.

How about xGCM or xhistogram? They both ultimately just wrap xarray / dask, but produce task graphs with structures that conventional xarray usage doesn't. (If not that's okay, it would just make a neat example trickier.)

@jbusecke might also have some suggestions

1 similar comment
@TomNicholas
Copy link

Hi @jrbourbeau - this sounds like a good idea.

We're able to use large-scale clusters on the cloud here, so large, realistic datasets are welcome.

What about requester-pays data?

Can xarray be a dependency?

Yeah, definitely. Workflows using dask or xarray (backed by dask) are totally welcome.

How about xGCM or xhistogram? They both ultimately just wrap xarray / dask, but produce task graphs with structures that conventional xarray usage doesn't. (If not that's okay, it would just make a neat example trickier.)

@jbusecke might also have some suggestions

@jrbourbeau
Copy link
Member Author

What about requester-pays data?

Yeah, requester-pays is fine. We can spin up a cluster and pay for data access anywhere on AWS (and GCP soon). For example, that embarassingly parallel example I referenced earlier is processing ~3TB with requester_pays=True.

How about xGCM or xhistogram?

Good question. I'm inclined to stick with just dask and xarray dependencies at the moment as they're more broadly used

@aulemahal
Copy link

aulemahal commented Apr 11, 2023

@dcherian I don't think we have a xclim workflow that would be straightforward and simple enough for what I understand is the goal of this issue.

On the other hand, it would be easy to come up with one. Sticking with xarray and dask, I guess one could pull data from the CMIP6 AWS bucket and then reduce it with rolling and resample calls. In my experience, these two would be the main climate workflow bottlenecks ? The workflow would be scalable by parametrizing the size of the spatial subset and model selection.

I am imagining something like this https://medium.com/pangeo/easy-ipcc-part-1-multi-model-datatree-469b87cf9114, but stripped down to use only xarray and with a more complex function instead of temperature anomalies, like "yearly number of heat wave days", where "heat wave" is spells of N days above a given threshold. I take this example because it involves both a rolling and a resample.

@jbusecke
Copy link

I support @TomNicholas suggestions from above.
I think including some sort of vector calculus operator like div/curl would be incredibly valuable in the long run. This sort of operation usually works fine with toy examples but can turn out to be really tricky when running under real-world conditions due to:

  • combining several data_arrays of different size (e.g. full 4D array of velocities and a 2D array of cell area)
  • not reducing output dimensionality via aggregation (e.g. output data-arrays are the same size as input)
  • array padding, which increases the amount of comms between workers

@TomNicholas
Copy link

Yeah, requester-pays is fine. We can spin up a cluster and pay for data access anywhere on AWS (and GCP soon). For example, that embarassingly parallel example I referenced earlier is processing ~3TB with requester_pays=True.

Okay that's great.

@jrbourbeau can we specify the resources we want the workload to be tested on? i.e. specify that the total RAM available is larger than a chunk but smaller than the full calculation.

Good question. I'm inclined to stick with just dask and xarray dependencies at the moment as they're more broadly used

I think including some sort of vector calculus operator like div/curl would be incredibly valuable in the long run.

@jbusecke do you think we could make a simplified version of the first step of the hero calculation using just xarray & dask? I'm thinking we use LLC4320, but call xarray.pad explicitly then broadcast metrics across the larger time-dependent arrays. That should create a task graph that has the same gotchas but without importing xGCM. The histogram workload is maybe less important to capture because that literally just calls dask.blockwise.

@jrbourbeau
Copy link
Member Author

can we specify the resources we want the workload to be tested on?

Yeah, definitely. If you look here at the coiled.Cluster API docs, we can specify a specific AWS instance type or the amount of memory / CPU we want workers to have and Coiled will pick an appropriate instance type.

these two would be the main climate workflow bottlenecks
That should create a task graph that has the same gotchas

Just to clarify, for these workflows we're primarily interested in common operations that Pangeo folks want to do. In particular, I'm hopeful that these are representative in general so:

  1. If we want to make a change to dask or distributed, we'll be able to have a sense for what broad impact it might have on Pangeo folks
  2. If some geospatial / oceanographer / etc. person is curious about Dask and asks themself "Can Dask help me?", they'll be able to looks at these workflows and see something that looks familiar to what they'd want to do

If these common geospatial operations also happen to stress Dask and fail in some cases, then that's also really useful information. I just want to make sure that we include operations that are representative of what geospatial user groups often want to perform, not necessarily cases where Dask falls over.

@jrbourbeau
Copy link
Member Author

Looking through some of the existing pangeo example, I'm wondering if something like https://gallery.pangeo.io/repos/pangeo-gallery/physical-oceanography/01_sea-surface-height.html or https://gallery.pangeo.io/repos/pangeo-gallery/cmip6/global_mean_surface_temp.html would be considered representative. @dcherian @TomNicholas @jbusecke do those notebooks look like common operations the pangeo community cares about?

@dcherian
Copy link
Contributor

dcherian commented Apr 13, 2023

Those seem too "easy" at a quick glance.

Here's one pangeo-data/distributed-array-examples#2 . Gabe and I discussed this a bit in
pydata/xarray#6709

This point

I think there is a small but important increase in complexity here because we do ds.u.mean(), ds.v.mean(), (ds.u*ds.v).mean() all together so each chunk of ds.u and ds.v is used for two different outputs.

is definitely a common "feature" of Pangeo workloads.

EDIT: We could add some pad and diff operations to make it look like what Tom's talking about I think

@dcherian
Copy link
Contributor

dcherian commented Apr 13, 2023

This one was fixed by worker saturation for a single year of data. Not sure how well it does for all 60 years of data.

The example there uses 20 years, it's taking quite a while for the computation to start.

EDIT: This example takes a loooooooong time to show up on the dashboard. There are lots of tasks. But once it starts, it feels snappy, though I do see some white space in the task stream.
image

@dcherian
Copy link
Contributor

Finally a pangeo classic, anomalies with respect to a group mean: pangeo-data/distributed-array-examples#4 .

I don't have time to try it now, but excited to see what you coilies figure out :P

@jbusecke
Copy link

@jbusecke do you think we could make a simplified version of the first step of the hero calculation using just xarray & dask? I'm thinking we use LLC4320, but call xarray.pad explicitly then broadcast metrics across the larger time-dependent arrays. That should create a task graph that has the same gotchas but without importing xGCM. The histogram workload is maybe less important to capture because that literally just calls dask.blockwise.

Yeah I think that should be totally fine. I do not think that at this point there is a big difference in the computation via xgcm (because it all runs though apply_ufunc), you would basically just have to hardcode some of the things that xgcm does for you. So yes I think that is a reasonable way forward. I do unfortunately have limited time to work on this on my end though.

If these common geospatial operations also happen to stress Dask and fail in some cases, then that's also really useful information. I just want to make sure that we include operations that are representative of what geospatial user groups often want to perform, not necessarily cases where Dask falls over.

That makes sense, but I think that many of the 'typical cases' are also the ones were dask tends to fall over. So I think a simplified example as @TomNicholas suggests serves double duty here. I agree with @dcherian that the global means etc are too easy and not really all too 'geo' specific.

@mrocklin
Copy link
Member

Thanks folks. I'm curious, the benchmarks currently run on AWS. Are there AWS variants of the datasets linked to above by any chance? We can also run on GCP, it's just more flags/keywords to set and billing accounts to keep track of.

@mrocklin
Copy link
Member

If I take #770 (comment) from @dcherian above and swap out 20 years for 1 year it seems not terrible. It's about a 20s startup time (which I suspect may have as much to do with da.random.random as anything else).

It's pretty much entirely network bound:

Screen Shot 2023-04-13 at 1 17 46 PM

import pandas as pd
import xarray as xr

import dask.array
from dask.distributed import wait
import flox.xarray

dims = ("time", "level", "lat", "lon")
# nyears is number of years, adjust to make bigger, 
# full dataset is 60-ish years.
nyears = 1
shape = (nyears * 365 * 24, 37, 721, 1440)
chunks = (24, 1, -1, -1)

ds = xr.Dataset(
    {
        "U": (dims, dask.array.random.random(shape, chunks=chunks)),
        "V": (dims, dask.array.random.random(shape, chunks=chunks)),
        "W": (dims, dask.array.random.random(shape, chunks=chunks)),
        "T": (dims, dask.array.random.random(shape, chunks=chunks)),
    },
    coords={"time": pd.date_range("2001-01-01", periods=shape[0], freq="H")},
)
zonal_means = ds.mean("lon")
anomaly = ds - zonal_means

anomaly['uv'] = anomaly.U*anomaly.V
anomaly['vt'] = anomaly.V*anomaly.T
anomaly['uw'] = anomaly.U*anomaly.W

temdiags = zonal_means.merge(anomaly[['uv','vt','uw']].mean("lon"))

# note method="blockwise" uses flox
temdiags = temdiags.resample(time="D").mean(method="blockwise")

wait(temdiags.persist())

I'd be curious to learn if this is necessary network traffic (in which case we should improve our handling of network) or if it's unnecessary network traffic (in which case we should improve our task placement).

In either case it seems like a reasonable and easy benchmark to add. With 50 workers it looks like it'll take me 10 minutes or so (although this should probably decrease once we swap in the new hardware defaults). We can tune this down easily if we want to.

If I were to ask for an improvement on this it would be to run on a real dataset rather than something random.

@mrocklin
Copy link
Member

Well, not entirely network bound I guess. It's oddly balanced between network and CPU.

@mrocklin
Copy link
Member

Those seem too "easy" at a quick glance.

There are a couple of different overlapping goals here:

  1. Benchmarks so that Dask development is sensitive to Pangeo community needs over time
  2. Examples to show what's possible

These are different goals, but our hope is that these benchmarks/workflows can serve both. We hope that by making things more demo-able that that'll force us more towards what people actually do, and away from artifical examples that highlight a particular technical challenge (There is probably an analogy here between playing sports and targetting specific muscle groups while working out at a gym. We're looking for sports).

What I like about sea surface altitude is that the results are interpretable and pretty. It also makes us sensitive to issues in Zarr, reading from cloud storage, s3 flakiness, etc..

@dcherian a lot of your recent examples are great because they're easy to run, but they lack some of the breadth of the real-world examples. Thoughts?

I've only looked through the last few comments. I'm going to start working my way up the thread now.

@ntabris
Copy link
Member

ntabris commented Apr 13, 2023

Well, not entirely network bound I guess. It's oddly balanced between network and CPU.

I suspect a fair amount of the ~20% system cpu util is network-related, so most likely network bound and network also explains higher CPU util (but also CPU util is in there range where I don't think CPU is bottleneck)

@mrocklin
Copy link
Member

Also FWIW I probably disagree a little with @jrbourbeau about external dependencies. Mostly we want to make sure that the workload looks familiar to Pangeo users and represents their needs well. If tools like xGCM or xhistogram or flox aren't likely to bias our thinking then I don't really care about whether or not their active. If we find ourselves focusing a lot of engineering time one what ended up being an xGCM or xhistogram issue though then I think that we will have missed.

@mrocklin
Copy link
Member

@ntabris https://cloud.coiled.io/clusters/192751/details?account=dask-engineering if you want to go deeper

(please anyone else let me know if you want access to Coiled to look at metrics. It's easy to add you all to a team where we can look at this stuff together, or where you can run things)

@dcherian
Copy link
Contributor

What I like about sea surface altitude is that the results are interpretable and pretty. It also makes us sensitive to issues in Zarr, reading from cloud storage, s3 flakiness, etc..

@dcherian a lot of your recent examples are great because they're easy to run, but they lack some of the breadth of the real-world examples. Thoughts?

These are all based on real-world use cases but with random data swapped in for actual datasets. I can add those pieces back if you want "end-to-end" demos with pretty pictures.

  1. Pangeo / earth science workflows #770 (comment) stems from a user comment on xarray's issue tracker, but the data layout is like the "LLC" simulation that Tom/Julius are talking about.
  2. The second one is the ERA5 dataset (quite famous!). This is a real workflow but skips a bit where the ERA5 dataset gets regridded to a coarser model grid for comparison purposes. I can work with the original author to make some nice images.
  3. The third climatological mean is also ERA5, that one does actually hit the real dataset.

If I take #770 (comment) from @dcherian above and swap out 20 years for 1 year it seems not terrible.

Also on this one I noted a large increase in head node memory use before it started computing. That killed my cloud session a couple of times before I realized and then sized up.

@mrocklin
Copy link
Member

These are all based on real-world use cases but with random data swapped in for actual datasets. I can add those pieces back if you want "end-to-end" demos with pretty pictures.

Pretty pictures and also we get to see the impact of reading real data, which I suspect will be significant. For example, maybe there's some GIL-holding thing happening in computation which is affecting fsspec's ability to read data. We want to see those kinds of interactions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
workflows Related to representative Dask user workflows
Projects
None yet
Development

No branches or pull requests

7 participants