This repository has been archived by the owner on Nov 26, 2022. It is now read-only.

Commit 39bf349: bump v0.0.1
slavoutich committed Mar 6, 2017 (initial commit, 0 parents)

Showing 7 changed files with 389 additions and 0 deletions.
92 changes: 92 additions & 0 deletions .gitignore
@@ -0,0 +1,92 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# IPython Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# dotenv
.env

# virtualenv
venv/
ENV/

# Spyder project settings
.spyderproject

# Rope project settings
.ropeproject

# SGE
scratch/*
22 changes: 22 additions & 0 deletions LICENSE.txt
@@ -0,0 +1,22 @@
Copyright (c) 2017, Viacheslav Ostroukh
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48 changes: 48 additions & 0 deletions README.rst
@@ -0,0 +1,48 @@
Dask on SLURM
=============

Deploy a Dask.distributed_ cluster on top of a cluster running a
SLURM_ workload manager.

Written under the influence of, and with some code borrowed from, the Dask-DRMAA_ project.

.. _Dask.distributed: http://distributed.readthedocs.io/en/latest/
.. _SLURM: https://slurm.schedmd.com/
.. _Dask-DRMAA: https://github.com/dask/dask-drmaa/

Example
-------

Launch a cluster from Python code and do a simple calculation:

.. code-block:: python

    from slurmified import Cluster
    slurm_kwargs = {
        'mem-per-cpu': '100',
        'time': '1-00:00:00'
    }
    cluster = Cluster(slurm_kwargs)
    cluster.start_workers(10)
    from distributed import Client
    client = Client(cluster)
    future = client.submit(lambda x: x + 1, 10)
    future.result()  # returns 11

If you want the cluster to terminate automatically after the calculation
has finished, you can use the following:

.. code-block:: python

    from slurmified import Cluster
    from distributed import Client
    slurm_kwargs = {
        'mem-per-cpu': '100',
        'time': '1-00:00:00'
    }
    inputs = list(range(0, 100))
    with Client(Cluster(slurm_kwargs).start_workers(10)) as client:
        incremented = client.map(lambda x: x+1, inputs)
        inverted = client.map(lambda x: -x, incremented)
        outputs = client.gather(inverted)
    print(outputs)  # prints [-1, .. , -100]
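
``start_workers`` also accepts ``n_min`` and ``timeout`` arguments (documented
in ``slurmified/core.py`` below). A minimal, untested sketch of how they might
be combined with an explicit ``close()``; the resource values are placeholders:

.. code-block:: python

    from distributed import Client
    from slurmified import Cluster

    slurm_kwargs = {'mem-per-cpu': '100', 'time': '1-00:00:00'}
    cluster = Cluster(slurm_kwargs, timeout=60.)
    # Raises RuntimeError unless at least 5 of the 10 requested workers
    # have connected within the timeout.
    cluster.start_workers(10, n_min=5)
    client = Client(cluster)
    print(client.submit(lambda x: x * 2, 21).result())  # prints 42
    cluster.close()  # cancel the SLURM job and stop the local scheduler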
2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
distributed
slurmpy
21 changes: 21 additions & 0 deletions setup.py
@@ -0,0 +1,21 @@
#!/usr/bin/env python

from os.path import exists
from setuptools import setup

setup(name='slurmified',
      version='0.0.1',
      description='Dask.Distributed on SLURM-managed clusters',
      url='https://gitlab.kwant-project.org/slavoutich/slurmified/',
      maintainer='Viacheslav Ostroukh',
      maintainer_email='[email protected]',
      license='BSD 2-clause',
      keywords='',
      packages=['slurmified'],
      install_requires=list(open('requirements.txt')
                            .read()
                            .strip()
                            .split('\n')),
      long_description=(open('README.rst').read() if exists('README.rst')
                        else ''),
      zip_safe=False)
1 change: 1 addition & 0 deletions slurmified/__init__.py
@@ -0,0 +1 @@
from .core import Cluster
203 changes: 203 additions & 0 deletions slurmified/core.py
@@ -0,0 +1,203 @@
import logging
import os
import sys
import socket
import slurmpy
import subprocess

from distributed import LocalCluster
from distributed.utils import sync
from time import time, sleep
from toolz import merge

logger = logging.getLogger(__name__)


class Cluster:

    def _wait_workers_start(self, n=1, timeout=None):
        """ Wait until the number of workers seen by the scheduler becomes
        at least n, then return True. If the timeout is reached before that
        happens, return False. """
        dt = timeout or self._wait_timeout
        end_time = time() + dt
        while True:
            if self.n_workers >= n:
                return True
            if time() > end_time:
                return False
            sleep(self._wait_timestep)

    def _start_local_cluster(self, **kwargs):
        # Run a local scheduler without local workers; the SLURM-launched
        # dask-worker processes connect to it over the network.
        ip = kwargs.pop("ip", socket.gethostbyname(self._hostname))
        scheduler_port = kwargs.pop("scheduler_port", 0)
        self._local_cluster = LocalCluster(
            n_workers=0, ip=ip, scheduler_port=scheduler_port, **kwargs)
        logger.info("Started local scheduler at {addr}".
                    format(addr=self.scheduler_address))

    def __init__(self, slurm_kwargs=None, hostname=None, task_name=None,
                 timeout=10., **kwargs):
        """
        Dask.Distributed workers launched via the SLURM workload manager.

        Parameters
        ----------
        slurm_kwargs: dict
            A dictionary with arguments passed to the SLURM batch script
            (see Examples). If None, defaults to an empty dictionary.
        hostname: None or string
            Hostname of the controller node, visible to the other SLURM nodes.
            If None, determined automatically through 'socket.gethostname()'.
        task_name: string or None
            Name of the job, passed to SLURM. If None, defaults to
            'dask-workers'.
        timeout: float
            Default time to wait until workers start
            (see ``self.start_workers``).
        **kwargs: dict
            Keyword arguments, passed directly to the 'distributed.LocalCluster'
            constructor.

        Examples
        --------
        >>> from slurmified import Cluster
        >>> slurm_kwargs = {
        ...     'partition': 'default',
        ...     'mem-per-cpu': '100',
        ...     'time': '1-00:00:00'
        ... }
        >>> cluster = Cluster(slurm_kwargs)
        >>> cluster.start_workers(10)
        >>> from distributed import Client
        >>> client = Client(cluster)
        >>> future = client.submit(lambda x: x + 1, 10)
        >>> future.result()
        11
        """
        self._hostname = hostname or socket.gethostname()
        self._start_local_cluster(**kwargs)

        self._slurm_kwargs = slurm_kwargs.copy() if slurm_kwargs else {}
        # 'cpus-per-task' (or its short form 'c') also sets the number of
        # threads each dask-worker runs with.
        nthreads1 = self._slurm_kwargs.pop("cpus-per-task", None)
        nthreads2 = self._slurm_kwargs.pop("c", None)
        self._nthreads = nthreads1 or nthreads2 or 1
        self._jobid = None

        self._task_name = task_name or "dask-workers"
        self._wait_timeout = timeout
        self._wait_timestep = 1

        self._worker_exec = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')
        logger.info("Using dask-worker executable '{exe}'".
                    format(exe=self._worker_exec))

    @property
    def scheduler(self):
        return self._local_cluster.scheduler

    @property
    def scheduler_address(self):
        return ('{hostname}:{port}'.
                format(hostname=self._hostname, port=self.scheduler.port))

    @property
    def n_workers(self):
        return len(self.scheduler.workers)

    def start_workers(self, n=1, n_min=None, timeout=None, **kwargs):
        """Start Dask workers via a SLURM batch script. If workers are already
        started, they are terminated first. Returns self.

        Parameters
        ----------
        n: int
            Number of workers to start.
        n_min: None or int
            Minimal number of launched workers needed to start calculations.
            The function waits until it is reached and then returns. If it is
            not reached within ``timeout``, a RuntimeError is raised. If None,
            the function waits for all ``n`` workers to start, but never raises
            an error, only emits a warning.
        timeout: None or int
            Time in seconds to wait for workers to start. If it is reached and
            the workers have not started, a warning is emitted. If None, the
            default is used (provided in the constructor).
        **kwargs: dict
            Dictionary with strings as keys and values; can be used to override
            the SLURM kwargs passed to the constructor.
        """
        if self._jobid:
            self.stop_workers()
        # Submit one SLURM array job with n tasks; each task runs a single
        # dask-worker process that connects back to the local scheduler.
        slurm_kwargs = merge(
            self._slurm_kwargs, kwargs or {},
            {"array": "0-{}".format(n-1), "cpus-per-task": self._nthreads}
        )
        s = slurmpy.Slurm(self._task_name, slurm_kwargs)
        self._jobid = s.run(
            " ".join((self._worker_exec,
                      "--nthreads", str(self._nthreads),
                      "--nprocs", "1",
                      "--reconnect",
                      self.scheduler_address))
        )
        if self._wait_workers_start(n_min or n, timeout):
            m = ("Started {n} workers, job number {jobid}"
                 .format(n=self.n_workers, jobid=self._jobid))
            logger.info(m)
        elif n_min:
            m = ("Not enough workers to continue "
                 "({n}, minimal provided {n_min})"
                 .format(n=self.n_workers, n_min=n_min))
            self.stop_workers()
            raise RuntimeError(m)
        else:
            m = ("Timeout was reached while waiting for {n} workers to start. "
                 "{n_started} actually started. Job number {jobid}."
                 .format(n=n, n_started=self.n_workers, jobid=self._jobid))
            logger.warning(m)
        return self

    def stop_workers(self):
        """ Stop running workers. """
        try:
            # Ask the scheduler to retire the workers gracefully first.
            sync(loop=self._local_cluster.loop,
                 func=self.scheduler.retire_workers,
                 workers=self.scheduler.workers_to_close(),
                 remove=True)
        finally:
            if self._jobid:
                try:
                    # Cancel the SLURM job whether or not retiring succeeded.
                    subprocess.check_call(("scancel", str(self._jobid)))
                except subprocess.CalledProcessError as ex:
                    m = ("scancel returned non-zero exit status {code} while "
                         "stopping SLURM job number {jobid} for the workers. "
                         "You should check manually whether they were "
                         "terminated successfully."
                         .format(code=ex.returncode, jobid=self._jobid))
                    logger.error(m)
                finally:
                    self._jobid = None

    def _start(self):
        return self._local_cluster._start()

    def close(self):
        """ Close the cluster. """
        logger.info("Closing workers and cluster")
        if self._jobid:
            self.stop_workers()
        self._local_cluster.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __del__(self):
        try:
            self.close()
        except:
            # Never propagate exceptions out of __del__.
            pass
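
Because ``Cluster`` defines ``__enter__``/``__exit__``, it can also be used as
a context manager, so that ``close()`` (and with it ``scancel``) runs even if
the computation fails. A minimal sketch, reusing the placeholder SLURM settings
from the README examples:

.. code-block:: python

    from distributed import Client
    from slurmified import Cluster

    with Cluster({'mem-per-cpu': '100', 'time': '1-00:00:00'}) as cluster:
        cluster.start_workers(4)
        with Client(cluster) as client:
            print(client.gather(client.map(abs, [-2, -1, 0])))  # [2, 1, 0]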
