Bottleneck in time consumption of the HWE benchmark #736

Closed
LiangdeLI opened this issue Oct 13, 2021 · 9 comments

@LiangdeLI

LiangdeLI commented Oct 13, 2021

The HWE benchmark does similar things to the GWAS tutorial. Because of a bottleneck in the final selection on the sgkit dataset, I can only run the benchmark on chromosome 21 (201 MB as the original VCF) instead of chr1-22 (14 GB as the original VCF).

Sgkit is doing:

ds = sg.variant_stats(ds)
ds = sg.hardy_weinberg_test(ds, alleles=2)
ds = ds.sel(variants=((ds.variant_allele_frequency[:,1] > 0.01) & (ds.variant_hwe_p_value > 1e-6)))

Hail is doing:

mt = hl.variant_qc(mt)
mt = mt.filter_rows(mt.variant_qc.AF[1] > 0.01)
mt = mt.filter_rows(mt.variant_qc.p_value_hwe > 1e-6)

PLINK has flags:
--hardy --hwe 1e-6 --maf 0.01

The three sgkit lines take (0.067585, 1.688614, 375.0212) seconds respectively when using 16 cores; sel() takes a long time, while Hail's filter_rows() only takes 0.010368 seconds.

Sgkit's runtime is dominated by the xarray.Dataset.sel() call, and is much longer than what Hail and PLINK take. Do you have an idea of an equivalent but more efficient syntax to do the same selection here?

I researched xarray's sel() a bit; it seems to have been inefficient for years, with an open issue (sel() calls isel() internally). variants=((ds.variant_allele_frequency[:,1] > 0.01) & (ds.variant_hwe_p_value > 1e-6)) is indexing with boolean arrays, which might not be the preferred usage for sel().
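
For reference, a minimal sketch (hypothetical, not what the benchmark currently runs) of computing the boolean mask eagerly and indexing with isel() directly:

# Hypothetical sketch: compute the boolean mask once, then index with isel();
# sel() with a boolean array delegates to isel() internally anyway.
mask = (
    (ds.variant_allele_frequency[:, 1] > 0.01)
    & (ds.variant_hwe_p_value > 1e-6)
).compute()
ds_filtered = ds.isel(variants=mask.values)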

@eric-czech
Collaborator

One issue here is that I believe the following line leads to two passes over the raw genotype data when the eager computation is forced in sel: ds = ds.sel(variants=((ds.variant_allele_frequency[:,1] > 0.01) & (ds.variant_hwe_p_value > 1e-6))).

You should try this instead:

ds = (
    ds
    .assign(**ds[['variant_allele_frequency', 'variant_hwe_p_value']].compute())
    .pipe(lambda ds: ds.sel(variants=((ds.variant_allele_frequency[:,1] > 0.01) & (ds.variant_hwe_p_value > 1e-6))))
)

Here's a reproducible example to motivate it:

import xarray as xr
import numpy as np
from dask.diagnostics import Profiler

ds = (
    xr.Dataset(data_vars=dict(
        x=(('a',), np.arange(1000)),
        y=(('a',), np.arange(1000))
    ))
    .chunk(100)
    .assign(z1=lambda ds: ds['x'] + ds['y'])
    .assign(z2=lambda ds: ds['x'] * ds['y'])
)

with Profiler() as prof:
    ds2 = (
        ds.assign(**ds[['z1', 'z2']].compute())
        .pipe(lambda ds: ds.sel(a=(ds['z1'] > 0) & (ds['z2'] > 0)))
    )
f'Number of tasks = {len(prof.results)}'
# Number of tasks = 20

with Profiler() as prof:
    ds3 = ds.sel(a=(ds['z1'] > 0) & (ds['z2'] > 0))
f'Number of tasks = {len(prof.results)}'
# Number of tasks = 50

xr.testing.assert_equal(ds2, ds3)

The key difference there is that calling compute explicitly leads to (I think, in this case) one pass over the array chunks in x and y. You could verify this by viewing the dask graph in the UI. Looking at the difference in the number of tasks here, though, makes me fairly confident that you should see the same thing on the HWE benchmark.
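
The graphs can also be rendered statically with dask.visualize (a sketch; assumes graphviz is available, and the filename is a placeholder):

import dask

# In ds2, z1/z2 were computed up front and are plain numpy arrays, so there is
# nothing left to recompute; in ds3 they are still lazy, and a later compute
# would re-read the x/y chunks (the second pass mentioned above).
dask.visualize(ds3['z1'].data, ds3['z2'].data, filename='ds3_remaining_work.svg')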


Also, for the benchmark I would suggest doing something like ds[['variant_hwe_p_value', 'variant_allele_frequency', 'variant_position', 'variant_contig', 'variant_allele']].to_zarr(...) at the end instead of trying to pull the results into memory. Then the .compute call should take the most time; the .to_zarr call won't be negligible either, but it should be small by comparison.
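
Putting the two suggestions together, the end of the benchmark might look roughly like this (a sketch; the Zarr output path is a placeholder):

# Sketch: compute the two summary variables once, filter on them, then write
# only the variables of interest to Zarr instead of pulling everything into memory.
ds = ds.assign(**ds[['variant_allele_frequency', 'variant_hwe_p_value']].compute())
ds = ds.sel(variants=(
    (ds.variant_allele_frequency[:, 1] > 0.01)
    & (ds.variant_hwe_p_value > 1e-6)
))
ds[[
    'variant_hwe_p_value', 'variant_allele_frequency',
    'variant_position', 'variant_contig', 'variant_allele',
]].to_zarr('hwe_results.zarr', mode='w')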

@LiangdeLI
Author

Thanks @eric-czech! Your explanation and code samples make a lot of sense to me. Following your suggestion, I tried to look at the dask graph to get a better sense of what is happening. However, I cannot see any task graph on the /graph page, although I do see updates in the 'task stream' graph on the /status page after running the ds2 and ds3 code. I set up the dask client with from dask.distributed import Client; client = Client() before running any code. I also found that the printout then becomes # Number of tasks = 0, while without the dask UI the printouts are # Number of tasks = 40 and # Number of tasks = 150.
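
For what it's worth, dask.diagnostics.Profiler only instruments dask's local schedulers, which would explain the 0 tasks once a distributed Client is active; with the distributed scheduler, per-task timings can be captured with performance_report instead. A minimal sketch, with a placeholder filename:

from dask.distributed import Client, performance_report

client = Client()  # local distributed cluster, as above

# Writes a standalone HTML report with per-task timings from the distributed scheduler.
with performance_report(filename='hwe_tasks.html'):
    ds2 = (
        ds.assign(**ds[['z1', 'z2']].compute())
        .pipe(lambda ds: ds.sel(a=(ds['z1'] > 0) & (ds['z2'] > 0)))
    )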

@LiangdeLI
Author

You should try this instead:

Good news: this method reduced the sel() time from 375 to 20 seconds, measured on chr21.

@eric-czech
Collaborator

How long, approximately, does it take now, start to finish, for sgkit vs. Hail (where both are writing .zarr or .mt files respectively)?

@LiangdeLI
Author

First, I just recently noticed that Hail cannot write p_value/AF to .mt; features like p_value and AF are of 'Expression' type/class in Hail's implementation and are written out with the export() function. The output is a .tsv, which is a human-readable text file.

My experiment:
Hardware: 16 cores. Task: HWE including filter/selection, with output to each tool's default format (.tsv or .zarr). Data: chr21 and 1kg.
Results:
chr21: Hail: 40.2 s, sgkit: 22.1 s
1kg: Hail: 382.7 s, sgkit: 1489.6 s

And 1kg is about 70x larger than chr21 in the original .vcf format.

@LiangdeLI
Author

What I'm currently doing is making both of them do eager loading/computation, instead of building the task graph and waiting for the computation to be triggered at the final output stage. This way, ideally, I can compare the breakdown of their data loading, execution, and output-writing times.

@eric-czech
Collaborator

Ok great! Making progress then.

This way, ideally, I can compare the breakdown of their data loading, execution, and output-writing times.

I think we should avoid this -- I like the idea, but there's no good way to break that out for a fair comparison across both dask and Spark, since the amount of work they do in each of those stages varies so much. It would be sufficient to just measure the execution time for the whole operation, including I/O.

@jeromekelleher
Collaborator

Yes, forcing everything to be eager seems like a bad measure of how things would work in practice, since this is something we'd discourage.

@tomwhite
Collaborator

tomwhite commented Jan 4, 2023

Closing old issue

@tomwhite closed this as completed Jan 4, 2023