Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge back #6

Merged
merged 2 commits into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ send_messages
metrics_collector
Optional setting that lets Luigi use a contrib module to collect metrics
about the pipeline and send them to a third party. By default this uses the default metric
collector that acts as a shell and does nothing. The only currently available
option is "datadog".
collector that acts as a shell and does nothing. The currently available
options are "datadog" and "prometheus".


[sendgrid]
Expand Down
2 changes: 1 addition & 1 deletion luigi/contrib/docker_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def run(self):
self.__logger.error("Container " + container_name +
" exited with non zero code: " + message)
raise
except ImageNotFound as e:
except ImageNotFound:
self.__logger.error("Image " + self._image + " not found")
raise
except APIError as e:
Expand Down
4 changes: 2 additions & 2 deletions luigi/contrib/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def _rm_recursive(self, ftp, path):

try:
names = ftp.nlst()
except ftplib.all_errors as e:
except ftplib.all_errors:
# some FTP servers complain when you try and list non-existent paths
return

Expand All @@ -226,7 +226,7 @@ def _rm_recursive(self, ftp, path):
ftp.cwd(wd) # don't try a nuke a folder we're in
ftp.cwd(path) # then go back to where we were
self._rm_recursive(ftp, name)
except ftplib.all_errors as e:
except ftplib.all_errors:
ftp.delete(name)

try:
Expand Down
2 changes: 1 addition & 1 deletion luigi/contrib/mssqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

try:
import _mssql
except ImportError as e:
except ImportError:
logger.warning("Loading MSSQL module without the python package pymssql. \
This will crash at runtime if SQL Server functionality is used.")

Expand Down
2 changes: 1 addition & 1 deletion luigi/contrib/mysqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
try:
import mysql.connector
from mysql.connector import errorcode, Error
except ImportError as e:
except ImportError:
logger.warning("Loading MySQL module without the python package mysql-connector-python. \
This will crash at runtime if MySQL functionality is used.")

Expand Down
60 changes: 60 additions & 0 deletions luigi/contrib/prometheus_metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest
from luigi.metrics import MetricsCollector


class PrometheusMetricsCollector(MetricsCollector):
    """Metrics collector that records task events in a Prometheus registry.

    Every metric is labelled with the task ``family``. Four counters track
    how many tasks were started, failed, disabled and done; one gauge holds
    the most recently observed execution time (``updated - time_running``)
    for each family.
    """

    def __init__(self):
        """Create a private registry and register all task metrics on it."""
        super(PrometheusMetricsCollector, self).__init__()
        # A dedicated registry keeps these metrics separate from the
        # prometheus_client global default registry.
        self.registry = CollectorRegistry()
        self.task_started_counter = Counter(
            'luigi_task_started_total',
            'number of started luigi tasks',
            ['family'],
            registry=self.registry
        )
        self.task_failed_counter = Counter(
            'luigi_task_failed_total',
            'number of failed luigi tasks',
            ['family'],
            registry=self.registry
        )
        self.task_disabled_counter = Counter(
            'luigi_task_disabled_total',
            'number of disabled luigi tasks',
            ['family'],
            registry=self.registry
        )
        self.task_done_counter = Counter(
            'luigi_task_done_total',
            'number of done luigi tasks',
            ['family'],
            registry=self.registry
        )
        self.task_execution_time = Gauge(
            'luigi_task_execution_time_seconds',
            'luigi task execution time in seconds',
            ['family'],
            registry=self.registry
        )

    def generate_latest(self):
        """Return the registry contents as produced by ``prometheus_client.generate_latest``."""
        return generate_latest(self.registry)

    def _record_execution_time(self, task):
        """Store ``task.updated - task.time_running`` in the execution-time gauge.

        ``time_running`` can be ``None`` when the task never actually ran
        (for example when it was already complete); in that case there is
        no duration to report and the gauge is left untouched instead of
        raising ``TypeError``.
        """
        if task.time_running is None:
            return
        self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)

    def handle_task_started(self, task):
        """Count a started task and make sure its gauge sample exists."""
        self.task_started_counter.labels(family=task.family).inc()
        # Requesting the labelled child is enough to create the sample for
        # this family, so the gauge is exported (at 0) before any duration
        # has been recorded.
        self.task_execution_time.labels(family=task.family)

    def handle_task_failed(self, task):
        """Count a failed task and record its execution time when available."""
        self.task_failed_counter.labels(family=task.family).inc()
        self._record_execution_time(task)

    def handle_task_disabled(self, task, config):
        """Count a disabled task and record its execution time when available.

        ``config`` is accepted for interface compatibility and is not used.
        """
        self.task_disabled_counter.labels(family=task.family).inc()
        self._record_execution_time(task)

    def handle_task_done(self, task):
        """Count a finished task and record its execution time when available."""
        self.task_done_counter.labels(family=task.family).inc()
        self._record_execution_time(task)
2 changes: 1 addition & 1 deletion luigi/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def _read_pids_file(pid_file):
# First setup a python 2 vs 3 compatibility
# http://stackoverflow.com/a/21368622/621449
try:
FileNotFoundError
FileNotFoundError # noqa: F823
except NameError:
# Should only happen on python 2
FileNotFoundError = IOError
Expand Down
7 changes: 7 additions & 0 deletions luigi/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class MetricsCollectors(Enum):
default = 1
none = 1
datadog = 2
prometheus = 3

@classmethod
def get(cls, which):
Expand All @@ -16,6 +17,9 @@ def get(cls, which):
elif which == MetricsCollectors.datadog:
from luigi.contrib.datadog_metric import DatadogMetricsCollector
return DatadogMetricsCollector()
elif which == MetricsCollectors.prometheus:
from luigi.contrib.prometheus_metric import PrometheusMetricsCollector
return PrometheusMetricsCollector()
else:
raise ValueError("MetricsCollectors value ' {0} ' isn't supported", which)

Expand Down Expand Up @@ -46,6 +50,9 @@ def handle_task_disabled(self, task, config):
def handle_task_done(self, task):
pass

def generate_latest(self):
    """Return the collected metrics payload; the base collector has none."""
    return None


class NoMetricsCollector(MetricsCollector):
"""Empty MetricsCollector when no collector is being used
Expand Down
24 changes: 8 additions & 16 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,9 @@ def set_status(self, task, new_status, config=None):

if new_status == FAILED and task.status != DISABLED:
task.add_failure()
self.update_metrics_task_failed(task)
if task.has_excessive_failures():
task.scheduler_disable_time = time.time()
new_status = DISABLED
self.update_metrics_task_disabled(task, config)
if not config.batch_emails:
notifications.send_error_email(
'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(task=task.id),
Expand All @@ -594,9 +592,7 @@ def set_status(self, task, new_status, config=None):
self._status_tasks[new_status][task.id] = task
task.status = new_status
task.updated = time.time()

if new_status == DONE:
self.update_metrics_task_done(task)
self.update_metrics(task, config)

if new_status == FAILED:
task.retry = time.time() + config.retry_delay
Expand Down Expand Up @@ -680,17 +676,13 @@ def disable_workers(self, worker_ids):
worker.disabled = True
worker.tasks.clear()

def update_metrics_task_started(self, task):
self._metrics_collector.handle_task_started(task)

def update_metrics_task_disabled(self, task, config):
self._metrics_collector.handle_task_disabled(task, config)

def update_metrics_task_failed(self, task):
self._metrics_collector.handle_task_failed(task)

def update_metrics_task_done(self, task):
self._metrics_collector.handle_task_done(task)
def update_metrics(self, task, config):
    """Forward the task's current status to the metrics collector.

    Only DISABLED, DONE and FAILED statuses are reported; any other status
    results in no collector call. ``config`` is passed along only for the
    DISABLED case.
    """
    current = task.status
    if current == DISABLED:
        self._metrics_collector.handle_task_disabled(task, config)
        return
    if current == DONE:
        self._metrics_collector.handle_task_done(task)
        return
    if current == FAILED:
        self._metrics_collector.handle_task_failed(task)


class Scheduler(object):
Expand Down
15 changes: 14 additions & 1 deletion luigi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import time

import pkg_resources
from prometheus_client import CONTENT_TYPE_LATEST
import tornado.httpserver
import tornado.ioloop
import tornado.netutil
Expand Down Expand Up @@ -274,6 +275,17 @@ def get(self):
self.redirect("/static/visualiser/index.html")


class MetricsHandler(tornado.web.RequestHandler):
    """Request handler that serves the scheduler's collected metrics."""

    def initialize(self, scheduler):
        """Keep a reference to the scheduler whose metrics are exposed."""
        self._scheduler = scheduler

    def get(self):
        """Write the collector's latest metrics, if it produced any."""
        collector = self._scheduler._state._metrics_collector
        payload = collector.generate_latest()
        if not payload:
            # Collectors without an exportable payload yield an empty response.
            return
        self.write(payload)
        self.set_header('Content-Type', CONTENT_TYPE_LATEST)


def app(scheduler):
settings = {"static_path": os.path.join(os.path.dirname(__file__), "static"),
"unescape": tornado.escape.xhtml_unescape,
Expand All @@ -287,7 +299,8 @@ def app(scheduler):
(r'/history', RecentRunHandler, {'scheduler': scheduler}),
(r'/history/by_name/(.*?)', ByNameHandler, {'scheduler': scheduler}),
(r'/history/by_id/(.*?)', ByIdHandler, {'scheduler': scheduler}),
(r'/history/by_params/(.*?)', ByParamsHandler, {'scheduler': scheduler})
(r'/history/by_params/(.*?)', ByParamsHandler, {'scheduler': scheduler}),
(r'/metrics', MetricsHandler, {'scheduler': scheduler})
]
api_app = tornado.web.Application(handlers, **settings)
return api_app
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def get_static_files(path):
},
install_requires=install_requires,
extras_require={
'prometheus': ['prometheus-client==0.5.0'],
'toml': ['toml<2.0.0'],
},
classifiers=[
Expand Down
70 changes: 70 additions & 0 deletions test/contrib/prometheus_metric_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from helpers import unittest
from nose.plugins.attrib import attr

from luigi.contrib.prometheus_metric import PrometheusMetricsCollector
from luigi.metrics import MetricsCollectors
from luigi.scheduler import Scheduler

WORKER = 'myworker'
TASK_ID = 'TaskID'
TASK_FAMILY = 'TaskFamily'


@attr('contrib')
class PrometheusMetricTest(unittest.TestCase):
    """Checks that each collector handler updates its counter and the gauge."""

    def setUp(self):
        self.collector = PrometheusMetricsCollector()
        self.s = Scheduler(metrics_collector=MetricsCollectors.prometheus)
        self.gauge_name = 'luigi_task_execution_time_seconds'
        self.labels = {'family': TASK_FAMILY}

    def startTask(self):
        """Register a task with the scheduler and give it fixed timestamps."""
        self.s.add_task(worker=WORKER, task_id=TASK_ID, family=TASK_FAMILY)
        started = self.s._state.get_task(TASK_ID)
        started.time_running = 0
        started.updated = 5
        return started

    def _sample(self, metric_name):
        """Look up the current value of a metric for the test task family."""
        return self.collector.registry.get_sample_value(metric_name, labels=self.labels)

    def test_handle_task_started(self):
        task = self.startTask()
        self.collector.handle_task_started(task)

        assert self._sample('luigi_task_started_total') == 1
        assert self._sample(self.gauge_name) == 0

    def test_handle_task_failed(self):
        task = self.startTask()
        self.collector.handle_task_failed(task)

        assert self._sample('luigi_task_failed_total') == 1
        assert self._sample(self.gauge_name) == task.updated - task.time_running

    def test_handle_task_disabled(self):
        task = self.startTask()
        self.collector.handle_task_disabled(task, self.s._config)

        assert self._sample('luigi_task_disabled_total') == 1
        assert self._sample(self.gauge_name) == task.updated - task.time_running

    def test_handle_task_done(self):
        task = self.startTask()
        self.collector.handle_task_done(task)

        assert self._sample('luigi_task_done_total') == 1
        assert self._sample(self.gauge_name) == task.updated - task.time_running
7 changes: 7 additions & 0 deletions test/metrics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import luigi.metrics as metrics

from luigi.contrib.datadog_metric import DatadogMetricsCollector
from luigi.contrib.prometheus_metric import PrometheusMetricsCollector


class TestMetricsCollectors(unittest.TestCase):
Expand All @@ -18,6 +19,12 @@ def test_datadog_value(self):

assert type(output) is DatadogMetricsCollector

def test_prometheus_value(self):
    """The ``prometheus`` enum member resolves to a PrometheusMetricsCollector."""
    resolved = metrics.MetricsCollectors.get(metrics.MetricsCollectors.prometheus)

    assert type(resolved) is PrometheusMetricsCollector

def test_none_value(self):
collector = metrics.MetricsCollectors.none
output = metrics.MetricsCollectors.get(collector)
Expand Down
9 changes: 9 additions & 0 deletions test/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ def test_datadog_metrics_collector(self):
collector = scheduler_state._metrics_collector
self.assertTrue(isinstance(collector, DatadogMetricsCollector))

@with_config({'scheduler': {'metrics_collector': 'prometheus'}})
def test_prometheus_metrics_collector(self):
    """A scheduler configured with 'prometheus' uses the Prometheus collector."""
    from luigi.contrib.prometheus_metric import PrometheusMetricsCollector

    scheduler = luigi.scheduler.Scheduler()
    active_collector = scheduler._state._metrics_collector
    self.assertTrue(isinstance(active_collector, PrometheusMetricsCollector))


class SchedulerWorkerTest(unittest.TestCase):
def get_pending_ids(self, worker, state):
Expand Down
3 changes: 3 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ deps=
responses<1.0.0
azure-storage
datadog==0.22.0
prometheus-client==0.5.0
passenv =
USER JAVA_HOME POSTGRES_USER DATAPROC_TEST_PROJECT_ID GCS_TEST_PROJECT_ID GCS_TEST_BUCKET GOOGLE_APPLICATION_CREDENTIALS TRAVIS_BUILD_ID TRAVIS TRAVIS_BRANCH TRAVIS_JOB_NUMBER TRAVIS_PULL_REQUEST TRAVIS_JOB_ID TRAVIS_REPO_SLUG TRAVIS_COMMIT CI
setenv =
Expand Down Expand Up @@ -97,6 +98,7 @@ commands =
# By putting it here, local flake8 runs will also pick it up.
[flake8]
max-line-length=160
builtins = unicode

[testenv:flake8]
deps =
Expand All @@ -123,6 +125,7 @@ deps =
sphinx_rtd_theme
enum34>1.1.0
azure-storage
prometheus-client==0.5.0
commands =
# build API docs
sphinx-apidoc -o doc/api -T luigi --separate
Expand Down