From 1623df8721105f7f8d219741b066b277d8c43cf0 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Mon, 16 Nov 2020 13:32:36 +0100 Subject: [PATCH] Use different deserialization method in XCom init_on_load (#12327) The init_on_load method used deserialize_value method which in case of custom XCom backends may perform requests to external services (for example downloading file from buckets). This is problematic because wherever we query XCom the resuest would be send (for example when listing XCom in webui). This PR proposes implementing orm_deserialize_value which allows overriding this behavior. By default we use BaseXCom.deserialize_value. closes: #12315 --- airflow/models/xcom.py | 29 ++++++++++++++++++++--------- docs/concepts.rst | 6 ++++++ docs/spelling_wordlist.txt | 1 + tests/models/test_xcom.py | 16 ++++++++++++++++ 4 files changed, 43 insertions(+), 9 deletions(-) diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index bd3ce168077ee..60979e0715f69 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -63,7 +63,7 @@ def init_on_load(self): i.e automatically deserialize Xcom value when loading from DB. """ try: - self.value = XCom.deserialize_value(self) + self.value = self.orm_deserialize_value() except (UnicodeEncodeError, ValueError): # For backward-compatibility. # Preventing errors in webserver @@ -235,16 +235,16 @@ def serialize_value(value: Any): return json.dumps(value).encode('UTF-8') except (ValueError, TypeError): log.error( - "Could not serialize the XCOM value into JSON. " + "Could not serialize the XCom value into JSON. " "If you are using pickles instead of JSON " - "for XCOM, then you need to enable pickle " - "support for XCOM in your airflow config." + "for XCom, then you need to enable pickle " + "support for XCom in your airflow config." ) raise @staticmethod - def deserialize_value(result) -> Any: - """Deserialize Xcom value from str or pickle object""" + def deserialize_value(result: "XCom") -> Any: + """Deserialize XCom value from str or pickle object""" enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') if enable_pickling: return pickle.loads(result.value) @@ -252,13 +252,24 @@ def deserialize_value(result) -> Any: return json.loads(result.value.decode('UTF-8')) except JSONDecodeError: log.error( - "Could not deserialize the XCOM value from JSON. " + "Could not deserialize the XCom value from JSON. " "If you are using pickles instead of JSON " - "for XCOM, then you need to enable pickle " - "support for XCOM in your airflow config." + "for XCom, then you need to enable pickle " + "support for XCom in your airflow config." ) raise + def orm_deserialize_value(self) -> Any: + """ + Deserialize method which is used to reconstruct ORM XCom object. + + This method should be overridden in custom XCom backends to avoid + unnecessary request or other resource consuming operations when + creating XCom orm model. This is used when viewing XCom listing + in the webserver, for example. + """ + return BaseXCom.deserialize_value(self) + def resolve_xcom_backend(): """Resolves custom XCom class""" diff --git a/docs/concepts.rst b/docs/concepts.rst index a0136e435dae2..27e6dea347c0d 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -803,6 +803,12 @@ to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value`` methods. +It is also possible to override the ``orm_deserialize_value`` method which is used for deserialization when +recreating ORM XCom object. This happens every time we query the XCom table, for example when we want to populate +XCom list view in webserver. If your XCom backend performs expensive operations, or has large values that aren't +useful to show in such a view, override this method to provide an alternative representation. By default Airflow will +use ``BaseXCom.orm_deserialize_value`` method which returns the value stored in Airflow database. + See :doc:`modules_management` for details on how Python and Airflow manage modules. .. _concepts:variables: diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index b967af491ecae..0a1571a59c78f 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -995,6 +995,7 @@ openfaas oper optimise ora +orm orchestrator orgtbl os diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py index 9793a26faf6f8..56082773afa57 100644 --- a/tests/models/test_xcom.py +++ b/tests/models/test_xcom.py @@ -16,6 +16,7 @@ # under the License. import os import unittest +from unittest import mock from airflow import settings from airflow.configuration import conf @@ -212,3 +213,18 @@ def test_xcom_get_many(self): for result in results: self.assertEqual(result.value, json_obj) + + @mock.patch("airflow.models.xcom.XCom.orm_deserialize_value") + def test_xcom_init_on_load_uses_orm_deserialize_value(self, mock_orm_deserialize): + # pylint: disable=unexpected-keyword-arg + instance = BaseXCom( + key="key", + value="value", + timestamp=timezone.utcnow(), + execution_date=timezone.utcnow(), + task_id="task_id", + dag_id="dag_id", + ) + # pylint: enable=unexpected-keyword-arg + instance.init_on_load() + mock_orm_deserialize.assert_called_once_with()