From 354d501eba4ece015be2c3a0403d195116be2556 Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Thu, 23 Jan 2025 11:08:24 +0000 Subject: [PATCH 1/4] Use orjson for json serialization in Task SDK logging --- task_sdk/pyproject.toml | 3 ++- task_sdk/src/airflow/sdk/log.py | 8 +++---- task_sdk/tests/conftest.py | 36 +++++++++++++++++++----------- task_sdk/tests/test_log.py | 39 +++++++++++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 19 deletions(-) create mode 100644 task_sdk/tests/test_log.py diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml index e2eefdd5fcfdc..2e55124eb58fa 100644 --- a/task_sdk/pyproject.toml +++ b/task_sdk/pyproject.toml @@ -27,10 +27,11 @@ dependencies = [ "httpx>=0.27.0", "jinja2>=3.1.4", "methodtools>=0.4.7", - "msgspec>=0.18.6", "psutil>=6.1.0", "structlog>=24.4.0", "retryhttp>=1.2.0", + "orjson>=3.10.11", + "msgspec>=0.19.0", ] classifiers = [ "Framework :: Apache Airflow", diff --git a/task_sdk/src/airflow/sdk/log.py b/task_sdk/src/airflow/sdk/log.py index 3290d0ff7f3e7..c9b38af51f2d8 100644 --- a/task_sdk/src/airflow/sdk/log.py +++ b/task_sdk/src/airflow/sdk/log.py @@ -26,7 +26,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Generic, TextIO, TypeVar -import msgspec +import orjson import structlog if TYPE_CHECKING: @@ -196,13 +196,11 @@ def logging_processors( else: exc_group_processor = None - encoder = msgspec.json.Encoder() - def json_dumps(msg, default): - return encoder.encode(msg) + return orjson.dumps(msg, default=default) def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str: - return encoder.encode(event_dict).decode("utf-8") + return orjson.dumps(event_dict).decode("utf-8") json = structlog.processors.JSONRenderer(serializer=json_dumps) diff --git a/task_sdk/tests/conftest.py b/task_sdk/tests/conftest.py index 3fc7fc18015c7..ee3801a07179c 100644 --- a/task_sdk/tests/conftest.py +++ b/task_sdk/tests/conftest.py @@ -62,18 +62,19 @@ def pytest_runtest_setup(item): class LogCapture: # Like structlog.typing.LogCapture, but that doesn't add log_level in to the event dict - entries: list[EventDict] + entries: list[EventDict | bytes] def __init__(self) -> None: self.entries = [] - def __call__(self, _: WrappedLogger, method_name: str, event_dict: EventDict) -> NoReturn: + def __call__(self, _: WrappedLogger, method_name: str, event: EventDict | bytes) -> NoReturn: from structlog.exceptions import DropEvent - if "level" not in event_dict: - event_dict["_log_level"] = method_name + if isinstance(event, dict): + if "level" not in event: + event["_log_level"] = method_name - self.entries.append(event_dict) + self.entries.append(event) raise DropEvent @@ -93,20 +94,29 @@ def captured_logs(request): reset_logging() configure_logging(enable_pretty_log=False) - # Get log level from test parameter, defaulting to INFO if not provided - log_level = getattr(request, "param", logging.INFO) + # Get log level from test parameter, which can either be a single log level or a + # tuple of log level and desired output type, defaulting to INFO if not provided + log_level = logging.INFO + output = "dict" + param = getattr(request, "param", logging.INFO) + if isinstance(param, int): + log_level = param + elif isinstance(param, tuple): + log_level = param[0] + output = param[1] # We want to capture all logs, but we don't want to see them in the test output structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(log_level)) - # But we need to replace remove the last processor (the 
one that turns JSON into text, as we want the - # event dict for tests) cur_processors = structlog.get_config()["processors"] processors = cur_processors.copy() - proc = processors.pop() - assert isinstance( - proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer) - ), "Pre-condition" + if output == "dict": + # We need to replace remove the last processor (the one that turns JSON into text, as we want the + # event dict for tests) + proc = processors.pop() + assert isinstance(proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer)), ( + "Pre-condition" + ) try: cap = LogCapture() processors.append(cap) diff --git a/task_sdk/tests/test_log.py b/task_sdk/tests/test_log.py new file mode 100644 index 0000000000000..b6fe21bd199aa --- /dev/null +++ b/task_sdk/tests/test_log.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import logging +import unittest.mock + +import orjson +import pytest +import structlog +from uuid6 import UUID + +from airflow.sdk.api.datamodels._generated import TaskInstance + + +@pytest.mark.parametrize( + "captured_logs", [(logging.INFO, "json")], indirect=True, ids=["log_level=info,formatter=json"] +) +def test_json_rendering(captured_logs): + """ + Test that the JSON formatter renders correctly. + """ + logger = structlog.get_logger() + logger.info( + "A test message with a Pydantic class", + pydantic_class=TaskInstance( + id=UUID("ffec3c8e-2898-46f8-b7d5-3cc571577368"), + dag_id="test_dag", + task_id="test_task", + run_id="test_run", + try_number=1, + ), + ) + assert captured_logs + assert isinstance(captured_logs[0], bytes) + assert orjson.loads(captured_logs[0]) == { + "event": "A test message with a Pydantic class", + "pydantic_class": "TaskInstance(id=UUID('ffec3c8e-2898-46f8-b7d5-3cc571577368'), task_id='test_task', dag_id='test_dag', run_id='test_run', try_number=1, map_index=-1, hostname=None)", + "timestamp": unittest.mock.ANY, + "level": "info", + } From 41b26f2475608dde6bfe08f263b748ca04016c28 Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Thu, 23 Jan 2025 20:59:19 +0000 Subject: [PATCH 2/4] Formatting fixes --- task_sdk/tests/conftest.py | 6 +++--- task_sdk/tests/test_log.py | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/task_sdk/tests/conftest.py b/task_sdk/tests/conftest.py index ee3801a07179c..e24f6e397d3e5 100644 --- a/task_sdk/tests/conftest.py +++ b/task_sdk/tests/conftest.py @@ -114,9 +114,9 @@ def captured_logs(request): # We need to replace remove the last processor (the one that turns JSON into text, as we want the # event dict for tests) proc = processors.pop() - assert isinstance(proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer)), ( - "Pre-condition" - ) + assert isinstance( + proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer) + ), "Pre-condition" try: cap = LogCapture() processors.append(cap) diff --git a/task_sdk/tests/test_log.py b/task_sdk/tests/test_log.py index b6fe21bd199aa..f7c91632e7c5a 100644 --- a/task_sdk/tests/test_log.py +++ b/task_sdk/tests/test_log.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + from __future__ import annotations import logging From d43c9e9262c9fc84641ae43144ef9ae48f78f43d Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Sun, 26 Jan 2025 17:32:54 +0000 Subject: [PATCH 3/4] Use msgspec enc_hook --- task_sdk/pyproject.toml | 1 - task_sdk/src/airflow/sdk/log.py | 6 +++--- task_sdk/tests/test_log.py | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml index 2e55124eb58fa..e278300760bb9 100644 --- a/task_sdk/pyproject.toml +++ b/task_sdk/pyproject.toml @@ -30,7 +30,6 @@ dependencies = [ "psutil>=6.1.0", "structlog>=24.4.0", "retryhttp>=1.2.0", - "orjson>=3.10.11", "msgspec>=0.19.0", ] classifiers = [ diff --git a/task_sdk/src/airflow/sdk/log.py b/task_sdk/src/airflow/sdk/log.py index c9b38af51f2d8..fa5b113588bf5 100644 --- a/task_sdk/src/airflow/sdk/log.py +++ b/task_sdk/src/airflow/sdk/log.py @@ -26,7 +26,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Generic, TextIO, TypeVar -import orjson +import msgspec import structlog if TYPE_CHECKING: @@ -197,10 +197,10 @@ def logging_processors( exc_group_processor = None def json_dumps(msg, default): - return orjson.dumps(msg, default=default) + return msgspec.json.encode(msg, enc_hook=default) def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str: - return orjson.dumps(event_dict).decode("utf-8") + return msgspec.json.encode(event_dict).decode("utf-8") json = structlog.processors.JSONRenderer(serializer=json_dumps) diff --git a/task_sdk/tests/test_log.py b/task_sdk/tests/test_log.py index f7c91632e7c5a..bf00f33e9a7ec 100644 --- a/task_sdk/tests/test_log.py +++ b/task_sdk/tests/test_log.py @@ -17,10 +17,10 @@ from __future__ import annotations +import json import logging import unittest.mock -import orjson import pytest import structlog from uuid6 import UUID @@ -48,7 +48,7 @@ def test_json_rendering(captured_logs): ) assert captured_logs assert isinstance(captured_logs[0], bytes) - assert orjson.loads(captured_logs[0]) == { + assert json.loads(captured_logs[0]) == { "event": "A test message with a Pydantic class", "pydantic_class": "TaskInstance(id=UUID('ffec3c8e-2898-46f8-b7d5-3cc571577368'), task_id='test_task', dag_id='test_dag', run_id='test_run', try_number=1, map_index=-1, hostname=None)", "timestamp": unittest.mock.ANY, From edf815b0ae5b6b307133864d94f4435f57a3319e Mon Sep 17 00:00:00 2001 From: Ian Buss Date: Mon, 27 Jan 2025 21:10:19 +0000 Subject: [PATCH 4/4] Sort pyproject dependencies --- task_sdk/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml index e278300760bb9..06b3a9404065f 100644 --- a/task_sdk/pyproject.toml +++ b/task_sdk/pyproject.toml @@ -27,10 +27,10 @@ dependencies = [ "httpx>=0.27.0", "jinja2>=3.1.4", "methodtools>=0.4.7", + "msgspec>=0.19.0", "psutil>=6.1.0", "structlog>=24.4.0", "retryhttp>=1.2.0", - "msgspec>=0.19.0", ] classifiers = [ "Framework :: Apache Airflow",
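
Note on the final approach in PATCH 3/4: structlog's JSONRenderer passes a repr-based `default` fallback to whatever serializer it is given, and msgspec exposes the equivalent extension point as `enc_hook`. Wiring the two together is what lets objects msgspec cannot encode natively (such as the Pydantic TaskInstance in test_json_rendering) be rendered via repr() instead of raising. The sketch below is a minimal, standalone illustration of that mechanism, assuming structlog>=24.4.0 and msgspec>=0.19.0 as pinned above; the processor chain, BytesLoggerFactory, and the Unserializable class are illustrative stand-ins, not the Task SDK's actual logging configuration.

    import msgspec
    import structlog


    def json_dumps(msg, default):
        # structlog's JSONRenderer supplies its fallback handler as `default`;
        # msgspec calls `enc_hook` for any type it cannot encode natively.
        return msgspec.json.encode(msg, enc_hook=default)


    class Unserializable:
        # A type msgspec has no native encoding for, so enc_hook (repr) kicks in.
        def __repr__(self) -> str:
            return "Unserializable(answer=42)"


    structlog.configure(
        processors=[
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", key="timestamp"),
            structlog.processors.JSONRenderer(serializer=json_dumps),
        ],
        # msgspec.json.encode returns bytes, so use a logger that writes bytes.
        logger_factory=structlog.BytesLoggerFactory(),
    )

    structlog.get_logger().info("A test message", payload=Unserializable())
    # Writes one JSON line (as bytes), roughly:
    # {"payload":"Unserializable(answer=42)","event":"A test message","level":"info","timestamp":"..."}

The same shape explains the test expectation in test_log.py: the captured entry is bytes, and the TaskInstance value decodes to its repr() string rather than a nested object.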