-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Genetics data IO performance stats/doc #437
Comments
Some doc for fsspec:
This is a partial output of the method profile on the code above (https://github.com/pystatgen/sgkit/issues/437#issue-784385179):
Unsurprising we get numpy methods, what is interesting and can't be seen here, are the sporadic spikes in CPU usage, looking at the network IO:
There are also spikes of network IO, which could suggest that we should look into smoothing out the network buffering. This reminded me about some stats I once stumbled upon in Beam codebase (here):
So Beam is using 16MiB read buffer. But it's also using Google's storage API client (python-storage), whilst fsspec calls GCS API directly via fsspec implementation of GCS gcsfs and there definitely are implementation differences between those two and thus the consequences of buffer size might be different. |
In the original issue: I have tried fsspec block size of 16MiB, but the results look very similar to the 5MiB block size, which was somewhat suspicious, and turns out that the zarr data ("gs://foobar/data.zarr") is saved as a large number of small files, for example for the
From S3 best practices.
From cloud performant reading of netcdf4 hdf5 data using the zarr library. Interesting/related issue "File Chunk Store" zarr-python#556. To validate the impact of the chunk size, let's rechunked the Performance graph: There is a cost to it tho, visible in the memory usage, notice that compressed chunk of 17MiB translates to 2GiB in memory (vs 2MiB -> 230MiB), this bumps the memory usage roughly from 6GB to 40GB. We could look into investigating the cost of reading zarr data in smaller chunks (then it was written as). This is a profile of the computation with zarr chunks Notice:
Say we have a random float zarr array shape We retrieve a single element: zs = zarr.open("/tmp/zarr", mode="r")
print(zs.foo[0, 0]) Zarr will:
So overall the lesson from this exercise is that - slicing zarr in a way that doesn't align with the native chunks is inefficient as a common step in pipelines. Zarr issues for partial chunk reads:
Highlights:
|
Some experiments that incorporate xarray + zarr + dask: TLDR
Long storySay we have a dataset: fsmap = fsspec.get_mapper("/tmp/zarr")
ar = np.random.random((10_000, 100_000))
xar = xr.Dataset(data_vars={"foo": (("x", "y"), ar)})
xar.foo.encoding["chunks"] = (10_000, 10_000)
xar.to_zarr(fsmap) This gives us a zarr store with one 2d array stored as 10 chunks (10_000, 10_000). Now, say we read this back via: xar_back = xr.open_zarr(fsmap) Dask graph looks like this: and an example of one zarr task definition:
Notes:
The question now is - how can we force async zarr multiget via dask.
xar_back = xr.open_zarr(fsmap).chunk({"y": 20_000, "x": 10_000}) Notes:
xar_back = xr.open_zarr(fsmap, chunks={"y": 20_000, "x": 10_000}) we get: and an example of one zarr task definition:
Notes:
Another interesting question is: how would dask react to chunking that doesn't align with zarr native chunks: xar_back = xr.open_zarr(fsmap, chunks={"y": 12_500, "x": 10_000}) Notice that When we run the code above, xarray tells us:
Let's see the graph: and an example of two zarr task definitions:
Notes:
|
TLDR thus far:
Another discussion that came up is the memory impact on the performance, given that larger chunk will require more memory. afaiu by default in a dask cluster:
In GCP, default VMs are divided into classes of machines like “standard” or “highmem”, in each class the memory to CPU ratio is constant, for |
A lot to digest here, thanks for the great work @ravwojdyla! |
And a way to connect these with #390, I personally would be very curious to see the #390 GWAS benchmark with:
|
Based on the performance tests done above, here are some high level guidelines for dask performance experiments (this is a starting point, we might find a better home for this later, and potentially have someone from Dask review them):
def get_dask_cluster(n_workers=1, threads_per_worker=None):
dk.config.set({"distributed.worker.memory.terminate": False})
workers_kwargs = {"memory_target_fraction": False,
"memory_spill_fraction": False,
"memory_pause_fraction": .9}
return Client(n_workers=n_workers,
threads_per_worker=threads_per_worker,
**workers_kwargs)
TODO:
|
This is a dump of some of the performance experiments. It's part of a larger issue of performance setup and best practices for dask/sgkit and genetic data. The goal is to share the findings and continue the discussion.
Where not otherwise stated, the test machine is a GCE VM, 16 cores and 64GB of memory, 400 SPD. Dask cluster is a single node process based. If the data is read from GCS, the bucket is in the same region as the VM:
Specs/libs
The issue with suboptimal saturation was originally reported for this code:
With local input, performance graph:
It's pretty clear the cores are well saturated. I also measure GIL, GIL was held for 13% of time and waited on for 2.1%, with each worker thread (16 threads) holding it for 0.7% and waiting for 0.1% of time.
For GCS input (via fsspec):
GIL summary: GIL was held for 18% of time and waited on for 3.8%, with each worker thread (16 threads) holding it for 0.6% and waiting for 0.2% of time, with one thread holding GIL for 6.5% and waiting 1.6% time.
It's clear that the CPU usage is lower, and not fully saturated, GIL wait time is a bit up (with a concerning spike in one thread). With remote/fsspec input, we have the overhead of data decryption and potential network IO overhead (tho it doesn't seem like we hit network limits).
The text was updated successfully, but these errors were encountered: