Merge pull request #12 from spotify/master
Merge into master
GoodDok authored Aug 23, 2019
2 parents 7fcdb93 + 7f67fbc commit 8e1c966
Showing 17 changed files with 172 additions and 43 deletions.
8 changes: 4 additions & 4 deletions CONTRIBUTING.rst
@@ -22,10 +22,10 @@ You will need a ``tox --version`` of at least 2.0.
# These commands are pretty fast and will tell if you've
# broken something major:
tox -e flake8
tox -e py27-nonhdfs
tox -e py27-core
# You can also test particular files for even faster iterations
tox -e py27-nonhdfs test/rpc_test.py
tox -e py27-core test/rpc_test.py
# The visualiser tests require phantomjs to be installed on your path
tox -e visualiser
@@ -35,9 +35,9 @@ You will need a ``tox --version`` of at least 2.0.
tox -e py34-hdp
Where ``flake8`` is the lint checking, ``py27`` is obviously Python 2.7.
``nonhdfs`` are tests not running on the Hadoop minicluster and ``cdh`` and
``core`` are tests that do not require external components and ``cdh`` and
``hdp`` are two different hadoop distributions. For most local development it's
usually enough to run the lint checking and a python version for ``nonhdfs``
usually enough to run the lint checking and a python version for ``core``
and let Travis run for the whole matrix.

For `cdh` and `hdp`, tox will download the hadoop distribution for you. You
6 changes: 4 additions & 2 deletions README.rst
@@ -154,7 +154,8 @@ or held presentations about Luigi:
* `Glossier <https://www.glossier.com/>`_ `(blog, 2018) <https://medium.com/glossier/how-to-build-a-data-warehouse-what-weve-learned-so-far-at-glossier-6ff1e1783e31>`__
* `Data Revenue <https://www.datarevenue.com/>`_ `(blog, 2018) <https://www.datarevenue.com/en/blog/how-to-scale-your-machine-learning-pipeline>`_
* `Uppsala University <http://pharmb.io>`_ `(tutorial) <http://uppnex.se/twiki/do/view/Courses/EinfraMPS2015/Luigi.html>`_ / `(presentation, 2015) <https://www.youtube.com/watch?v=f26PqSXZdWM>`_ / `(slides, 2015) <https://www.slideshare.net/SamuelLampa/building-workflows-with-spotifys-luigi>`_ / `(poster, 2015) <https://pharmb.io/poster/2015-sciluigi/>`_ / `(paper, 2016) <https://doi.org/10.1186/s13321-016-0179-6>`_ / `(project) <https://github.com/pharmbio/sciluigi>`_
* `GIPHY <https://giphy.com/>`_ `(blog, 2019) <https://engineering.giphy.com/luigi-the-10x-plumber-containerizing-scaling-luigi-in-kubernetes/>`_
* `GIPHY <https://giphy.com/>`_ `(blog, 2019) <https://engineering.giphy.com/luigi-the-10x-plumber-containerizing-scaling-luigi-in-kubernetes/>`__
* `xtream <https://xtreamers.io/>`__ `(blog, 2019) <https://towardsdatascience.com/lessons-from-a-real-machine-learning-project-part-1-from-jupyter-to-luigi-bdfd0b050ca5>`__

Some more companies are using Luigi but haven't had a chance yet to write about it:

@@ -182,8 +183,9 @@ Some more companies are using Luigi but haven't had a chance yet to write about
* `VOYAGE GROUP/Zucks <https://zucks.co.jp/en/>`_
* `Textpert <https://www.textpert.ai/>`_
* `Whizar <https://www.whizar.com/>`_
* `xtream <https://www.xtreamers.io/>`_
* `xtream <https://www.xtreamers.io/>`__
* `Skyscanner <https://www.skyscanner.net/>`_
* `Jodel <https://www.jodel.com/>`_

We're more than happy to have your company added here. Just send a PR on GitHub.

13 changes: 5 additions & 8 deletions doc/configuration.rst
@@ -363,7 +363,7 @@ check_unfulfilled_deps

force_multiprocessing
By default, luigi uses multiprocessing when *more than one* worker process is
requested. Whet set to true, multiprocessing is used independent of the
requested. When set to true, multiprocessing is used independent of the
the number of workers.
Defaults to false.

@@ -405,8 +405,8 @@ method
config files or run Luigi on an EC2 instance with proper instance
profile.

In order to use sendgrid, fill in your sendgrid username and password
in the `[sendgrid]`_ section.
In order to use sendgrid, fill in your sendgrid API key in the
`[sendgrid]`_ section.

In order to use smtp, fill in the appropriate fields in the `[smtp]`_
section.
@@ -834,11 +834,8 @@ metrics_collector

These parameters control sending error e-mails through SendGrid.

password
Password used for sendgrid login

username
Name of the user for the sendgrid login
apikey
API key of the SendGrid account.


[smtp]
2 changes: 1 addition & 1 deletion doc/example_top_artists.rst
@@ -155,7 +155,7 @@ Python code for the Spark job is found below.
# The second field is the artist
counts = streams \
.map(lambda row: (row[1], 1)) \
.reduceByKey(add)
.reduceByKey(operator.add)
counts.write.option('sep', '\t').csv(output_path)
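
Reviewer note: the hunk above qualifies `add` as `operator.add`, which presumably pairs with an `import operator` elsewhere in the example (not shown in this hunk). As a rough illustration of what `reduceByKey(operator.add)` computes, here is a pure-Python sketch that does not use Spark at all. The helper `reduce_by_key` and the sample `streams` rows are hypothetical and exist only for this illustration.

```python
import operator
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key and fold each group's values with func.

    Hypothetical stand-in for the semantics of Spark's reduceByKey; it runs
    locally and in memory.
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


# Made-up rows: the second field is the artist, as in the example above.
streams = [
    ("2019-08-01", "artist_a"),
    ("2019-08-01", "artist_b"),
    ("2019-08-02", "artist_a"),
]

# Map every row to (artist, 1), then sum the ones per artist.
counts = reduce_by_key(((row[1], 1) for row in streams), operator.add)
```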
28 changes: 22 additions & 6 deletions luigi/contrib/ftp.py
@@ -15,14 +15,14 @@
# limitations under the License.
#
"""
This library is a wrapper of ftplib.
It is convenient to move data from/to FTP.
This library is a wrapper of ftplib or pysftp.
It is convenient to move data from/to (S)FTP servers.
There is an example on how to use it (example/ftp_experiment_outputs.py)
You can also find unittest for each class.
Be aware that normal ftp do not provide secure communication.
Be aware that normal ftp does not provide secure communication.
"""

import datetime
@@ -368,7 +368,8 @@ class RemoteTarget(luigi.target.FileSystemTarget):
"""
Target used for reading from remote files.
The target is implemented using ssh commands streaming data over the network.
The target is implemented using intermediate files on the local system.
On Python2, these files may not be cleaned up.
"""

def __init__(
@@ -407,8 +408,23 @@ def open(self, mode):
return self.format.pipe_writer(AtomicFtpFile(self._fs, self.path))

elif mode == 'r':
temp_dir = os.path.join(tempfile.gettempdir(), 'luigi-contrib-ftp')
self.__tmp_path = temp_dir + '/' + self.path.lstrip('/') + '-luigi-tmp-%09d' % random.randrange(0, 1e10)
temppath = '{}-luigi-tmp-{:09d}'.format(
self.path.lstrip('/'), random.randrange(0, 1e10)
)
try:
# store reference to the TemporaryDirectory because it will be removed on GC
self.__temp_dir = tempfile.TemporaryDirectory(
prefix="luigi-contrib-ftp_"
)
except AttributeError:
# TemporaryDirectory only available in Python3, use old behaviour in Python2
# this file will not be cleaned up automatically
self.__tmp_path = os.path.join(
tempfile.gettempdir(), 'luigi-contrib-ftp', temppath
)
else:
self.__tmp_path = os.path.join(self.__temp_dir.name, temppath)

# download file to local
self._fs.get(self.path, self.__tmp_path)
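
Reviewer note: the fallback logic in this hunk can be tried in isolation. The sketch below is a standalone approximation, not the code in `luigi/contrib/ftp.py`: the function name `make_tmp_path` and the returned `(holder, path)` tuple are assumptions made for illustration. It passes the integer `10**10` to `random.randrange`, because recent Python 3 releases no longer accept a float such as `1e10` there. It also calls `cleanup()` explicitly instead of relying on garbage collection, since collection timing depends on the interpreter.

```python
import os
import random
import tempfile


def make_tmp_path(remote_path):
    """Return (holder, local_path) for a download of remote_path.

    holder is a tempfile.TemporaryDirectory when that class is available,
    otherwise None (the Python 2 fallback, where nothing is cleaned up).
    Hypothetical helper, written only to illustrate the hunk above.
    """
    temppath = '{}-luigi-tmp-{:09d}'.format(
        remote_path.lstrip('/'), random.randrange(0, 10**10)
    )
    try:
        holder = tempfile.TemporaryDirectory(prefix="luigi-contrib-ftp_")
    except AttributeError:
        # No TemporaryDirectory: fall back to a shared, uncleaned location.
        return None, os.path.join(
            tempfile.gettempdir(), 'luigi-contrib-ftp', temppath
        )
    return holder, os.path.join(holder.name, temppath)


holder, local_path = make_tmp_path('/data/file.txt')
tmp_root = holder.name
existed_before_cleanup = os.path.isdir(tmp_root)
holder.cleanup()  # explicit removal; the directory and its contents are deleted
exists_after_cleanup = os.path.exists(tmp_root)
```

Keeping a reference to the `TemporaryDirectory` object on the target, as the hunk does, ties the lifetime of the downloaded copy to the lifetime of the target instance.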

6 changes: 4 additions & 2 deletions luigi/contrib/sge.py
@@ -278,9 +278,11 @@ def _dump(self, out_dir=''):
d = pickle.dumps(self)
module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0]
d = d.replace('(c__main__', "(c" + module_name)
open(self.job_file, "w").write(d)
with open(self.job_file, "w") as f:
f.write(d)
else:
pickle.dump(self, open(self.job_file, "w"))
with open(self.job_file, "wb") as f:
pickle.dump(self, f)

def _run_job(self):
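
Reviewer note: the `else` branch now opens the job file in binary mode inside a `with` block, which is what `pickle.dump` needs on Python 3 and which also closes the file handle deterministically. A minimal round-trip sketch of that pattern follows; the helper names `dump_job` and `load_job`, the file name, and the sample payload are hypothetical.

```python
import os
import pickle
import tempfile


def dump_job(obj, job_file):
    """Pickle obj to job_file; binary mode is required for pickle output."""
    with open(job_file, "wb") as f:
        pickle.dump(obj, f)


def load_job(job_file):
    """Read back an object written by dump_job."""
    with open(job_file, "rb") as f:
        return pickle.load(f)


# Write a made-up payload into a scratch directory and read it back.
job_file = os.path.join(tempfile.mkdtemp(), "job-instance.pickle")
dump_job({"task": "demo", "n_cpu": 2}, job_file)
restored = load_job(job_file)
```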

17 changes: 17 additions & 0 deletions luigi/scheduler.py
@@ -799,6 +799,19 @@ def forgive_failures(self, task_id=None):
self._state.set_status(task, status, self._config)
return {"task_id": task_id, "status": task.status}

@rpc_method()
def mark_as_done(self, task_id=None):
status = DONE
task = self._state.get_task(task_id)
if task is None:
return {"task_id": task_id, "status": None}

# we can force mark DONE for running or failed tasks
if task.status in {RUNNING, FAILED, DISABLED}:
self._update_task_history(task, status)
self._state.set_status(task, status, self._config)
return {"task_id": task_id, "status": task.status}

@rpc_method()
def add_task(self, task_id=None, status=PENDING, runnable=True,
deps=None, new_deps=None, expl=None, resources=None,
@@ -992,6 +1005,10 @@ def get_scheduler_message_response(self, task_id, message_id):
response = task.scheduler_message_responses.pop(message_id, None)
return {"response": response}

@rpc_method()
def has_task_history(self):
return self._config.record_task_history

@rpc_method()
def is_pause_enabled(self):
return {'enabled': self._config.pause_enabled}
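
Reviewer note: the status rule introduced by `mark_as_done` is easy to check without a running scheduler. The sketch below models only that rule, with a plain dict standing in for the scheduler state. The dict-based `statuses` argument and the string constants are assumptions for illustration; the real method works on task objects and also records task history.

```python
RUNNING = "RUNNING"
FAILED = "FAILED"
DISABLED = "DISABLED"
PENDING = "PENDING"
DONE = "DONE"


def mark_as_done(statuses, task_id):
    """Force a task to DONE if it is RUNNING, FAILED or DISABLED.

    statuses is a hypothetical {task_id: status} dict standing in for the
    scheduler's internal state. Unknown tasks yield a status of None and
    tasks in any other state are left unchanged.
    """
    if task_id not in statuses:
        return {"task_id": task_id, "status": None}
    if statuses[task_id] in {RUNNING, FAILED, DISABLED}:
        statuses[task_id] = DONE
    return {"task_id": task_id, "status": statuses[task_id]}


statuses = {"a": FAILED, "b": PENDING}
result_failed = mark_as_done(statuses, "a")
result_pending = mark_as_done(statuses, "b")
result_unknown = mark_as_done(statuses, "c")
```

The same three states gate the "Mark as done" button in the visualiser, so the UI and the RPC method are expected to agree on which tasks can be forced.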
13 changes: 11 additions & 2 deletions luigi/static/visualiser/index.html
@@ -35,13 +35,14 @@
{{#error}}<button class="btn btn-warning btn-xs forgiveFailures" title="Forgive failures" data-toggle="tooltip"><i class="fa fa-ambulance"></i></button>{{/error}}
{{#re_enable}}<button class="btn btn-danger btn-xs showError" title="Show error" data-toggle="tooltip"><i class="fa fa-bug"></i></button>{{/re_enable}}
{{#re_enable}}<a class="btn btn-warning btn-xs re-enable-button" title="Re-enable" data-toggle="tooltip" data-task-id="{{taskId}}">Re-enable</a>{{/re_enable}}
{{#mark_as_done}}<a class="btn btn-success btn-xs markAsDone" title="Mark as done" data-toggle="tooltip" data-task-id="{{taskId}}"><i class="fa fa-fast-forward"></i></a>{{/mark_as_done}}
{{#trackingUrl}}<a target="_blank" href="{{trackingUrl}}" class="btn btn-primary btn-xs" title="Track Progress" data-toggle="tooltip"><i class="fa fa-eye"></i></a>{{/trackingUrl}}
{{#statusMessage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}"><i class="fa fa-comment"></i></button>{{/statusMessage}}
{{^statusMessage}}
{{#progressPercentage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}"><i class="fa fa-comment"></i></button>
{{/progressPercentage}}
{{/statusMessage}}
{{#acceptsMessages}}<button class="btn btn-success btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>{{/acceptsMessages}}
{{#acceptsMessages}}<button class="btn btn-default btn-xs schedulerMessage" title="Send message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name="{{displayName}}" data-worker="{{workerIdRunning}}"><i class="fa fa-paper-plane"></i></button>{{/acceptsMessages}}
</div>
</script>
<script type="text/template" name="errorTemplate">
@@ -361,6 +362,14 @@ <h3 class="box-title">{{name}}</h3>
</div>
</script>

<script type="text/template" name="topNavbarItem">
<li>
<a class="js-nav-link" href="{{href}}" {{#dataTab}}data-tab="{{dataTab}}"{{/dataTab}}>
{{label}}
</a>
</li>
</script>

</head>
<body class="skin-green-light fixed">
<div class="modal fade" id="errorModal" tabindex="-1" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true">
@@ -387,7 +396,7 @@ <h3 class="box-title">{{name}}</h3>
</button>
</div>
<div class="collapse navbar-collapse">
<ul class="nav navbar-nav">
<ul class="nav navbar-nav" id="topNavbar">
<li><a class="js-nav-link" href="#tab=tasks" data-tab="taskList">Task List</a></li>
<li><a class="js-nav-link" href="#tab=graph" data-tab="dependencyGraph">Dependency Graph</a></li>
<li><a class="js-nav-link" href="#tab=workers" data-tab="workerList">Workers</a></li>
12 changes: 12 additions & 0 deletions luigi/static/visualiser/js/luigi.js
@@ -63,6 +63,12 @@ var LuigiAPI = (function() {
});
};

LuigiAPI.prototype.markAsDone = function (taskId, callback) {
return jsonRPC(this.urlRoot + "/mark_as_done", {task_id: taskId}, function(response) {
callback(flatten(response.response));
});
};

LuigiAPI.prototype.getFailedTaskList = function(callback) {
return jsonRPC(this.urlRoot + "/task_list", {status: "FAILED", upstream_status: "", search: searchTerm()}, function(response) {
callback(flatten(response.response));
@@ -180,6 +186,12 @@ var LuigiAPI = (function() {
});
};

LuigiAPI.prototype.hasTaskHistory = function(callback) {
jsonRPC(this.urlRoot + '/has_task_history', {}, function(response) {
callback(response.response);
});
};

LuigiAPI.prototype.pause = function() {
jsonRPC(this.urlRoot + '/pause');
};
31 changes: 28 additions & 3 deletions luigi/static/visualiser/js/visualiserApp.js
@@ -74,6 +74,7 @@ function visualiserApp(luigi) {
graph: (task.status == "PENDING" || task.status == "RUNNING" || task.status == "DONE"),
error: task.status == "FAILED",
re_enable: task.status == "DISABLED" && task.re_enable_able,
mark_as_done: (task.status == "RUNNING" || task.status == "FAILED" || task.status == "DISABLED"),
statusMessage: task.status_message,
progressPercentage: task.progress_percentage,
acceptsMessages: task.accepts_messages,
@@ -949,7 +950,7 @@ function visualiserApp(luigi) {
others.addClass('expanded')
others.next().show()
}

$('.sidebar-menu').on('click', 'li:not(.sidebar-folder)', function (e) {
e.stopPropagation();
if (this.dataset.task) {
@@ -979,7 +980,7 @@ function visualiserApp(luigi) {
else {
$('#warnings').html(renderWarnings());
}

processHashChange();
});
}
@@ -1086,7 +1087,7 @@ function visualiserApp(luigi) {
return $('.resource-box.in').toArray().map(function (val) { return val.dataset.resource; });
}

function expandResources(resources) {
function expandResources(resources) {
if (resources === undefined) {
resources = [];
} else {
@@ -1126,6 +1127,15 @@ function visualiserApp(luigi) {
$(document).ready(function() {
loadTemplates();

luigi.hasTaskHistory(function(hasTaskHistory) {
if (hasTaskHistory) {
$('#topNavbar').append(renderTemplate('topNavbarItem', {
label: "History",
href: "../../history",
}).children()[0]);
}
});

luigi.isPauseEnabled(function(enabled) {
if (enabled) {
luigi.isPaused(createPauseToggle);
@@ -1334,6 +1344,21 @@ function visualiserApp(luigi) {
});
} );

$('#taskTable tbody').on('click', 'td.details-control .markAsDone', function (ev) {
var that = $(this);
var tr = that.closest('tr');
var row = dt.row( tr );
var data = row.data();
luigi.markAsDone(data.taskId, function(data) {
if (ev.altKey) {
updateTasks(); // update may not be cheap
} else {
that.tooltip('hide');
that.remove();
}
});
} );

$('#taskTable tbody').on('click', 'td.details-control .re-enable-button', function (ev) {
var that = $(this);
luigi.reEnable(that.attr("data-task-id"), function(data) {
2 changes: 1 addition & 1 deletion luigi/tools/range.py
@@ -466,7 +466,7 @@ def finite_datetimes(self, finite_start, finite_stop):
# Validate that the minutes_interval can divide 60 and it is greater than 0 and lesser than 60
if not (0 < self.minutes_interval < 60):
raise ParameterException('minutes-interval must be within 0..60')
if (60 / self.minutes_interval) * self.minutes_interval != 60:
if 60 % self.minutes_interval != 0:
raise ParameterException('minutes-interval does not evenly divide 60')
# start of a complete interval, e.g. 20:13 and the interval is 5 -> 20:10
start_minute = int(finite_start.minute/self.minutes_interval)*self.minutes_interval
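
Reviewer note: the old check multiplied the result of `60 / minutes_interval` back up, which only works as a divisibility test when `/` is integer division. On Python 3 `/` is true division and returns a float, so the modulo form is the more direct test. The sketch below isolates the validation and the floor-to-interval step. It raises `ValueError` as a stand-in for Luigi's `ParameterException`, and both function names are hypothetical.

```python
def validate_minutes_interval(minutes_interval):
    """Check that the interval lies strictly between 0 and 60 and divides 60.

    ValueError is a stand-in here for luigi's ParameterException.
    """
    if not (0 < minutes_interval < 60):
        raise ValueError('minutes-interval must be within 0..60')
    if 60 % minutes_interval != 0:
        raise ValueError('minutes-interval does not evenly divide 60')
    return minutes_interval


def floor_to_interval(minute, minutes_interval):
    """Round a minute value down to the start of its interval."""
    return (minute // minutes_interval) * minutes_interval


# The case from the comment above: 20:13 with an interval of 5 starts at 20:10.
start_minute = floor_to_interval(13, validate_minutes_interval(5))
```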
2 changes: 1 addition & 1 deletion setup.py
@@ -60,7 +60,7 @@ def get_static_files(path):

setup(
name='luigi',
version='2.8.7',
version='2.8.8',
description='Workflow mgmgt + task scheduling + dependency resolution',
long_description=long_description,
author='The Luigi Authors',
16 changes: 14 additions & 2 deletions test/_test_ftp.py
@@ -26,11 +26,15 @@
import ftplib
import os
import shutil
import sys
from helpers import unittest
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
from io import BytesIO

def StringIO(s):
return BytesIO(s.encode('utf8'))

from luigi.contrib.ftp import RemoteFileSystem, RemoteTarget

@@ -180,11 +184,19 @@ def test_get(self):
with remotetarget.open('r') as fin:
self.assertEqual(fin.read(), "something to fill")

# check for cleaning temporary files
if sys.version_info >= (3, 2):
# cleanup uses tempfile.TemporaryDirectory only available in 3.2+
temppath = remotetarget._RemoteTarget__tmp_path
self.assertTrue(os.path.exists(temppath))
remotetarget = None # garbage collect remotetarget
self.assertFalse(os.path.exists(temppath))

# file is successfuly created
self.assertTrue(os.path.exists(local_filepath))

# test RemoteTarget with mtime
ts = datetime.datetime.now() - datetime.timedelta(minutes=2)
ts = datetime.datetime.now() - datetime.timedelta(days=2)
delayed_remotetarget = RemoteTarget(remote_file, HOST, username=USER, password=PWD, mtime=ts)
self.assertTrue(delayed_remotetarget.exists())
