Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Genetics data IO performance stats/doc #437

Open
ravwojdyla opened this issue Jan 12, 2021 · 9 comments
Open

Genetics data IO performance stats/doc #437

ravwojdyla opened this issue Jan 12, 2021 · 9 comments
Labels
documentation Improvements or additions to documentation IO Issues related to reading and writing common third-party file formats performance

Comments

@ravwojdyla
Copy link
Collaborator

This is a dump of some of the performance experiments. It's part of a larger issue of performance setup and best practices for dask/sgkit and genetic data. The goal is to share the findings and continue the discussion.

Where not otherwise stated, the test machine is a GCE VM, 16 cores and 64GB of memory, 400 SPD. Dask cluster is a single node process based. If the data is read from GCS, the bucket is in the same region as the VM:

Specs/libs
~ uname -a
Linux rav-dev 4.19.0-13-cloud-amd64 #1 SMP Debian 4.19.160-2 (2020-11-28) x86_64 GNU/Linux~ conda list
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                       1_gnu    conda-forge
aiohttp                   3.7.2            py38h1e0a361_0    conda-forge
appdirs                   1.4.4                    pypi_0    pypi
argon2-cffi               20.1.0           py38h1e0a361_2    conda-forge
asciitree                 0.3.3                    pypi_0    pypi
async-timeout             3.0.1                   py_1000    conda-forge
async_generator           1.10                       py_0    conda-forge
atk                       2.36.0               ha770c72_4    conda-forge
atk-1.0                   2.36.0               h0d5b62e_4    conda-forge
attrs                     20.2.0             pyh9f0ad1d_0    conda-forge
backcall                  0.2.0              pyh9f0ad1d_0    conda-forge
backports                 1.0                        py_2    conda-forge
backports.functools_lru_cache 1.6.1                      py_0    conda-forge
bleach                    3.2.1              pyh9f0ad1d_0    conda-forge
blinker                   1.4                        py_1    conda-forge
bokeh                     2.2.3            py38h578d9bd_0    conda-forge
brotlipy                  0.7.0           py38h8df0ef7_1001    conda-forge
ca-certificates           2020.11.8            ha878542_0    conda-forge
cachetools                4.1.1                      py_0    conda-forge
cairo                     1.16.0            h488836b_1006    conda-forge
cbgen                     0.1.6                    pypi_0    pypi
certifi                   2020.11.8        py38h578d9bd_0    conda-forge
cffi                      1.14.3           py38h1bdcb99_1    conda-forge
chardet                   3.0.4           py38h924ce5b_1008    conda-forge
click                     7.1.2              pyh9f0ad1d_0    conda-forge
cloudpickle               1.6.0                      py_0    conda-forge
cryptography              3.2.1            py38h7699a38_0    conda-forge
cython                    0.29.21                  pypi_0    pypi
cytoolz                   0.11.0           py38h25fe258_1    conda-forge
dask                      2.30.0                     py_0    conda-forge
dask-core                 2.30.0                     py_0    conda-forge
dask-glm                  0.2.0                    pypi_0    pypi
dask-ml                   1.7.0                    pypi_0    pypi
decorator                 4.4.2                      py_0    conda-forge
defusedxml                0.6.0                      py_0    conda-forge
distributed               2.30.0                   pypi_0    pypi
entrypoints               0.3             py38h32f6830_1002    conda-forge
expat                     2.2.9                he1b5a44_2    conda-forge
fasteners                 0.15                     pypi_0    pypi
font-ttf-dejavu-sans-mono 2.37                 hab24e00_0    conda-forge
font-ttf-inconsolata      2.001                hab24e00_0    conda-forge
font-ttf-source-code-pro  2.030                hab24e00_0    conda-forge
font-ttf-ubuntu           0.83                 hab24e00_0    conda-forge
fontconfig                2.13.1            h7e3eb15_1002    conda-forge
fonts-conda-ecosystem     1                             0    conda-forge
fonts-conda-forge         1                             0    conda-forge
freetype                  2.10.4               h7ca028e_0    conda-forge
fribidi                   1.0.10               h36c2ea0_0    conda-forge
fsspec                    0.8.4                      py_0    conda-forge
gcsfs                     0.7.1                      py_0    conda-forge
gdk-pixbuf                2.42.0               h0536704_0    conda-forge
gettext                   0.19.8.1          hf34092f_1004    conda-forge
giflib                    5.2.1                h36c2ea0_2    conda-forge
gil-load                  0.4.0                    pypi_0    pypi
glib                      2.66.3               h58526e2_0    conda-forge
gobject-introspection     1.66.1           py38h4eacb9c_3    conda-forge
google-auth               1.23.0             pyhd8ed1ab_0    conda-forge
google-auth-oauthlib      0.4.2              pyhd8ed1ab_0    conda-forge
graphite2                 1.3.13            h58526e2_1001    conda-forge
graphviz                  2.42.3               h6939c30_2    conda-forge
gtk2                      2.24.32              h194ddfc_3    conda-forge
gts                       0.7.6                h17b2bb4_1    conda-forge
harfbuzz                  2.7.2                ha5b49bf_1    conda-forge
heapdict                  1.0.1                      py_0    conda-forge
icu                       67.1                 he1b5a44_0    conda-forge
idna                      2.10               pyh9f0ad1d_0    conda-forge
importlib-metadata        2.0.0                      py_1    conda-forge
importlib_metadata        2.0.0                         1    conda-forge
iniconfig                 1.1.1                    pypi_0    pypi
ipykernel                 5.3.4            py38h1cdfbd6_1    conda-forge
ipython                   7.19.0           py38h81c977d_0    conda-forge
ipython_genutils          0.2.0                      py_1    conda-forge
ipywidgets                7.5.1                    pypi_0    pypi
jedi                      0.17.2           py38h32f6830_1    conda-forge
jinja2                    2.11.2             pyh9f0ad1d_0    conda-forge
joblib                    0.17.0                   pypi_0    pypi
jpeg                      9d                   h36c2ea0_0    conda-forge
json5                     0.9.5              pyh9f0ad1d_0    conda-forge
jsonschema                3.2.0                      py_2    conda-forge
jupyter-server-proxy      1.5.0                      py_0    conda-forge
jupyter_client            6.1.7                      py_0    conda-forge
jupyter_core              4.6.3            py38h32f6830_2    conda-forge
jupyterlab                2.2.9                      py_0    conda-forge
jupyterlab_pygments       0.1.2              pyh9f0ad1d_0    conda-forge
jupyterlab_server         1.2.0                      py_0    conda-forge
lcms2                     2.11                 hcbb858e_1    conda-forge
ld_impl_linux-64          2.35                 h769bd43_9    conda-forge
libblas                   3.9.0                3_openblas    conda-forge
libcblas                  3.9.0                3_openblas    conda-forge
libffi                    3.2.1             he1b5a44_1007    conda-forge
libgcc-ng                 9.3.0               h5dbcf3e_17    conda-forge
libgfortran-ng            9.3.0               he4bcb1c_17    conda-forge
libgfortran5              9.3.0               he4bcb1c_17    conda-forge
libglib                   2.66.3               hbe7bbb4_0    conda-forge
libgomp                   9.3.0               h5dbcf3e_17    conda-forge
libiconv                  1.16                 h516909a_0    conda-forge
liblapack                 3.9.0                3_openblas    conda-forge
libopenblas               0.3.12          pthreads_h4812303_1    conda-forge
libpng                    1.6.37               h21135ba_2    conda-forge
libsodium                 1.0.18               h516909a_1    conda-forge
libstdcxx-ng              9.3.0               h2ae2ef3_17    conda-forge
libtiff                   4.1.0                h4f3a223_6    conda-forge
libtool                   2.4.6             h58526e2_1007    conda-forge
libuuid                   2.32.1            h14c3975_1000    conda-forge
libwebp                   1.1.0                h76fa15c_4    conda-forge
libwebp-base              1.1.0                h36c2ea0_3    conda-forge
libxcb                    1.13              h14c3975_1002    conda-forge
libxml2                   2.9.10               h68273f3_2    conda-forge
llvmlite                  0.34.0                   pypi_0    pypi
locket                    0.2.0                      py_2    conda-forge
lz4-c                     1.9.2                he1b5a44_3    conda-forge
markupsafe                1.1.1            py38h8df0ef7_2    conda-forge
mistune                   0.8.4           py38h1e0a361_1002    conda-forge
monotonic                 1.5                      pypi_0    pypi
msgpack-python            1.0.0            py38h82cb98a_2    conda-forge
multidict                 4.7.5            py38h1e0a361_2    conda-forge
multipledispatch          0.6.0                    pypi_0    pypi
nbclient                  0.5.1                      py_0    conda-forge
nbconvert                 6.0.7            py38h32f6830_2    conda-forge
nbformat                  5.0.8                      py_0    conda-forge
ncurses                   6.2                  he1b5a44_2    conda-forge
nest-asyncio              1.4.1                      py_0    conda-forge
notebook                  6.1.4            py38h32f6830_1    conda-forge
numba                     0.51.2                   pypi_0    pypi
numcodecs                 0.7.2                    pypi_0    pypi
numpy                     1.19.3                   pypi_0    pypi
oauthlib                  3.0.1                      py_0    conda-forge
olefile                   0.46               pyh9f0ad1d_1    conda-forge
openssl                   1.1.1h               h516909a_0    conda-forge
packaging                 20.4               pyh9f0ad1d_0    conda-forge
pandas                    1.1.4                    pypi_0    pypi
pandoc                    2.11.0.4             hd18ef5c_0    conda-forge
pandocfilters             1.4.2                      py_1    conda-forge
pango                     1.42.4               h69149e4_5    conda-forge
parso                     0.7.1              pyh9f0ad1d_0    conda-forge
partd                     1.1.0                      py_0    conda-forge
pcre                      8.44                 he1b5a44_0    conda-forge
pexpect                   4.8.0              pyh9f0ad1d_2    conda-forge
pickleshare               0.7.5                   py_1003    conda-forge
pillow                    8.0.1            py38h70fbd49_0    conda-forge
pip                       20.2.4                     py_0    conda-forge
pixman                    0.38.0            h516909a_1003    conda-forge
pluggy                    0.13.1                   pypi_0    pypi
pooch                     1.2.0                    pypi_0    pypi
prometheus_client         0.8.0              pyh9f0ad1d_0    conda-forge
prompt-toolkit            3.0.8                      py_0    conda-forge
psutil                    5.7.3            py38h8df0ef7_0    conda-forge
pthread-stubs             0.4               h14c3975_1001    conda-forge
ptyprocess                0.6.0                   py_1001    conda-forge
py                        1.9.0                    pypi_0    pypi
py-spy                    0.3.3                    pypi_0    pypi
pyasn1                    0.4.8                      py_0    conda-forge
pyasn1-modules            0.2.7                      py_0    conda-forge
pycparser                 2.20               pyh9f0ad1d_2    conda-forge
pygments                  2.7.2                      py_0    conda-forge
pyjwt                     1.7.1                      py_0    conda-forge
pyopenssl                 19.1.0                     py_1    conda-forge
pyparsing                 2.4.7              pyh9f0ad1d_0    conda-forge
pyrsistent                0.17.3           py38h1e0a361_1    conda-forge
pysocks                   1.7.1            py38h924ce5b_2    conda-forge
pytest                    6.1.2                    pypi_0    pypi
python                    3.8.6           h852b56e_0_cpython    conda-forge
python-dateutil           2.8.1                      py_0    conda-forge
python-graphviz           0.15                     pypi_0    pypi
python_abi                3.8                      1_cp38    conda-forge
pytz                      2020.1                   pypi_0    pypi
pyyaml                    5.3.1                    pypi_0    pypi
pyzmq                     19.0.2           py38ha71036d_2    conda-forge
readline                  8.0                  he28a2e2_2    conda-forge
rechunker                 0.3.1                    pypi_0    pypi
requests                  2.24.0             pyh9f0ad1d_0    conda-forge
requests-oauthlib         1.3.0              pyh9f0ad1d_0    conda-forge
rsa                       4.6                pyh9f0ad1d_0    conda-forge
scikit-learn              0.23.2                   pypi_0    pypi
scipy                     1.5.3                    pypi_0    pypi
send2trash                1.5.0                      py_0    conda-forge
setuptools                49.6.0           py38h924ce5b_2    conda-forge
sgkit                     0.1.dev290+gb81de07          pypi_0    pypi
simpervisor               0.3                        py_1    conda-forge
six                       1.15.0             pyh9f0ad1d_0    conda-forge
sortedcontainers          2.2.2                    pypi_0    pypi
sqlite                    3.33.0               h4cf870e_1    conda-forge
tblib                     1.7.0                    pypi_0    pypi
terminado                 0.9.1            py38h32f6830_1    conda-forge
testpath                  0.4.4                      py_0    conda-forge
threadpoolctl             2.1.0                    pypi_0    pypi
tk                        8.6.10               hed695b0_1    conda-forge
toml                      0.10.2                   pypi_0    pypi
toolz                     0.11.1                     py_0    conda-forge
tornado                   6.1              py38h25fe258_0    conda-forge
traitlets                 5.0.5                      py_0    conda-forge
typing-extensions         3.7.4.3                       0    conda-forge
typing_extensions         3.7.4.3                    py_0    conda-forge
urllib3                   1.25.11                    py_0    conda-forge
wcwidth                   0.2.5              pyh9f0ad1d_2    conda-forge
webencodings              0.5.1                      py_1    conda-forge
wheel                     0.35.1             pyh9f0ad1d_0    conda-forge
widgetsnbextension        3.5.1                    pypi_0    pypi
xarray                    0.16.1                   pypi_0    pypi
xorg-kbproto              1.0.7             h14c3975_1002    conda-forge
xorg-libice               1.0.10               h516909a_0    conda-forge
xorg-libsm                1.2.3             h84519dc_1000    conda-forge
xorg-libx11               1.6.12               h516909a_0    conda-forge
xorg-libxau               1.0.9                h14c3975_0    conda-forge
xorg-libxdmcp             1.1.3                h516909a_0    conda-forge
xorg-libxext              1.3.4                h516909a_0    conda-forge
xorg-libxpm               3.5.13               h516909a_0    conda-forge
xorg-libxrender           0.9.10            h516909a_1002    conda-forge
xorg-libxt                1.1.5             h516909a_1003    conda-forge
xorg-renderproto          0.11.1            h14c3975_1002    conda-forge
xorg-xextproto            7.3.0             h14c3975_1002    conda-forge
xorg-xproto               7.0.31            h14c3975_1007    conda-forge
xz                        5.2.5                h516909a_1    conda-forge
yaml                      0.2.5                h516909a_0    conda-forge
yarl                      1.6.2            py38h1e0a361_0    conda-forge
zarr                      2.5.0                    pypi_0    pypi
zeromq                    4.3.3                he1b5a44_2    conda-forge
zict                      2.0.0                    pypi_0    pypi
zipp                      3.4.0                      py_0    conda-forge
zlib                      1.2.11            h516909a_1010    conda-forge

The issue with suboptimal saturation was originally reported for this code:

import fsspec
import xarray as xr
from sgkit.io.bgen.bgen_reader import unpack_variables
from dask.diagnostics import ProgressBar, ResourceProfiler, Profiler

path = "gs://foobar/data.zarr"
store = fsspec.mapping.get_mapper(path, check=False, create=False)
ds = xr.open_zarr(store, concat_characters=False, consolidated=False)
ds = unpack_variables(ds, dtype='float16')

ds["variant_dosage_std"] = ds["call_dosage"].astype("float32").std(dim="samples")
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    ds['variant_dosage_std'] = ds['variant_dosage_std'].compute()

With local input, performance graph:

bokeh_plot (1)

It's pretty clear the cores are well saturated. I also measure GIL, GIL was held for 13% of time and waited on for 2.1%, with each worker thread (16 threads) holding it for 0.7% and waiting for 0.1% of time.

For GCS input (via fsspec):

bokeh_plot (2)

GIL summary: GIL was held for 18% of time and waited on for 3.8%, with each worker thread (16 threads) holding it for 0.6% and waiting for 0.2% of time, with one thread holding GIL for 6.5% and waiting 1.6% time.

held: 0.186 (0.191, 0.187, 0.186)
wait: 0.038 (0.046, 0.041, 0.039)
  <140287451305792>
    held: 0.015 (0.029, 0.017, 0.015)
    wait: 0.002 (0.002, 0.002, 0.002)
  <140284185433856>
    held: 0.065 (0.061, 0.064, 0.065)
    wait: 0.016 (0.015, 0.017, 0.016)
  <140284540389120>
    held: 0.0 (0.0, 0.0, 0.0)
    wait: 0.0 (0.0, 0.0, 0.0)
  <140284590728960>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
  <140284599121664>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.001)
  <140284759570176>
    held: 0.006 (0.008, 0.007, 0.007)
    wait: 0.002 (0.001, 0.001, 0.002)
  <140284751177472>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.001, 0.002)
  <140283956950784>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.001, 0.001, 0.001)
  <140283948558080>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.001, 0.001, 0.001)
  <140283940165376>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
  <140283931772672>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.002, 0.002)
  <140283923379968>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.002, 0.002)
  <140283914987264>
    held: 0.006 (0.007, 0.007, 0.007)
    wait: 0.002 (0.001, 0.002, 0.002)
  <140283295561472>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
  <140283287168768>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
  <140283278776064>
    held: 0.006 (0.006, 0.007, 0.006)
    wait: 0.002 (0.002, 0.002, 0.002)
  <140283270383360>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.001, 0.001, 0.001)
  <140283261990656>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.001 (0.002, 0.002, 0.001)
  <140283253597952>
    held: 0.006 (0.006, 0.006, 0.006)
    wait: 0.002 (0.001, 0.001, 0.001)
  <140283245205248>
    held: 0.0 (0.0, 0.0, 0.0)
    wait: 0.0 (0.0, 0.0, 0.0)
  <140282691581696>
    held: 0.001 (0.0, 0.001, 0.001)
    wait: 0.001 (0.001, 0.001, 0.001)
  <140282683188992>
    held: 0.002 (0.002, 0.002, 0.002)
    wait: 0.001 (0.001, 0.001, 0.001)
  <140282674796288>
    held: 0.001 (0.001, 0.001, 0.001)
    wait: 0.003 (0.012, 0.004, 0.003)

It's clear that the CPU usage is lower, and not fully saturated, GIL wait time is a bit up (with a concerning spike in one thread). With remote/fsspec input, we have the overhead of data decryption and potential network IO overhead (tho it doesn't seem like we hit network limits).

@ravwojdyla
Copy link
Collaborator Author

Some doc for fsspec:

  • most of file object implementation use AbstractBufferedFile which is a buffered implementation. Default block size is 5MiB, and buffer implementation is readahead which is best suited for sequential reads (keeps only a single/current block in memory, and read next block when the control reaches the block boundary)
  • there are 5 buffer implementations in fsspec (doc):
    • BaseCache: no buffering/caching
    • ReadAheadCache : read a block size when the reader crosses the block boundary, keeps one block in memory
    • MMapCache: uses temporary file memory mapped file, which is filled blocks-wise when data is requested
    • BlockCache: data read blocksize at a time, and are stored in an LRU cache
    • BytesCache: read-ahead by the block size, for semi-random reads progressing through the file
  • LocalFileSystem uses local open buffering, usually 4-8KiB block size

This is a partial output of the method profile on the code above (https://github.com/pystatgen/sgkit/issues/437#issue-784385179):

  %Own   %Total  OwnTime  TotalTime  Function (filename:line)
 40.00%  40.00%    3811s     3811s   npy_floatbits_to_halfbits (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%    2593s     5033s   npy_half_to_float (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%    2269s     2269s   npy_halfbits_to_floatbits (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%    1825s     5854s   HALF_add (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 40.00%  80.00%    1770s     3848s   _aligned_contig_cast_float_to_half (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%  20.00%    1720s     5337s   HALF_multiply (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%    1393s     1413s   PyArray_Where (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%   847.4s     1521s   _aligned_contig_cast_half_to_float (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%   844.4s    844.4s   0x7fe616f5fdf0 (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%   757.2s    757.2s   BOOL_logical_or (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%   752.4s    752.4s   DOUBLE_subtract (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%   687.0s    687.0s   _aligned_contig_cast_ubyte_to_float (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)

Unsurprising we get numpy methods, what is interesting and can't be seen here, are the sporadic spikes in CPU usage, looking at the network IO:

11:42:17        IFACE   rxpck/s   txpck/s    rxkB/s    txkB/s   rxcmp/s   txcmp/s  rxmcst/s   %ifutil
11:42:18         ens4     64.00     64.00     11.58     20.74      0.00      0.00      0.00      0.00
11:42:19         ens4    397.00     70.00   4581.27     14.38      0.00      0.00      0.00      0.00
11:42:20         ens4    146.00     67.00   1501.01     15.61      0.00      0.00      0.00      0.00
11:42:21         ens4    276.00    103.00   2982.42     27.17      0.00      0.00      0.00      0.00
11:42:22         ens4    235.00     91.00   4396.02     19.31      0.00      0.00      0.00      0.00
11:42:23         ens4     43.00     43.00      6.50     12.35      0.00      0.00      0.00      0.00

There are also spikes of network IO, which could suggest that we should look into smoothing out the network buffering. This reminded me about some stats I once stumbled upon in Beam codebase (here):

# This is the size of each partial-file read operation from GCS.  This
# parameter was chosen to give good throughput while keeping memory usage at
# a reasonable level; the following table shows throughput reached when
# reading files of a given size with a chosen buffer size and informed the
# choice of the value, as of 11/2016:
#
# +---------------+------------+-------------+-------------+-------------+
# |               | 50 MB file | 100 MB file | 200 MB file | 400 MB file |
# +---------------+------------+-------------+-------------+-------------+
# | 8 MB buffer   | 17.12 MB/s | 22.67 MB/s  | 23.81 MB/s  | 26.05 MB/s  |
# | 16 MB buffer  | 24.21 MB/s | 42.70 MB/s  | 42.89 MB/s  | 46.92 MB/s  |
# | 32 MB buffer  | 28.53 MB/s | 48.08 MB/s  | 54.30 MB/s  | 54.65 MB/s  |
# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s  | 79.13 MB/s  | 85.39 MB/s  |
# +---------------+------------+-------------+-------------+-------------+
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024

So Beam is using 16MiB read buffer. But it's also using Google's storage API client (python-storage), whilst fsspec calls GCS API directly via fsspec implementation of GCS gcsfs and there definitely are implementation differences between those two and thus the consequences of buffer size might be different.

@ravwojdyla
Copy link
Collaborator Author

ravwojdyla commented Jan 12, 2021

In the original issue: I have tried fsspec block size of 16MiB, but the results look very similar to the 5MiB block size, which was somewhat suspicious, and turns out that the zarr data ("gs://foobar/data.zarr") is saved as a large number of small files, for example for the call_genotype_probability, there are 20570 chunks/objects with average object size of about 2MiB. This might be suboptimal for sequential reads.

As a general rule, when you download large objects within a Region from Amazon S3 to Amazon EC2, we suggest making concurrent requests for byte ranges of an object at the granularity of 8–16 MB.

From S3 best practices.

The Pangeo project has found that chunk sizes in the range of 10–200MB work well with Dask reading from Zarr data in cloud storage

From cloud performant reading of netcdf4 hdf5 data using the zarr library.

Interesting/related issue "File Chunk Store" zarr-python#556.

To validate the impact of the chunk size, let's rechunked the call_genotype arrays, specifically call_genotype_probability array from 20570 chunks (chunk size: (5216, 5792, 2), avg. GCS object size 2MiB, average in memory chunk size: 230MiB) to 2349 chunks (chunk size: (15648, 17376, 2), avg. GCS object size 17MiB, average in memory chunk size: 2GiB):

Performance graph:

bokeh_plot (3)

There is a cost to it tho, visible in the memory usage, notice that compressed chunk of 17MiB translates to 2GiB in memory (vs 2MiB -> 230MiB), this bumps the memory usage roughly from 6GB to 40GB.

We could look into investigating the cost of reading zarr data in smaller chunks (then it was written as). This is a profile of the computation with zarr chunks (15648, 17376, 2) read as (5216, 5792, 2) via xarray.open_zarr:

bokeh_plot (4)

Notice:

  • memory usage is much less, akin to the usage with zarr chunks at (5216, 5792, 2), which is expected
  • the whole computation takes much longer about 1.7h vs 1.3h, which likely due to the extra cost of rereading zarr chunks to split into smaller chunks for xarray
  • when zarr reads subset of a chunk, it needs to fetch full compressed chunk, decompress and then retrieve part of it, see below:

Say we have a random float zarr array shape (10_000, 100_000) saved on local disk, with chunk shape (10_000, 10_000), thus 10 chunks of size ~660MiB on disk (compressed).

We retrieve a single element:

zs = zarr.open("/tmp/zarr", mode="r")
print(zs.foo[0, 0])

Zarr will:

  • read full compressed ~660M into memory
  • decompress the full chunk, whilst keeping the compressed chunk on the side (thus memory usage is right now full compressed chunk + decompressed data), in case of random floats ~2x (1.4G)
  • then slicing happens to select the 1st element of the full decompressed data
  • 1st element is returned and memory (in this case 1.4G) is freed up

So overall the lesson from this exercise is that - slicing zarr in a way that doesn't align with the native chunks is inefficient as a common step in pipelines.

Zarr issues for partial chunk reads:

Highlights:

some compressors also support partial decompression which would allow for extracting out part of a compressed chunk (e.g. the blosc_getitem method in Blosc)

  • it sounds like even with partial decompress, we will need to fetch full compressed file

@ravwojdyla
Copy link
Collaborator Author

Some experiments that incorporate xarray + zarr + dask:

TLDR

  • dask uses slicing to fetch data from underlying "array"
  • zarr won't use async chunk fetching if dask slice/chunk aligns with zarr chunk
  • we can force zarr async chunk fetching if dask chunking multiplies zarr chunk shape
  • if dask chunking doesn't align with zarr chunking, performance degrades

Long story

Say we have a dataset:

fsmap = fsspec.get_mapper("/tmp/zarr")

ar = np.random.random((10_000, 100_000))
xar = xr.Dataset(data_vars={"foo": (("x", "y"), ar)})
xar.foo.encoding["chunks"] = (10_000, 10_000)

xar.to_zarr(fsmap)

This gives us a zarr store with one 2d array stored as 10 chunks (10_000, 10_000). Now, say we read this back via:

xar_back = xr.open_zarr(fsmap)

Dask graph looks like this:

Screenshot 2020-11-23 at 13 01 02

and an example of one zarr task definition:

('zarr-9c3a2bfde447a3cde69b39f8c28181f9', 0, 0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-9c3a2bfde447a3cde69b39f8c28181f9',
   (slice(0, 10000, None), slice(0, 10000, None)))

Notes:

  • we get 10 tasks
  • 1 for each zarr chunk
  • each task reads data via slice (that perfectly fits the chunk shape)
  • this also means this graph does not leverage async multiget from zarr, since there is just one chunk to fetch per dask task

The question now is - how can we force async zarr multiget via dask.

  1. If we follow up xr.open_zarr(fsmap) with a rechunk, like this:
xar_back = xr.open_zarr(fsmap).chunk({"y": 20_000, "x": 10_000})

This won't work:
Screenshot 2020-11-23 at 13 09 34

Notes:

  • we still read each chunk
  • we do rechunking at later stage (which is even worse)
  1. But if we specify chunk at read time, like this:
xar_back = xr.open_zarr(fsmap, chunks={"y": 20_000, "x": 10_000})

we get:

Screenshot 2020-11-23 at 13 11 37

and an example of one zarr task definition:

('zarr-351e7d43530c734ce1fd649b79f9fc59', 0,0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-351e7d43530c734ce1fd649b79f9fc59',
   (slice(0, 10000, None), slice(0, 20000, None)))

Notes:

  • notice the slice definition on y is 20000 (which is double the chunk size on that dimension)
  • from previous comments, this will result in zarr potentially fetching 2 chunks asynchronously

Another interesting question is: how would dask react to chunking that doesn't align with zarr native chunks:

xar_back = xr.open_zarr(fsmap, chunks={"y": 12_500, "x": 10_000})

Notice that y now has 2_500 more elements than the chunk, so based on the comments above, dask will likely use slicing to request chunks, which in turn would delegate that to zarr slicing, and zarr would have to read 2 chunks for each dask task.

When we run the code above, xarray tells us:

/Users/rav/miniconda3/envs/tr-dev/lib/python3.8/site-packages/xarray/backends/zarr.py:695: UserWarning: Specified Dask chunks 12500 would separate Zarr chunk shape 10000 for dimension 'y'. This significantly degrades performance. Consider rechunking after loading instead. chunk_spec = get_chunk(name, var, chunks)

Let's see the graph:

Screenshot 2020-11-23 at 13 21 27

and an example of two zarr task definitions:

('zarr-c5cbd628b3db23f6ff80870f5d0d52ec', 0, 0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-c5cbd628b3db23f6ff80870f5d0d52ec',
   (slice(0, 10000, None), slice(0, 12500, None))),

  ('zarr-c5cbd628b3db23f6ff80870f5d0d52ec', 0, 1):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-c5cbd628b3db23f6ff80870f5d0d52ec',
   (slice(0, 10000, None), slice(12500, 25000, None)))

Notes:

  • looking at the graph doesn't tell us much, we just see 8 tasks (8 because 10_000/12_500 = 8)
  • but looking at the zarr task definition we can see the slice definitions, and how they span across chunks
  • based on the comments above, dask tasks would use zarr to slice the data, and based on the slices zarr would need to fetch multiple chunks, in some cases to read just small parts, thus performance degrades

@ravwojdyla
Copy link
Collaborator Author

Experiments with simplified computation and random data.

We generate reasonably large float array 1e5 x 1e5 and save it in large chunks (400MB) and small chunks (4MB). Then run 3 experiments (many times 30 - 100):

  1. read the large chunks using native zarr chunks, this will read one 400MB chunk per task (aka 400/400/4)
  2. read the small chunks using native zarr chunks, this will read one 4MB chunk per task (aka 4/4/4)
  3. read the small chunks using large zarr chunks, this will read 100 small chunks concurrently (using zarr -> fsspec getitems) which accumulate to 400MB per task (aka 4/400/4)

In the (X/Y/Z) notation, X stands for chunk size at rest time (storage), Y is read chunk size (the size zarr will use to read data from storage, at this point the data is already in memory), Z stands for chunk size used for computation.

In each experiment we multiple the array by a scalar, and use 4MB chunking for this computation (this is to imitate that our problem requires certain chunk size that doesn't necessarily correspond to the zarr chunk size). We run the computations many times and measure time/memory/network. All data is stored on GCS (in the same region as the VM).

Dask cluster setup:

We are using distributed process pool based cluster on a single 16 core VM, with 16 workers/processes with 1 thread each. This is to avoid any GIL issues, and since the computation is embarrassingly parallel, unless there is some job stilling happening there is very little communication between workers. We run 2 rounds of computation to warm up the cluster. I have also tried a thread pool based cluster but was seeing GIL impact.

Results

Storage chunks IO chunks Computation chunks Min time Mean ± stdev Mean GCS BW
400MB 400MB 4MB 37.06 47.68 ± 17.35 774.45 MB/s
4MB 4MB 4MB 112.83 126.8 ± 10.19 287.62 MB/s
4MB 400MB 4MB 52.89 62.05 ± 15.13 572.71 MB/s

Notes:

  • Mean GCS BW (bandwidth) is over the period of all 30 repeats of each experiment
  • peak GCS BW was ~4GB/s

newplot (44)

And next a macro (much less accurate than the data used for the network histogram above) view of the CPU/IO (Network)/Memory usage for the cases (400/400/4) and (4/400/4). First we perform many 400/400/4 from 3pm until around 4:30, and then switch to 4/400/4 until the end.
Screenshot

Notes:

  • this is obviously an IO bound task
  • 400/400/4 has best network bandwidth and fastest computation
  • it arguable makes more sense to look at the min than average time for these experiment
  • 4/4/4 will have lowest memory footprint since it reads the least amount of data into memory

Conclusion: this validates that larger chunks result in better throughput even when compared with concurrent read of many small chunks. But reading large chunks (or many small chunks) comes at the cost of memory (as mentioned in the previous comments).

Code

Code: generate the data
def create_data(path, chunks):
    fs_map = gcs.get_mapper(path)

    ar_dsk = da.random.random(size=(100_000, 100_000), chunks=chunks).astype(np.float32)
    ar_xr = xr.Dataset(data_vars=(dict(foo=(("x", "y"), ar_dsk))))

    with ProgressBar(), ResourceProfiler() as rprof:
        ar_xr.to_zarr(fs_map, mode="w")
Code: computations
def large_chunks(path):
    # 16MB block size
    gcs = gcsfs.GCSFileSystem(block_size=16 * 1024 * 1024)

    fs_map = gcs.get_mapper(path)

    # use zarr chunking
    ar_xr = xr.open_zarr(fs_map)
    # rechunk/split to smaller chunks
    ar_xr["foo"] = ar_xr.foo.chunk(dict(x=1_000, y=1_000))

    # Simple computation on part of the matrix
    dat = (ar_xr.foo * 2.0).data.persist()
    wait(dat)
    del dat

def small_chunks(path):
    # default block size of 5MB
    gcs = gcsfs.GCSFileSystem()

    fs_map = gcs.get_mapper(path)

    # read in many zarr chunks at the same time
    ar_xr = xr.open_zarr(fs_map, chunks=dict(x=10_000, y=10_000))
    # rechunk/split to smaller chunks
    ar_xr["foo"] = ar_xr.foo.chunk(dict(x=1_000, y=1_000))

    # Simple computation on part of the matrix
    dat = (ar_xr.foo * 2.0).data.persist()
    wait(dat)
    del dat

def native_chunks(path):
    # default block size of 5MB
    gcs = gcsfs.GCSFileSystem()

    fs_map = gcs.get_mapper(path)

    # use zarr chunking
    ar_xr = xr.open_zarr(fs_map)

    # Simple computation on part of the matrix
    dat = (ar_xr.foo * 2.0).data.persist()
    wait(dat)
    del dat

@ravwojdyla
Copy link
Collaborator Author

GCS stats:

  • reads 1GiB file in sequential reads
  • both VM and GCS bucket are in EU
  • using gcsfs 0.7.1 as the GCS client
    newplot (22)
    newplot (23)

@ravwojdyla
Copy link
Collaborator Author

TLDR thus far:

  • when dask.array fetches data it uses slicing on the underlying array implementation
  • fsspec has async "multi_get", but to see the benefits in the dask context, the task must slice across multiple FS objects
  • reading zarr data in chunks smaller than the native/storage chunk is suboptimal
  • genetics data is likely going to compress extremely well, resulting in small objects on storage, but large dense arrays in memory
  • large number of small chunks results in large number of dask tasks

Another discussion that came up is the memory impact on the performance, given that larger chunk will require more memory. afaiu by default in a dask cluster:

  • dask will use as many workers (W) as available nodes (1 worker per node)
  • dask worker uses as many "worker threads" (WT) as CPUs
  • each worker gets access to memory (M) available for all worker threads (WT)
  • so each WT gets about ~M/WT memory

In GCP, default VMs are divided into classes of machines like “standard” or “highmem”, in each class the memory to CPU ratio is constant, for standard class it's 4GB/CPU and for highmem it's 8GB/CPU. So in theory increasing the size of the VM within a class doesn’t affect the M/WT ratio. In the context of Spark or MR/YARN, a user would often set the executor/container memory and cores request per pipeline (each pipeline having different requirements), in the context of Dask the same thing can be achieved, but is arguably less explicit. There are worker resources and more recently layer annotations dask/dask#6701, but we can also use the cluster configuration to manipulate the M/WT ratio, by setting the number of WT (+ making sure the worker gets full memory, and potentially adjusting the number of thread available for numpy ops).

@jeromekelleher
Copy link
Collaborator

A lot to digest here, thanks for the great work @ravwojdyla!

@ravwojdyla ravwojdyla added documentation Improvements or additions to documentation IO Issues related to reading and writing common third-party file formats performance labels Jan 13, 2021
@ravwojdyla
Copy link
Collaborator Author

ravwojdyla commented Jan 13, 2021

And a way to connect these with #390, I personally would be very curious to see the #390 GWAS benchmark with:

@ravwojdyla
Copy link
Collaborator Author

ravwojdyla commented Jan 18, 2021

Based on the performance tests done above, here are some high level guidelines for dask performance experiments (this is a starting point, we might find a better home for this later, and potentially have someone from Dask review them):

  • disable spilling during performance tuning (for example: when you search for the right chunking scheme, spilling involves serde and IO, both are expensive, it's better to immediately fail spilling job since the chunking and/or cluster spec is likely suboptimal)
def get_dask_cluster(n_workers=1, threads_per_worker=None):
    dk.config.set({"distributed.worker.memory.terminate": False})
    workers_kwargs = {"memory_target_fraction": False,
                      "memory_spill_fraction": False,
                      "memory_pause_fraction": .9}
    return Client(n_workers=n_workers,
                  threads_per_worker=threads_per_worker,
                  **workers_kwargs)
  • use distributed cluster (and if there is a single worker and you perform non-numpy operations measure GIL, see gil_load, if you want to measure cluster communication overhead, you need more workers)
  • capture performance report, see diagnostics distributed
  • if you need high granularity of VM status use atop to record stats, remember to install netatop to capture network stats
  • if you read data from GCS, make sure your VM is in the same region as the data bucket

TODO:

  • add VM specs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation IO Issues related to reading and writing common third-party file formats performance
Projects
None yet
Development

No branches or pull requests

2 participants