Skip to content

Commit

Permalink
Use different deserialization method in XCom init_on_load (#12327)
Browse files Browse the repository at this point in the history
The init_on_load method used the deserialize_value method, which
in the case of custom XCom backends may perform requests to external
services (for example, downloading files from buckets).

This is problematic because the request would be sent wherever we query
XCom (for example, when listing XComs in the web UI). This PR proposes implementing
orm_deserialize_value, which allows overriding this behavior. By default
we use BaseXCom.deserialize_value.

closes: #12315
  • Loading branch information
turbaszek authored Nov 16, 2020
1 parent 917e6c4 commit 1623df8
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 9 deletions.
29 changes: 20 additions & 9 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def init_on_load(self):
i.e automatically deserialize Xcom value when loading from DB.
"""
try:
self.value = XCom.deserialize_value(self)
self.value = self.orm_deserialize_value()
except (UnicodeEncodeError, ValueError):
# For backward-compatibility.
# Preventing errors in webserver
Expand Down Expand Up @@ -235,30 +235,41 @@ def serialize_value(value: Any):
return json.dumps(value).encode('UTF-8')
except (ValueError, TypeError):
log.error(
"Could not serialize the XCOM value into JSON. "
"Could not serialize the XCom value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
"support for XCOM in your airflow config."
"for XCom, then you need to enable pickle "
"support for XCom in your airflow config."
)
raise

@staticmethod
def deserialize_value(result) -> Any:
"""Deserialize Xcom value from str or pickle object"""
def deserialize_value(result: "XCom") -> Any:
"""Deserialize XCom value from str or pickle object"""
enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
if enable_pickling:
return pickle.loads(result.value)
try:
return json.loads(result.value.decode('UTF-8'))
except JSONDecodeError:
log.error(
"Could not deserialize the XCOM value from JSON. "
"Could not deserialize the XCom value from JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
"support for XCOM in your airflow config."
"for XCom, then you need to enable pickle "
"support for XCom in your airflow config."
)
raise

def orm_deserialize_value(self) -> Any:
"""
Deserialize the value when reconstructing the ORM XCom object.

This method should be overridden in custom XCom backends to avoid
unnecessary requests or other resource-consuming operations when
creating the XCom ORM model. This is used when viewing the XCom listing
in the webserver, for example.

:return: the result of ``BaseXCom.deserialize_value`` applied to this instance.
"""
return BaseXCom.deserialize_value(self)


def resolve_xcom_backend():
"""Resolves custom XCom class"""
Expand Down
6 changes: 6 additions & 0 deletions docs/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,12 @@ to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter
deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
methods.

It is also possible to override the ``orm_deserialize_value`` method, which is used for deserialization when
recreating the ORM XCom object. This happens every time we query the XCom table, for example when we want to populate
the XCom list view in the webserver. If your XCom backend performs expensive operations, or has large values that aren't
useful to show in such a view, override this method to provide an alternative representation. By default, Airflow will
use the ``BaseXCom.orm_deserialize_value`` method, which returns the value stored in the Airflow database.

See :doc:`modules_management` for details on how Python and Airflow manage modules.

.. _concepts:variables:
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ openfaas
oper
optimise
ora
orm
orchestrator
orgtbl
os
Expand Down
16 changes: 16 additions & 0 deletions tests/models/test_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
import os
import unittest
from unittest import mock

from airflow import settings
from airflow.configuration import conf
Expand Down Expand Up @@ -212,3 +213,18 @@ def test_xcom_get_many(self):

for result in results:
self.assertEqual(result.value, json_obj)

# Replace ``orm_deserialize_value`` with a mock so the test can observe the call
# made by ``init_on_load``.
@mock.patch("airflow.models.xcom.XCom.orm_deserialize_value")
def test_xcom_init_on_load_uses_orm_deserialize_value(self, mock_orm_deserialize):
# Build the instance directly from keyword arguments.
# pylint: disable=unexpected-keyword-arg
instance = BaseXCom(
key="key",
value="value",
timestamp=timezone.utcnow(),
execution_date=timezone.utcnow(),
task_id="task_id",
dag_id="dag_id",
)
# pylint: enable=unexpected-keyword-arg
# Call the load hook by hand and check that it called the patched method
# exactly once, with no arguments.
instance.init_on_load()
mock_orm_deserialize.assert_called_once_with()

0 comments on commit 1623df8

Please sign in to comment.