From f13be080c717fe0ee9520a4303d29a3311002474 Mon Sep 17 00:00:00 2001 From: Karl Sutt Date: Fri, 1 Dec 2023 16:03:17 +0200 Subject: [PATCH] Report busy job metrics for correct queues (#77) --- judoscale/celery/collector.py | 14 +++++----- tests/test_collectors.py | 52 ++++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/judoscale/celery/collector.py b/judoscale/celery/collector.py index 1100e8c..ecd6180 100644 --- a/judoscale/celery/collector.py +++ b/judoscale/celery/collector.py @@ -110,15 +110,15 @@ def collect(self) -> List[Metric]: if not self.should_collect: return metrics - if self.adapter_config["TRACK_BUSY_JOBS"] and ( - workers_tasks := self.inspect.active() - ): + if self.adapter_config["TRACK_BUSY_JOBS"]: busy_counts = defaultdict(lambda: 0) - for active_tasks in workers_tasks.values(): - for task in active_tasks: - busy_counts[task["delivery_info"]["routing_key"]] += 1 + if workers_tasks := self.inspect.active(): + for active_tasks in workers_tasks.values(): + for task in active_tasks: + busy_counts[task["delivery_info"]["routing_key"]] += 1 - for queue, count in busy_counts.items(): + for queue in self.queues: + count = busy_counts[queue] metrics.append(Metric.for_busy_queue(queue_name=queue, busy_jobs=count)) logger.debug(f"Collecting metrics for queues {list(self.queues)}") diff --git a/tests/test_collectors.py b/tests/test_collectors.py index 4f55c5d..b7b7300 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -224,7 +224,6 @@ def test_collect_with_busy_jobs(self, worker_1, celery, monkeypatch): celery.connection_for_read().channel().client.lindex.return_value = bytes( json.dumps({"properties": {"published_at": now - 60}}), "utf-8" ) - celery.connection_for_read().channel().client.llen.return_value = 1 worker_1["CELERY"] = {"TRACK_BUSY_JOBS": True} collector = CeleryMetricsCollector(worker_1, celery) @@ -240,6 +239,57 @@ def test_collect_with_busy_jobs(self, worker_1, celery, monkeypatch): assert metrics[1].queue_name == "foo" assert metrics[1].value == approx(60000, abs=100) + def test_collect_with_busy_jobs_and_user_defined_queues( + self, worker_1, celery, monkeypatch + ): + now = time.time() + + inspect = Mock() + inspect.active.return_value = { + "some_worker": [{"name": "a_task", "delivery_info": {"routing_key": "foo"}}] + } + + monkeypatch.setattr(celery.control, "inspect", lambda: inspect) + celery.connection_for_read().channel().client.scan_iter.return_value = [ + b"foo", + ] + + def mock_lindex(queue, _): + return { + "bar": {}, + "foo": bytes( + json.dumps({"properties": {"published_at": now - 60}}), "utf-8" + ), + }[queue] + + monkeypatch.setattr( + celery.connection_for_read().channel().client, "lindex", mock_lindex + ) + + worker_1["CELERY"] = {"QUEUES": ["foo", "bar"], "TRACK_BUSY_JOBS": True} + collector = CeleryMetricsCollector(worker_1, celery) + metrics = collector.collect() + + assert len(metrics) == 4 + metrics = sorted(metrics, key=lambda m: m.queue_name) + metrics = sorted(metrics, key=lambda m: m.measurement) + + assert metrics[0].measurement == "busy" + assert metrics[0].queue_name == "bar" + assert metrics[0].value == 0 + + assert metrics[1].measurement == "busy" + assert metrics[1].queue_name == "foo" + assert metrics[1].value == 1 + + assert metrics[2].measurement == "queue_time" + assert metrics[2].queue_name == "bar" + assert metrics[2].value == 0 + + assert metrics[3].measurement == "queue_time" + assert metrics[3].queue_name == "foo" + assert metrics[3].value == approx(60000, abs=100) + class TestRQMetricsCollector: def test_adapter_config(self, render_worker):