Skip to content

Commit

Permalink
Merge pull request #1 from spotify/master
Browse files Browse the repository at this point in the history
Pull latest changes
  • Loading branch information
tashrifbillah authored Nov 27, 2020
2 parents 4dfb142 + edd868b commit 963b4d6
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 16 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ Some more companies are using Luigi but haven't had a chance yet to write about
* `Hopper <https://www.hopper.com/>`_
* `VOYAGE GROUP/Zucks <https://zucks.co.jp/en/>`_
* `Textpert <https://www.textpert.ai/>`_
* `Tracktics <https://www.tracktics.com/>`_
* `Whizar <https://www.whizar.com/>`_
* `xtream <https://www.xtreamers.io/>`__
* `Skyscanner <https://www.skyscanner.net/>`_
Expand Down
2 changes: 1 addition & 1 deletion luigi/__meta__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
__author__ = 'The Luigi Authors'
__contact__ = 'https://github.com/spotify/luigi'
__license__ = 'Apache License 2.0'
__version__ = '3.0.1'
__version__ = '3.0.2'
__status__ = 'Production'
5 changes: 4 additions & 1 deletion luigi/contrib/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
# Maximum number of sleeps for eventual consistency.
EVENTUAL_CONSISTENCY_MAX_SLEEPS = 300

# Uri for batch requests
GCS_BATCH_URI = 'https://storage.googleapis.com/batch/storage/v1'


def _wait_for_consistency(checker):
"""Eventual consistency: wait until GCS reports something is true.
Expand Down Expand Up @@ -231,7 +234,7 @@ def remove(self, path, recursive=True):
raise InvalidDeleteException(
'Path {} is a directory. Must use recursive delete'.format(path))

req = http.BatchHttpRequest()
req = http.BatchHttpRequest(batch_uri=GCS_BATCH_URI)
for it in self._list_iter(bucket, self._add_path_delimiter(obj)):
req.add(self.client.objects().delete(bucket=bucket, object=it['name']))
req.execute()
Expand Down
5 changes: 4 additions & 1 deletion luigi/contrib/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ def run(self):
"backoffLimit": self.backoff_limit,
"template": {
"metadata": {
"name": self.uu_name
"name": self.uu_name,
"labels": {}
},
"spec": self.spec_schema
}
Expand All @@ -376,6 +377,8 @@ def run(self):
self.active_deadline_seconds
# Update user labels
job_json['metadata']['labels'].update(self.labels)
job_json['spec']['template']['metadata']['labels'].update(self.labels)

# Add default restartPolicy if not specified
if "restartPolicy" not in self.spec_schema:
job_json["spec"]["template"]["spec"]["restartPolicy"] = "Never"
Expand Down
8 changes: 7 additions & 1 deletion luigi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,13 @@ def get(self, name):

class RootPathHandler(BaseTaskHistoryHandler):
def get(self):
self.redirect("/static/visualiser/index.html")
# we omit the leading slash in case the visualizer is behind a different
# path (as in a reverse proxy setup)
#
# For example, if luigi is behind my.app.com/my/luigi/, we want / to
# redirect relative (so it goes to my.app.com/my/luigi/static/visualizer/index.html)
# instead of absolute (which would be my.app.com/static/visualizer/index.html)
self.redirect("static/visualiser/index.html")

def head(self):
"""HEAD endpoint for health checking the scheduler"""
Expand Down
1 change: 1 addition & 0 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ def __exit__(self, type, value, traceback):
for task in self._running_tasks.values():
if task.is_alive():
task.terminate()
self._task_result_queue.close()
return False # Don't suppress exception

def _generate_worker_info(self):
Expand Down
11 changes: 4 additions & 7 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,11 @@ def get_static_files(path):
else:
install_requires.append('python-daemon')

# Tornado >=5 requires updated ssl module so we only allow it for recent enough
# versions of python (3.4+).
if sys.version_info[:2] >= (3, 4):
install_requires.append('tornado>=4.0,<6')
# Start from tornado 6, the minimum supported Python version is 3.5.2.
if sys.version_info[:3] >= (3, 5, 2):
install_requires.append('tornado>=5.0,<7')
else:
install_requires.append('tornado>=4.0,<5')
install_requires.append('tornado>=5.0,<6')

# Note: To support older versions of setuptools, we're explicitly not
# using conditional syntax (i.e. 'enum34>1.1.0;python_version<"3.4"').
Expand Down Expand Up @@ -111,8 +110,6 @@ def get_static_files(path):
'Intended Audience :: Developers',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
Expand Down
5 changes: 5 additions & 0 deletions test/contrib/kubernetes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class FailJob(KubernetesJobTask):
}]
}

@property
def labels(self):
return {"dummy_label": "dummy_value"}


@attr('contrib')
class TestK8STask(unittest.TestCase):
Expand All @@ -90,6 +94,7 @@ def test_fail_job(self):
job = Job(kube_api, jobs.response["items"][0])
self.assertTrue("failed" in job.obj["status"])
self.assertTrue(job.obj["status"]["failed"] > fail.max_retrials)
self.assertTrue(job.obj['spec']['template']['metadata']['labels'] == fail.labels())

@mock.patch.object(KubernetesJobTask, "_KubernetesJobTask__get_job_status")
@mock.patch.object(KubernetesJobTask, "signal_complete")
Expand Down
5 changes: 5 additions & 0 deletions test/server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ def test_404(self):
def test_api_404(self):
self._test_404('/api/foo')

def test_root_redirect(self):
response = self.fetch("/", follow_redirects=False)
self.assertEqual(response.code, 302)
self.assertEqual(response.headers['Location'], 'static/visualiser/index.html') # assert that doesnt beging with leading slash !

def test_api_preflight_cors_headers(self):
response = self.fetch('/api/graph', method='OPTIONS', headers={'Origin': 'foo'})
headers = dict(response.headers)
Expand Down
6 changes: 3 additions & 3 deletions test/worker_external_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ def test_external_task_complete_but_missing_dep_at_runtime(self):
# split up scheduling task and running to simulate runtime scenario
with self._make_worker() as w:
w.add(test_task)
# touch output so test_task should be considered complete at runtime
open(test_task.output_path, 'a').close()
success = w.run()
# touch output so test_task should be considered complete at runtime
open(test_task.output_path, 'a').close()
success = w.run()

self.assertTrue(success)
# upstream dependency output didn't exist at runtime
Expand Down
4 changes: 2 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py{33,34,35,36,37}-{cdh,hdp,core,contrib,apache,aws,gcloud,postgres,unixsocket,azureblob,dropbox}, visualiser, docs, flake8
envlist = py{35,36,37}-{cdh,hdp,core,contrib,apache,aws,gcloud,postgres,unixsocket,azureblob,dropbox}, visualiser, docs, flake8
skipsdist = True

[testenv]
Expand All @@ -23,7 +23,7 @@ deps =
cdh,hdp: hdfs>=2.0.4,<3.0.0
postgres: psycopg2<3.0
mysql-connector-python>=8.0.12
gcloud: google-api-python-client>=1.4.0,<2.0
gcloud: google-api-python-client>=1.6.6,<2.0
avro-python3
gcloud: google-auth==1.4.1
gcloud: google-auth-httplib2==0.0.3
Expand Down

0 comments on commit 963b4d6

Please sign in to comment.