diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 8a2d0dfa76..f6f2d4bea4 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -22,10 +22,10 @@ You will need a ``tox --version`` of at least 2.0. # These commands are pretty fast and will tell if you've # broken something major: tox -e flake8 - tox -e py27-nonhdfs + tox -e py27-core # You can also test particular files for even faster iterations - tox -e py27-nonhdfs test/rpc_test.py + tox -e py27-core test/rpc_test.py # The visualiser tests require phantomjs to be installed on your path tox -e visualiser @@ -35,9 +35,9 @@ You will need a ``tox --version`` of at least 2.0. tox -e py34-hdp Where ``flake8`` is the lint checking, ``py27`` is obviously Python 2.7. -``nonhdfs`` are tests not running on the Hadoop minicluster and ``cdh`` and +``core`` are tests that do not require external components and ``cdh`` and ``hdp`` are two different hadoop distributions. For most local development it's -usually enough to run the lint checking and a python version for ``nonhdfs`` +usually enough to run the lint checking and a python version for ``core`` and let Travis run for the whole matrix. For `cdh` and `hdp`, tox will download the hadoop distribution for you. You diff --git a/README.rst b/README.rst index 86d4623b4f..da8f449036 100644 --- a/README.rst +++ b/README.rst @@ -154,7 +154,8 @@ or held presentations about Luigi: * `Glossier `_ `(blog, 2018) `__ * `Data Revenue `_ `(blog, 2018) `_ * `Uppsala University `_ `(tutorial) `_ / `(presentation, 2015) `_ / `(slides, 2015) `_ / `(poster, 2015) `_ / `(paper, 2016) `_ / `(project) `_ -* `GIPHY `_ `(blog, 2019) `_ +* `GIPHY `_ `(blog, 2019) `__ +* `xtream `__ `(blog, 2019) `__ Some more companies are using Luigi but haven't had a chance yet to write about it: @@ -182,8 +183,9 @@ Some more companies are using Luigi but haven't had a chance yet to write about * `VOYAGE GROUP/Zucks `_ * `Textpert `_ * `Whizar `_ -* `xtream `_ +* `xtream `__ * `Skyscanner `_ +* `Jodel `_ We're more than happy to have your company added here. Just send a PR on GitHub. diff --git a/doc/configuration.rst b/doc/configuration.rst index e9672fff72..36aa7f91e1 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -363,7 +363,7 @@ check_unfulfilled_deps force_multiprocessing By default, luigi uses multiprocessing when *more than one* worker process is - requested. Whet set to true, multiprocessing is used independent of the + requested. When set to true, multiprocessing is used independent of the the number of workers. Defaults to false. @@ -405,8 +405,8 @@ method config files or run Luigi on an EC2 instance with proper instance profile. - In order to use sendgrid, fill in your sendgrid username and password - in the `[sendgrid]`_ section. + In order to use sendgrid, fill in your sendgrid API key in the + `[sendgrid]`_ section. In order to use smtp, fill in the appropriate fields in the `[smtp]`_ section. @@ -834,11 +834,8 @@ metrics_collector These parameters control sending error e-mails through SendGrid. -password - Password used for sendgrid login - -username - Name of the user for the sendgrid login +apikey + API key of the SendGrid account. [smtp] diff --git a/doc/example_top_artists.rst b/doc/example_top_artists.rst index a34bcb3775..67db656b4b 100644 --- a/doc/example_top_artists.rst +++ b/doc/example_top_artists.rst @@ -155,7 +155,7 @@ Python code for the Spark job is found below. # The second field is the artist counts = streams \ .map(lambda row: (row[1], 1)) \ - .reduceByKey(add) + .reduceByKey(operator.add) counts.write.option('sep', '\t').csv(output_path) diff --git a/luigi/contrib/ftp.py b/luigi/contrib/ftp.py index f91e9eafbe..07f8b81b6f 100644 --- a/luigi/contrib/ftp.py +++ b/luigi/contrib/ftp.py @@ -15,14 +15,14 @@ # limitations under the License. # """ -This library is a wrapper of ftplib. -It is convenient to move data from/to FTP. +This library is a wrapper of ftplib or pysftp. +It is convenient to move data from/to (S)FTP servers. There is an example on how to use it (example/ftp_experiment_outputs.py) You can also find unittest for each class. -Be aware that normal ftp do not provide secure communication. +Be aware that normal ftp does not provide secure communication. """ import datetime @@ -368,7 +368,8 @@ class RemoteTarget(luigi.target.FileSystemTarget): """ Target used for reading from remote files. - The target is implemented using ssh commands streaming data over the network. + The target is implemented using intermediate files on the local system. + On Python2, these files may not be cleaned up. """ def __init__( @@ -407,8 +408,23 @@ def open(self, mode): return self.format.pipe_writer(AtomicFtpFile(self._fs, self.path)) elif mode == 'r': - temp_dir = os.path.join(tempfile.gettempdir(), 'luigi-contrib-ftp') - self.__tmp_path = temp_dir + '/' + self.path.lstrip('/') + '-luigi-tmp-%09d' % random.randrange(0, 1e10) + temppath = '{}-luigi-tmp-{:09d}'.format( + self.path.lstrip('/'), random.randrange(0, 1e10) + ) + try: + # store reference to the TemporaryDirectory because it will be removed on GC + self.__temp_dir = tempfile.TemporaryDirectory( + prefix="luigi-contrib-ftp_" + ) + except AttributeError: + # TemporaryDirectory only available in Python3, use old behaviour in Python2 + # this file will not be cleaned up automatically + self.__tmp_path = os.path.join( + tempfile.gettempdir(), 'luigi-contrib-ftp', temppath + ) + else: + self.__tmp_path = os.path.join(self.__temp_dir.name, temppath) + # download file to local self._fs.get(self.path, self.__tmp_path) diff --git a/luigi/contrib/sge.py b/luigi/contrib/sge.py index 253b3a796f..2e6358ad96 100755 --- a/luigi/contrib/sge.py +++ b/luigi/contrib/sge.py @@ -278,9 +278,11 @@ def _dump(self, out_dir=''): d = pickle.dumps(self) module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0] d = d.replace('(c__main__', "(c" + module_name) - open(self.job_file, "w").write(d) + with open(self.job_file, "w") as f: + f.write(d) else: - pickle.dump(self, open(self.job_file, "w")) + with open(self.job_file, "wb") as f: + pickle.dump(self, f) def _run_job(self): diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 9f67b84c54..dce4913eae 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -799,6 +799,19 @@ def forgive_failures(self, task_id=None): self._state.set_status(task, status, self._config) return {"task_id": task_id, "status": task.status} + @rpc_method() + def mark_as_done(self, task_id=None): + status = DONE + task = self._state.get_task(task_id) + if task is None: + return {"task_id": task_id, "status": None} + + # we can force mark DONE for running or failed tasks + if task.status in {RUNNING, FAILED, DISABLED}: + self._update_task_history(task, status) + self._state.set_status(task, status, self._config) + return {"task_id": task_id, "status": task.status} + @rpc_method() def add_task(self, task_id=None, status=PENDING, runnable=True, deps=None, new_deps=None, expl=None, resources=None, @@ -992,6 +1005,10 @@ def get_scheduler_message_response(self, task_id, message_id): response = task.scheduler_message_responses.pop(message_id, None) return {"response": response} + @rpc_method() + def has_task_history(self): + return self._config.record_task_history + @rpc_method() def is_pause_enabled(self): return {'enabled': self._config.pause_enabled} diff --git a/luigi/static/visualiser/index.html b/luigi/static/visualiser/index.html index 76fb137cb9..1c7ca27a47 100644 --- a/luigi/static/visualiser/index.html +++ b/luigi/static/visualiser/index.html @@ -35,13 +35,14 @@ {{#error}}{{/error}} {{#re_enable}}{{/re_enable}} {{#re_enable}}Re-enable{{/re_enable}} + {{#mark_as_done}}{{/mark_as_done}} {{#trackingUrl}}{{/trackingUrl}} {{#statusMessage}}{{/statusMessage}} {{^statusMessage}} {{#progressPercentage}} {{/progressPercentage}} {{/statusMessage}} - {{#acceptsMessages}}{{/acceptsMessages}} + {{#acceptsMessages}}{{/acceptsMessages}} + +