Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross Language Client/Workers #586

Open
mrocklin opened this issue Oct 15, 2016 · 22 comments
Open

Cross Language Client/Workers #586

mrocklin opened this issue Oct 15, 2016 · 22 comments

Comments

@mrocklin
Copy link
Member

mrocklin commented Oct 15, 2016

The Dask.distributed dynamic task scheduler could be replicated across different languages with low-to-moderate effort. This would require someone to build Client and Worker objects in the other language that communicate to the same Scheduler, which contains most of the logic but is fortunately language agnostic. More specifically, there are three players in a dask.distributed cluster, only two of which would need to be rewritten:

  1. Client: something that users use to submit tasks to the scheduler. Would need to be rewritten but is fairly simple. Needs to know how to serialize functions, encode msgpack, and send data over a socket.
  2. Worker: a process running on a remote node that performs those actual tasks. Would need to be rewritten but is also fairly simple. Needs to know how to communicate over a socket, deserialize functions, and execute them asynchronously in some sort of thread pool.
  3. Scheduler: a process to coordinate the actions of all clients and workers, ensuring that the computation proceeds to completion under various stimuli. This is very complex but would not need to be rewritten as it is language agnostic.

About 90% of the complexity of dask.distributed is in the scheduler. Fortunately the scheduler is also language agnostic, and communicates only using msgpack and long bytestrings. It should be doable to re-implement the Client and Workers in another language like R or Julia if anyone has interest. This would require the following understanding in the other language:

  1. How to serialize and deserialize functions and variables in that language
  2. How to communicate over a network, hopefully in a non-blocking way
  3. How to evaluate functions using a separate thread pool
@ViralBShah
Copy link

Doing all these in Julia should be fairly straightforward. I also see an MsgPack.jl package

https://github.com/kmsquire/MsgPack.jl

  1. All julia objects can be serialized and deserialized in a straightforward way. Some tweaks may be necessary for performance when we get there.
  2. Julia has Tasks, which give a very nice way to do asynchronous communication.
  3. Evaluating in a threadpool may take some work as our multi-threading is in early stages, but we could easily have a pool of julia processes for now.

Are there code samples showing how to do 1, 2 and 3?

cc: @shashi @amitmurthy @kmsquire

@mrocklin
Copy link
Member Author

Some relevant documentation:

  1. Dask.distributed protocol: http://distributed.readthedocs.io/en/latest/protocol.html . It looks like this page is not entirely comprehensive. It should give a good idea on how we send messages around though. I would be happy to go through and improve it if there is interest.
  2. Foundations document for Python developers: http://distributed.readthedocs.io/en/latest/foundations.html . This is mostly on how we organized things internally at the base level. It has nothing to do with how one might manage things in Julia, but is helpful if you want to learn from existing code.
  3. The worker code: https://github.com/dask/distributed/blob/master/distributed/worker.py . It's about 1000 lines of Python, only about half of which is actually necessary for a minimal implementation.
  4. The client code: https://github.com/dask/distributed/blob/master/distributed/client.py . It's about 2000 lines of Python, only about a fifth of which is probably necessary for a minimal implementation. Much of this is special features that have accrued over the past year.

I was looking at Julia tasks yesterday and indeed they seem quite nice. I'm a bit jealous :)

You could also have workers with a single thread for computation. That's a common mode of operation in Python as well when dealing with GIL-holding Pure Python code. However it is important that the worker process can handle incoming messages while computing though. Presumably Julia's asynchronous task API supports this?

It seems like there is some non-trivial interest in this. I'll spend some time scoping out a minimal API for clients and workers.

@mrocklin
Copy link
Member Author

With Julia in particular I'm curious on how to have this satisfy other parallel programming protocols in the language. The main thing that Dask brings here is intelligent dynamic task scheduling. We've thought fairly hard about how to do this well. The APIs that Python Dask uses though might not be the right APIs for Julia-Dask. Python-dask generally uses delayed function calls and futures/promises.

@mrocklin
Copy link
Member Author

Some general notes that came up in private conversation:

  • This approach doesn't move over dask.array or dask.dataframe, it only moves over the distributed dynamic task scheduler. There is a snippet from a SciPy conference talk on this here: https://youtu.be/PAGjm4BMKlk?t=800
  • Long term I'd be happy moving the scheduler implementation to another language as well. This would be preceded by some serious benchmarking of languages like Go, PyPy, C++, and Julia. At the moment scheduler performance in Python is actually pretty decent. Python is actually not-bad when it comes to a lot of complex data structure access.

@ViralBShah
Copy link

Yes, Julia processes can handle asyncrhonous communication while computing, so long as they are executing Julia code. This is done through Julia Tasks. If a lot of time is spent in a C/Fortran library call, say BLAS, then that will starve the julia scheduler. The @async and @sync macros are quite handy.


help?> @async
  @async

  Like @schedule, @async wraps an expression in a Task and adds it to the
  local machine's scheduler queue. Additionally it adds the task to the set of
  items that the nearest enclosing @sync waits for. @async also wraps the
  expression in a let x=x, y=y, ... block to create a new scope with copies of
  all variables referenced in the expression.

help?> @sync
  @sync

  Wait until all dynamically-enclosed uses of @async, @spawn, @spawnat and
  @parallel are complete. All exceptions thrown by enclosed async operations
  are collected and thrown as a CompositeException.

@ViralBShah
Copy link

Here's a simple example using Julia's download() function to download some URLs, which calls curl/wget concurrently using Julia's tasks:

julia> URLs = 
3-element Array{String,1}:
 "www.google.com"
 "www.amazon.com"
 "www.yahoo.com" 

julia> @sync for u in ["www.google.com", "www.amazon.com", "www.yahoo.com"]
           @async download(u)
       end
  % Total    % Received % Xferd   A v%e rTaogtea lS  p e e%d  R e cTeiimvee d   %  TXifmeer d     A vTeirmaeg e  CSuprereedn t 
  T i m e         T i m e           T i m e     C u r r e n t 
    D l o a d     U p l o a d       T o t a l       S p e n t      D lLoeafdt    USppleoeadd
t a l0      S p e0n t      0  L e f t  0  S p e e0d 
0    0        0  0         00   - - : -0- : - -  0 - - : - -0  :  -%  - T  o- t-0a: l-  - :  -   - % 0   R - -e :c0-e-i:v-e-d  -%- :X-f-e:r-d-   -A-v:e-r-a:g-e-  S p e e d0   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   261  100   261    0     0   1619      0 --:--:-- --:--:-- --:--:--  1631
100   304  100   304    0     0   1346      0 --:--:-- --:--:-- --:--:--  1351
100 12584    0 12584    0     0  33003      0 --:--:-- --:--:-- --:--:-- 33003
100   231  100   231    0     0    365      0 --:--:-- --:--:-- --:--:--   366
100    17  100    17    0     0     21      0 --:--:-- --:--:-- --:--:--    21
100  1378  100  1378    0     0    618      0  0:00:02  0:00:02 --:--:--  1065
100  372k    0  372k    0     0   150k      0 --:--:--  0:00:02 --:--:--  229k

@ViralBShah
Copy link

To me the larger question will be how Julia users can use these capabilities. That will eventually need a parallel array/dataframe implementation. The good news is that most Julia array code targets AbstractArrays and increasingly, AbstractDataframes are also falling into place. Thus DaskArrays and DaskDataFrames can transparently provide alternate implementations and be leveraged by users - everything will work out under the hood with multiple dispatch.

@mrocklin
Copy link
Member Author

To me the larger question will be how Julia users can use these capabilities. That will eventually need a parallel array/dataframe implementation.

I agree that a parallel array/dataframe implementation composes well with this, however I would disagree with the term "need". A surprisingly large amount of Dask's use today has little to do with the dask.array and dask.dataframe implementations and a lot to do with arbitrary task scheduling. Big business definitely likes "Big Data Frames" as a database surrogate on Hadoop, but outside of that there is a lot of use of Dask for arbitrary dynamic task scheduling. This was somewhat unexpected and a large pivot of the project from Dask's original implementation as a parallel array.

Here is a slide deck from a recent PyData conference stressing this topic if it's of interest: http://matthewrocklin.com/slides/pydata-dc-2016#/

@ViralBShah
Copy link

That's interesting. Thanks for the link - just went through it. So what do users typically use Dask parallelism for today? This would be quite interesting personally for me - but not sure if this is the right place to ask that question. Any pointers would help.

@mrocklin
Copy link
Member Author

These links have some additional information:

http://dask.pydata.org/en/latest/use-cases.html
http://matthewrocklin.com/blog/work/2016/08/16/dask-for-institutions

@mrocklin
Copy link
Member Author

This project seems to be communicating to the dask-scheduler from Julia: https://github.com/invenia/DaskDistributedDispatcher.jl

@iamed2
Copy link
Contributor

iamed2 commented Jun 16, 2017

@mrocklin Yup that's our project :)

@ViralBShah
Copy link

@mrocklin You should come to JuliaCon next week in Berkeley if you are in the region!

@mrocklin
Copy link
Member Author

mrocklin commented Jun 18, 2017 via email

@dselivanov
Copy link

Didn't know scheduler is language agnostic. Will check whether it is possible to create small proof of concept for R.

@mrocklin
Copy link
Member Author

I'm very glad to hear it @dselivanov . Please let us know if there is anything that causes frustration on the Dask side.

@niallrobinson
Copy link

Hi @dselivanov - did you get anywhere with this? I had a conversation with someone who said that RStudio put the work in to make R play with Spark. Apparently, they ended up using PySpark to glue R to Spark. I wonder if they'd have the appetite to do the same for Dask - we might find that some of the work has already been done.

@dselivanov
Copy link

@niallrobinson this is not true - sparkR and sparklyr work without python.

Regarding dask for R - I haven't had time to create proof of concept. But I'm pretty sure it is possible. The main challenge is how to communicate over a network in a non-blocking way.

@mrocklin
Copy link
Member Author

My hope is that R has some non-blocking concurrent framework. If not then we would have to communicate in separate threads and use queues.

@dselivanov
Copy link

It looks like https://github.com/HenrikBengtsson/future can be useful. @HenrikBengtsson what is your opinion - you've done quite a lot in this field?

@dselivanov
Copy link

@mrocklin would be super useful if you can point to the document/scheme on what is needed to implement very minimal "big data 'hello world' " - word count. Something like

  1. start with 2 workers w1, w2 and client c
  2. Client c send message to scheduler to read data
  3. scheduler sends message to workers w1, w2 to read data and assign results to keys k1_1, k2_1
  4. scheduler send message to workers to make local reduce - local word counts. Assign results to k1_2, k2_2.
  5. scheduler send message to partition results by word (say 2 hash partitions for each worker) - {k1_3, k1_4}, {k2_3, k2_4}
  6. scheduler send message to workers to send results to peers:
    • w1 sends k1_3 to w2
    • w2 sends k2_4 to w1
  7. scheduler asks for local reduce

Something like above but with technical details will be super useful for the start.

@mrocklin
Copy link
Member Author

mrocklin commented Apr 16, 2018 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants