Skip to content

Commit

Permalink
add extra metadata (#71846)
Browse files Browse the repository at this point in the history
For `apply_async` we weren't passing in `start_time` so the tasks
weren't getting the queue time
  • Loading branch information
ykamo001 authored Jun 5, 2024
1 parent 1a2ac78 commit 7cb5761
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
14 changes: 13 additions & 1 deletion src/sentry/celery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import gc
from datetime import datetime
from itertools import chain
from typing import Any

from celery import Celery, Task, signals
from celery.worker.request import Request
Expand Down Expand Up @@ -85,11 +86,22 @@ def celery_prefork_freeze_gc(**kwargs: object) -> None:
class SentryTask(Task):
Request = "sentry.celery:SentryRequest"

def delay(self, *args, **kwargs):
@classmethod
def _add_metadata(cls, kwargs: dict[str, Any] | None) -> None:
"""
Helper method that adds relevant metadata
"""
if kwargs is None:
return None
# Add the start time when the task was kicked off for async processing by the calling code
kwargs["__start_time"] = datetime.now().timestamp()

def delay(self, *args, **kwargs):
self._add_metadata(kwargs)
return super().delay(*args, **kwargs)

def apply_async(self, *args, **kwargs):
self._add_metadata(kwargs)
# If intended detect bad uses of pickle and make the tasks fail in tests. This should
# in theory pick up a lot of bad uses without accidentally failing tasks in prod.
if (
Expand Down
9 changes: 9 additions & 0 deletions src/sentry/testutils/helpers/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,18 @@ def _apply_async(
kwargs: dict[str, Any] | None = None,
countdown: float | None = None,
queue: str | None = None,
**options: Any,
) -> None:
if not self._active:
raise AssertionError("task enqueued to burst runner while burst was not active!")

try:
_start_time = options.pop("__start_time", None)
if _start_time and kwargs:
kwargs["__start_time"] = _start_time
except Exception:
pass

self.queue.append((task, args, {} if kwargs is None else kwargs))

@contextlib.contextmanager
Expand Down

0 comments on commit 7cb5761

Please sign in to comment.