Problems with PyWPS, Dask and Xarray
In the context of PAVICS, we are observing that the PyWPS process queues of our services (which are backed by a Postgres database) regularly fill up with stuck processes (in the "started" state, but not progressing in terms of their completion percentage).
Our first assumption was that the problem was at the PyWPS level, but after a careful analysis I have established that it is rather related to dask, xarray and netCDF, which are used by our processes.
A process gets stuck when it is called in async mode following a (successful) process that was called in sync mode by the same server instance. This is important because a series of exclusively sync calls, or alternatively a series of exclusively async calls, does NOT trigger the problem. It is only when the server has to operate in both modes, with code using the libraries mentioned above, that the issue arises.
To be more precise, I have been able to reproduce and isolate two different types of crash in our environment, and I'm not certain to what extent they are related. One thing is clear: they are both triggered by the same sync + async sequence described above.
The first problem requires that the server run through gunicorn, and it hangs at exactly one point: when trying to save a NetCDF file.
After investigating this problem for a while, and because by now the principal suspect was clearly Dask (which makes sense, given that this whole setup uses no fewer than THREE different layers of multiprocessing: gunicorn, PyWPS and Dask!), I had the idea of refactoring our Dask scheduler setup. Our current code uses the default scheduler, so I simply introduced a dask.distributed.Client, connecting to an externally running dask scheduler, at the location in the code that seems to make the most sense for this: the beginning of the PyWPS process _handler (a sketch of that setup is shown below). This experiment led to the discovery of a second problem, which is described in dask/distributed#5334, because it is easier to isolate and reproduce.
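For reference, the experiment amounted to roughly the following. This is a minimal sketch, not one of our actual processes: the process identifier, the inputs/outputs and the scheduler address (tcp://dask-scheduler:8786) are placeholders.

from pywps import Process, LiteralInput, LiteralOutput
from dask.distributed import Client
import xarray as xr


class SubsetProcess(Process):
    def __init__(self):
        super().__init__(
            self._handler,
            identifier="subset",  # placeholder identifier
            title="Subset a dataset",
            inputs=[LiteralInput("resource", "NetCDF file", data_type="string")],
            outputs=[LiteralOutput("output", "Output file", data_type="string")],
        )

    def _handler(self, request, response):
        # Connect to an externally running dask scheduler instead of relying
        # on the default (local) scheduler; the address is an assumption.
        client = Client("tcp://dask-scheduler:8786")
        try:
            ds = xr.open_dataset(request.inputs["resource"][0].data, chunks={"time": 100})
            response.update_status("computing", 50)
            ds.to_netcdf("out.nc")
            response.outputs["output"].data = "out.nc"
        finally:
            client.close()
        return response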
Given that it is related to other similar existing issues in the dask.distributed project, this second problem has a potential fix: setting the multiprocessing start method to spawn (via multiprocessing.set_start_method) instead of the default fork (at least on Linux). This however leads to pickle-related problems, because the process and the WPS request and response classes contain some hard-to-serialize objects. Among many things, I have tried to replace pickle with dill, which is supposedly more powerful, to no avail.
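A rough sketch of that attempt, under stated assumptions: the can_serialize helper is hypothetical and only illustrates the serialization check, it is not our actual code.

import multiprocessing
import pickle

import dill  # third-party, supposedly more capable drop-in for pickle


def can_serialize(obj):
    # Diagnostic only: report whether the object survives pickle and dill.
    for name, dumps in (("pickle", pickle.dumps), ("dill", dill.dumps)):
        try:
            dumps(obj)
            print(f"{name}: ok")
        except Exception as exc:
            print(f"{name}: failed ({exc!r})")


if __name__ == "__main__":
    # Spawn new worker processes instead of forking them (fork is the default
    # on Linux).  This avoids the fork-related deadlocks, but every object
    # crossing the process boundary must now be serializable.
    multiprocessing.set_start_method("spawn", force=True)

    # can_serialize(my_pywps_process)  # in our case, both pickle and dill fail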
So in the end we are stuck with these problems (or maybe it is a single problem?), and my only mitigation for now is to make birdy, which is the client used to talk to our WPS processes, async-only, which is clearly not an ideal solution.
Consider the following code:
import dask
import xarray as xr
import numpy as np
from dask.distributed import Client  # not used in this minimal reproducer

if __name__ == '__main__':
    # Use the process-based (multiprocessing) scheduler for the write.
    with dask.config.set(scheduler='processes'):
        rs = np.random.RandomState(0)
        da = xr.DataArray(rs.randn(1000, 2000), dims=["x", "y"]).chunk({"x": 100, "y": 100})
        # Fails: the underlying NetCDF4DataStore holds a lock that cannot be
        # pickled when the graph is shipped to the worker processes.
        da.to_netcdf('bla.nc')
It doesn't work because NetCDF4DataStore contains a lock that is not picklable.
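For contrast, the same write goes through with the threaded scheduler, since the lock never has to leave the process. A minimal sketch, derived from the reproducer above:

import dask
import numpy as np
import xarray as xr

rs = np.random.RandomState(0)
da = xr.DataArray(rs.randn(1000, 2000), dims=["x", "y"]).chunk({"x": 100, "y": 100})

# The threaded scheduler shares the in-process, per-file lock between its
# workers, so the write completes.
with dask.config.set(scheduler='threads'):
    da.to_netcdf('bla_threads.nc')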
I noticed that while the threaded scheduler uses a lock that is defined as a function of the file name (used as the key), the process-based scheduler throws away the key.
I'm not sure yet what the consequences and the logical interpretation of that are. Should this scenario simply raise a NotImplementedError because it cannot be supported?
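To illustrate the question, such a guard could look roughly like the following. This is purely a hypothetical sketch (the function name and where it would be called from are made up), not a description of xarray's actual internals.

import dask


def _refuse_unsupported_scheduler():
    # Hypothetical guard: the multiprocessing scheduler cannot ship the
    # per-file write lock to its workers, so fail early with a clear message
    # instead of crashing on an unpicklable lock.
    if dask.config.get("scheduler", None) == "processes":
        raise NotImplementedError(
            "Writing NetCDF files with the process-based scheduler "
            "is not supported."
        )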
There are simply too many open questions related to our current use of xarray and dask in the PAVICS environment, and as long as these questions are not answered, and the underlying logic is not better understood, some parts of our stack will remain brittle and unpredictable in terms of the computing load they require.