From aff3a361b4092212c0757f9ce88fa2e40d25d1f4 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Thu, 20 Feb 2020 16:00:05 +0100 Subject: [PATCH] [AIRFLOW-6558] Campaign Manager operators for conversions (#7420) --- airflow/providers/google/cloud/hooks/base.py | 5 +- .../example_dags/example_campaign_manager.py | 61 ++++++ .../hooks/campaign_manager.py | 122 +++++++++++ .../operators/campaign_manager.py | 178 +++++++++++++++- docs/howto/operator/gcp/campaign_manager.rst | 38 ++++ .../hooks/test_campaign_manager.py | 194 +++++++++++------- .../operators/test_campaign_manager.py | 76 +++++++ .../operators/test_campaign_manager_system.py | 4 +- tests/test_utils/gcp_system_helpers.py | 3 + 9 files changed, 598 insertions(+), 83 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/base.py b/airflow/providers/google/cloud/hooks/base.py index 5b80c4c0dd0fe..fd8e6d4b74af2 100644 --- a/airflow/providers/google/cloud/hooks/base.py +++ b/airflow/providers/google/cloud/hooks/base.py @@ -326,7 +326,10 @@ def wrapper_decorator(self: CloudBaseHook, *args, **kwargs) -> RT: self.log.error('The request failed, the parameters are invalid.') raise AirflowException(e) except HttpError as e: - self.log.error('The request failed:\n%s', str(e)) + if hasattr(e, "content"): + self.log.error('The request failed:\n%s', e.content.decode(encoding="utf-8")) + else: + self.log.error('The request failed:\n%s', str(e)) raise AirflowException(e) return wrapper_decorator diff --git a/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py b/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py index a0a2335fc78de..d26b44280ba4c 100644 --- a/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py +++ b/airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py @@ -19,9 +19,11 @@ Example Airflow DAG that shows how to use CampaignManager. """ import os +import time from airflow import models from airflow.providers.google.marketing_platform.operators.campaign_manager import ( + GoogleCampaignManagerBatchInsertConversionsOperator, GoogleCampaignManagerBatchUpdateConversionsOperator, GoogleCampaignManagerDeleteReportOperator, GoogleCampaignManagerDownloadReportOperator, GoogleCampaignManagerInsertReportOperator, GoogleCampaignManagerRunReportOperator, ) @@ -31,6 +33,10 @@ from airflow.utils import dates PROFILE_ID = os.environ.get("MARKETING_PROFILE_ID", "123456789") +FLOODLIGHT_ACTIVITY_ID = os.environ.get("FLOODLIGHT_ACTIVITY_ID", 12345) +FLOODLIGHT_CONFIGURATION_ID = os.environ.get("FLOODLIGHT_CONFIGURATION_ID", 12345) +ENCRYPTION_ENTITY_ID = os.environ.get("ENCRYPTION_ENTITY_ID", 12345) +DEVICE_ID = os.environ.get("DEVICE_ID", "12345") BUCKET = os.environ.get("MARKETING_BUCKET", "test-cm-bucket") REPORT_NAME = "test-report" REPORT = { @@ -48,6 +54,33 @@ }, } +CONVERSION = { + "kind": "dfareporting#conversion", + "floodlightActivityId": FLOODLIGHT_ACTIVITY_ID, + "floodlightConfigurationId": FLOODLIGHT_CONFIGURATION_ID, + "mobileDeviceId": DEVICE_ID, + "ordinal": "0", + "quantity": 42, + "value": 123.4, + "timestampMicros": int(time.time()) * 1000000, + "customVariables": [ + { + "kind": "dfareporting#customFloodlightVariable", + "type": "U4", + "value": "value", + } + ], +} + +CONVERSION_UPDATE = { + "kind": "dfareporting#conversion", + "floodlightActivityId": FLOODLIGHT_ACTIVITY_ID, + "floodlightConfigurationId": FLOODLIGHT_CONFIGURATION_ID, + "mobileDeviceId": DEVICE_ID, + "ordinal": "0", + "quantity": 42, + "value": 123.4, +} default_args = {"start_date": dates.days_ago(1)} @@ -97,3 +130,31 @@ # [END howto_campaign_manager_delete_report_operator] create_report >> run_report >> wait_for_report >> get_report >> delete_report + + # [START howto_campaign_manager_insert_conversions] + insert_conversion = GoogleCampaignManagerBatchInsertConversionsOperator( + task_id="insert_conversion", + profile_id=PROFILE_ID, + conversions=[CONVERSION], + encryption_source="AD_SERVING", + encryption_entity_type="DCM_ADVERTISER", + encryption_entity_id=ENCRYPTION_ENTITY_ID, + ) + # [END howto_campaign_manager_insert_conversions] + + # [START howto_campaign_manager_update_conversions] + update_conversion = GoogleCampaignManagerBatchUpdateConversionsOperator( + task_id="update_conversion", + profile_id=PROFILE_ID, + conversions=[CONVERSION_UPDATE], + encryption_source="AD_SERVING", + encryption_entity_type="DCM_ADVERTISER", + encryption_entity_id=ENCRYPTION_ENTITY_ID, + ) + # [END howto_campaign_manager_update_conversions] + + insert_conversion >> update_conversion + +if __name__ == "__main__": + dag.clear(reset_dag_runs=True) + dag.run() diff --git a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py index ba14502c687cb..2cf6974ce12d3 100644 --- a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py +++ b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py @@ -23,6 +23,7 @@ from googleapiclient import http from googleapiclient.discovery import Resource, build +from airflow.exceptions import AirflowException from airflow.providers.google.cloud.hooks.base import CloudBaseHook @@ -229,3 +230,124 @@ def get_report_file( .get_media(fileId=file_id, profileId=profile_id, reportId=report_id) ) return request + + @staticmethod + def _conversions_batch_request( + conversions: List[Dict[str, Any]], + encryption_entity_type: str, + encryption_entity_id: int, + encryption_source: str, + kind: str, + ) -> Dict[str, Any]: + return { + "kind": kind, + "conversions": conversions, + "encryptionInfo": { + "kind": "dfareporting#encryptionInfo", + "encryptionEntityType": encryption_entity_type, + "encryptionEntityId": encryption_entity_id, + "encryptionSource": encryption_source, + }, + } + + @CloudBaseHook.catch_http_exception + def conversions_batch_insert( + self, + profile_id: str, + conversions: List[Dict[str, Any]], + encryption_entity_type: str, + encryption_entity_id: int, + encryption_source: str, + max_failed_inserts: int = 0, + ) -> Any: + """ + Inserts conversions. + + :param profile_id: User profile ID associated with this request. + :type profile_id: str + :param conversions: Conversations to insert, should by type of Conversation: + https://developers.google.com/doubleclick-advertisers/v3.3/conversions#resource + :type conversions: List[Dict[str, Any]] + :param encryption_entity_type: The encryption entity type. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_type: str + :param encryption_entity_id: The encryption entity ID. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_id: int + :param encryption_source: Describes whether the encrypted cookie was received from ad serving + (the %m macro) or from Data Transfer. + :type encryption_source: str + :param max_failed_inserts: The maximum number of conversions that failed to be inserted + :type max_failed_inserts: int + """ + response = ( + self.get_conn() # pylint: disable=no-member + .conversions() + .batchinsert( + profileId=profile_id, + body=self._conversions_batch_request( + conversions=conversions, + encryption_entity_type=encryption_entity_type, + encryption_entity_id=encryption_entity_id, + encryption_source=encryption_source, + kind="dfareporting#conversionsBatchInsertRequest", + ), + ) + .execute(num_retries=self.num_retries) + ) + if response.get('hasFailures', False): + errored_conversions = [stat['errors'] for stat in response['status'] if 'errors' in stat] + if len(errored_conversions) > max_failed_inserts: + raise AirflowException(errored_conversions) + return response + + @CloudBaseHook.catch_http_exception + def conversions_batch_update( + self, + profile_id: str, + conversions: List[Dict[str, Any]], + encryption_entity_type: str, + encryption_entity_id: int, + encryption_source: str, + max_failed_updates: int = 0, + ) -> Any: + """ + Updates existing conversions. + + :param profile_id: User profile ID associated with this request. + :type profile_id: str + :param conversions: Conversations to update, should by type of Conversation: + https://developers.google.com/doubleclick-advertisers/v3.3/conversions#resource + :type conversions: List[Dict[str, Any]] + :param encryption_entity_type: The encryption entity type. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_type: str + :param encryption_entity_id: The encryption entity ID. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_id: int + :param encryption_source: Describes whether the encrypted cookie was received from ad serving + (the %m macro) or from Data Transfer. + :type encryption_source: str + :param max_failed_updates: The maximum number of conversions that failed to be updateed + :type max_failed_updates: int + """ + response = ( + self.get_conn() # pylint: disable=no-member + .conversions() + .batchupdate( + profileId=profile_id, + body=self._conversions_batch_request( + conversions=conversions, + encryption_entity_type=encryption_entity_type, + encryption_entity_id=encryption_entity_id, + encryption_source=encryption_source, + kind="dfareporting#conversionsBatchUpdateRequest", + ), + ) + .execute(num_retries=self.num_retries) + ) + if response.get('hasFailures', False): + errored_conversions = [stat['errors'] for stat in response['status'] if 'errors' in stat] + if len(errored_conversions) > max_failed_updates: + raise AirflowException(errored_conversions) + return response diff --git a/airflow/providers/google/marketing_platform/operators/campaign_manager.py b/airflow/providers/google/marketing_platform/operators/campaign_manager.py index 9703afc0f0472..bd76be490f606 100644 --- a/airflow/providers/google/marketing_platform/operators/campaign_manager.py +++ b/airflow/providers/google/marketing_platform/operators/campaign_manager.py @@ -20,7 +20,7 @@ """ import tempfile import uuid -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from googleapiclient import http @@ -387,3 +387,179 @@ def execute(self, context: Dict): self.xcom_push(context, key="file_id", value=file_id) self.log.info("Report file id: %s", file_id) return response + + +class GoogleCampaignManagerBatchInsertConversionsOperator(BaseOperator): + """ + Inserts conversions. + + .. seealso:: + Check official API docs: + https://developers.google.com/doubleclick-advertisers/v3.3/conversions/batchinsert + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleCampaignManagerBatchInsertConversionsOperator` + + :param profile_id: User profile ID associated with this request. + :type profile_id: str + :param conversions: Conversations to insert, should by type of Conversation: + https://developers.google.com/doubleclick-advertisers/v3.3/conversions#resource + :type conversions: List[Dict[str, Any]] + :param encryption_entity_type: The encryption entity type. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_type: str + :param encryption_entity_id: The encryption entity ID. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_id: int + :param encryption_source: Describes whether the encrypted cookie was received from ad serving + (the %m macro) or from Data Transfer. + :type encryption_source: str + :param max_failed_inserts: The maximum number of conversions that failed to be inserted + :type max_failed_inserts: int + :param api_version: The version of the api that will be requested for example 'v3'. + :type api_version: str + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the + request must have domain-wide delegation enabled. + :type delegate_to: Optional[str] + """ + + template_fields = ( + "profile_id", + "conversions", + "encryption_entity_type", + "encryption_entity_id", + "encryption_source", + ) + + @apply_defaults + def __init__( + self, + profile_id: str, + conversions: List[Dict[str, Any]], + encryption_entity_type: str, + encryption_entity_id: int, + encryption_source: str, + max_failed_inserts: int = 0, + api_version: str = "v3.3", + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + *args, + **kwargs + ) -> None: + super().__init__(*args, **kwargs) + self.profile_id = profile_id + self.conversions = conversions + self.encryption_entity_type = encryption_entity_type + self.encryption_entity_id = encryption_entity_id + self.encryption_source = encryption_source + self.max_failed_inserts = max_failed_inserts + self.api_version = api_version + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + + def execute(self, context: Dict): + hook = GoogleCampaignManagerHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + ) + response = hook.conversions_batch_insert( + profile_id=self.profile_id, + conversions=self.conversions, + encryption_entity_type=self.encryption_entity_type, + encryption_entity_id=self.encryption_entity_id, + encryption_source=self.encryption_source, + max_failed_inserts=self.max_failed_inserts + ) + return response + + +class GoogleCampaignManagerBatchUpdateConversionsOperator(BaseOperator): + """ + Updates existing conversions. + + .. seealso:: + Check official API docs: + https://developers.google.com/doubleclick-advertisers/v3.3/conversions/batchupdate + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleCampaignManagerBatchUpdateConversionsOperator` + + :param profile_id: User profile ID associated with this request. + :type profile_id: str + :param conversions: Conversations to update, should by type of Conversation: + https://developers.google.com/doubleclick-advertisers/v3.3/conversions#resource + :type conversions: List[Dict[str, Any]] + :param encryption_entity_type: The encryption entity type. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_type: str + :param encryption_entity_id: The encryption entity ID. This should match the encryption + configuration for ad serving or Data Transfer. + :type encryption_entity_id: int + :param encryption_source: Describes whether the encrypted cookie was received from ad serving + (the %m macro) or from Data Transfer. + :type encryption_source: str + :param max_failed_updates: The maximum number of conversions that failed to be updateed + :type max_failed_updates: int + :param api_version: The version of the api that will be requested for example 'v3'. + :type api_version: str + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the + request must have domain-wide delegation enabled. + :type delegate_to: Optional[str] + """ + + template_fields = ( + "profile_id", + "conversions", + "encryption_entity_type", + "encryption_entity_id", + "encryption_source", + ) + + @apply_defaults + def __init__( + self, + profile_id: str, + conversions: List[Dict[str, Any]], + encryption_entity_type: str, + encryption_entity_id: int, + encryption_source: str, + max_failed_updates: int = 0, + api_version: str = "v3.3", + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + *args, + **kwargs + ) -> None: + super().__init__(*args, **kwargs) + self.profile_id = profile_id + self.conversions = conversions + self.encryption_entity_type = encryption_entity_type + self.encryption_entity_id = encryption_entity_id + self.encryption_source = encryption_source + self.max_failed_updates = max_failed_updates + self.api_version = api_version + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + + def execute(self, context: Dict): + hook = GoogleCampaignManagerHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + ) + response = hook.conversions_batch_update( + profile_id=self.profile_id, + conversions=self.conversions, + encryption_entity_type=self.encryption_entity_type, + encryption_entity_id=self.encryption_entity_id, + encryption_source=self.encryption_source, + max_failed_updates=self.max_failed_updates + ) + return response diff --git a/docs/howto/operator/gcp/campaign_manager.rst b/docs/howto/operator/gcp/campaign_manager.rst index 685d6f42a7b23..a82de6942ee4c 100644 --- a/docs/howto/operator/gcp/campaign_manager.rst +++ b/docs/howto/operator/gcp/campaign_manager.rst @@ -125,3 +125,41 @@ You can use :ref:`Jinja templating ` with :template-fields:`airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerRunReportOperator` parameters which allows you to dynamically determine values. The result is saved to :ref:`XCom `, which allows it to be used by other operators. + +.. _howto/operator:GoogleCampaignManagerBatchInsertConversionsOperator: + +Inserting a conversions +^^^^^^^^^^^^^^^^^^^^^^^ + +To insert Campaign Manager conversions you can use the +:class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchInsertConversionsOperator`. + +.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py + :language: python + :dedent: 4 + :start-after: [START howto_campaign_manager_insert_conversions] + :end-before: [END howto_campaign_manager_insert_conversions] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchInsertConversionsOperator` +parameters which allows you to dynamically determine values. +The result is saved to :ref:`XCom `, which allows it to be used by other operators. + +.. _howto/operator:GoogleCampaignManagerBatchUpdateConversionsOperator: + +Updating a conversions +^^^^^^^^^^^^^^^^^^^^^^ + +To update Campaign Manager conversions you can use the +:class:`~airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchUpdateConversionsOperator`. + +.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py + :language: python + :dedent: 4 + :start-after: [START howto_campaign_manager_update_conversions] + :end-before: [END howto_campaign_manager_update_conversions] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerBatchUpdateConversionsOperator` +parameters which allows you to dynamically determine values. +The result is saved to :ref:`XCom `, which allows it to be used by other operators. diff --git a/tests/providers/google/marketing_platform/hooks/test_campaign_manager.py b/tests/providers/google/marketing_platform/hooks/test_campaign_manager.py index 8b942118b74da..d645ccdd91701 100644 --- a/tests/providers/google/marketing_platform/hooks/test_campaign_manager.py +++ b/tests/providers/google/marketing_platform/hooks/test_campaign_manager.py @@ -23,6 +23,12 @@ API_VERSION = "v3.3" GCP_CONN_ID = "google_cloud_default" +REPORT_ID = "REPORT_ID" +PROFILE_ID = "PROFILE_ID" +ENCRYPTION_SOURCE = "encryption_source" +ENCRYPTION_ENTITY_TYPE = "encryption_entity_type" +ENCRYPTION_ENTITY_ID = 1234567 + class TestGoogleCampaignManagerHook(TestCase): def setUp(self): @@ -39,8 +45,7 @@ def setUp(self): "campaign_manager.GoogleCampaignManagerHook._authorize" ) @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.build" + "airflow.providers.google.marketing_platform.hooks.campaign_manager.build" ) def test_gen_conn(self, mock_build, mock_authorize): result = self.hook.get_conn() @@ -56,23 +61,16 @@ def test_gen_conn(self, mock_build, mock_authorize): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" - ) - def test_delete_report(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" - report_id = "REPORT_ID" - + def test_delete_report(self, get_conn_mock): return_value = "TEST" get_conn_mock.return_value.reports.return_value.delete.return_value.execute.return_value = ( return_value ) - result = self.hook.delete_report(profile_id=profile_id, report_id=report_id) + result = self.hook.delete_report(profile_id=PROFILE_ID, report_id=REPORT_ID) get_conn_mock.return_value.reports.return_value.delete.assert_called_once_with( - profileId=profile_id, reportId=report_id + profileId=PROFILE_ID, reportId=REPORT_ID ) self.assertEqual(return_value, result) @@ -81,25 +79,19 @@ def test_delete_report(self, mock_base_hook, get_conn_mock): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" - ) - def test_get_report(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" - report_id = "REPORT_ID" + def test_get_report(self, get_conn_mock): file_id = "FILE_ID" return_value = "TEST" - get_conn_mock.return_value.reports.return_value.files.return_value.get.\ - return_value.execute.return_value = return_value + get_conn_mock.return_value.reports.return_value.files.return_value.\ + get.return_value.execute.return_value = return_value result = self.hook.get_report( - profile_id=profile_id, report_id=report_id, file_id=file_id + profile_id=PROFILE_ID, report_id=REPORT_ID, file_id=file_id ) get_conn_mock.return_value.reports.return_value.files.return_value.get.assert_called_once_with( - profileId=profile_id, reportId=report_id, fileId=file_id + profileId=PROFILE_ID, reportId=REPORT_ID, fileId=file_id ) self.assertEqual(return_value, result) @@ -108,13 +100,7 @@ def test_get_report(self, mock_base_hook, get_conn_mock): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" - ) - def test_get_report_file(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" - report_id = "REPORT_ID" + def test_get_report_file(self, get_conn_mock): file_id = "FILE_ID" return_value = "TEST" @@ -123,11 +109,11 @@ def test_get_report_file(self, mock_base_hook, get_conn_mock): ) result = self.hook.get_report_file( - profile_id=profile_id, report_id=report_id, file_id=file_id + profile_id=PROFILE_ID, report_id=REPORT_ID, file_id=file_id ) get_conn_mock.return_value.reports.return_value.files.return_value.get_media.assert_called_once_with( - profileId=profile_id, reportId=report_id, fileId=file_id + profileId=PROFILE_ID, reportId=REPORT_ID, fileId=file_id ) self.assertEqual(return_value, result) @@ -136,12 +122,7 @@ def test_get_report_file(self, mock_base_hook, get_conn_mock): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" - ) - def test_insert_report(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" + def test_insert_report(self, get_conn_mock): report = {"body": "test"} return_value = "TEST" @@ -149,10 +130,10 @@ def test_insert_report(self, mock_base_hook, get_conn_mock): return_value ) - result = self.hook.insert_report(profile_id=profile_id, report=report) + result = self.hook.insert_report(profile_id=PROFILE_ID, report=report) get_conn_mock.return_value.reports.return_value.insert.assert_called_once_with( - profileId=profile_id, body=report + profileId=PROFILE_ID, body=report ) self.assertEqual(return_value, result) @@ -161,21 +142,17 @@ def test_insert_report(self, mock_base_hook, get_conn_mock): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" - ) - def test_list_reports(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" + def test_list_reports(self, get_conn_mock): max_results = 42 scope = "SCOPE" sort_field = "SORT_FIELD" sort_order = "SORT_ORDER" - items = ['item'] + items = ["item"] return_value = {"nextPageToken": None, "items": items} - get_conn_mock.return_value.reports.return_value.list.return_value.\ - execute.return_value = return_value + get_conn_mock.return_value.reports.return_value.list.return_value.execute.return_value = ( + return_value + ) request_mock = mock.MagicMock() request_mock.execute.return_value = {"nextPageToken": None, "items": items} @@ -183,11 +160,11 @@ def test_list_reports(self, mock_base_hook, get_conn_mock): request_mock, request_mock, request_mock, - None + None, ] result = self.hook.list_reports( - profile_id=profile_id, + profile_id=PROFILE_ID, max_results=max_results, scope=scope, sort_field=sort_field, @@ -195,7 +172,7 @@ def test_list_reports(self, mock_base_hook, get_conn_mock): ) get_conn_mock.return_value.reports.return_value.list.assert_called_once_with( - profileId=profile_id, + profileId=PROFILE_ID, maxResults=max_results, scope=scope, sortField=sort_field, @@ -208,13 +185,7 @@ def test_list_reports(self, mock_base_hook, get_conn_mock): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" - ) - def test_patch_report(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" - report_id = "REPORT_ID" + def test_patch_report(self, get_conn_mock): update_mask = {"test": "test"} return_value = "TEST" @@ -223,11 +194,11 @@ def test_patch_report(self, mock_base_hook, get_conn_mock): ) result = self.hook.patch_report( - profile_id=profile_id, report_id=report_id, update_mask=update_mask + profile_id=PROFILE_ID, report_id=REPORT_ID, update_mask=update_mask ) get_conn_mock.return_value.reports.return_value.patch.assert_called_once_with( - profileId=profile_id, reportId=report_id, body=update_mask + profileId=PROFILE_ID, reportId=REPORT_ID, body=update_mask ) self.assertEqual(return_value, result) @@ -236,13 +207,7 @@ def test_patch_report(self, mock_base_hook, get_conn_mock): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - @mock.patch( - "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" - ) - def test_run_report(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" - report_id = "REPORT_ID" + def test_run_report(self, get_conn_mock): synchronous = True return_value = "TEST" @@ -251,11 +216,11 @@ def test_run_report(self, mock_base_hook, get_conn_mock): ) result = self.hook.run_report( - profile_id=profile_id, report_id=report_id, synchronous=synchronous + profile_id=PROFILE_ID, report_id=REPORT_ID, synchronous=synchronous ) get_conn_mock.return_value.reports.return_value.run.assert_called_once_with( - profileId=profile_id, reportId=report_id, synchronous=synchronous + profileId=PROFILE_ID, reportId=REPORT_ID, synchronous=synchronous ) self.assertEqual(return_value, result) @@ -264,23 +229,94 @@ def test_run_report(self, mock_base_hook, get_conn_mock): "airflow.providers.google.marketing_platform.hooks." "campaign_manager.GoogleCampaignManagerHook.get_conn" ) + def test_update_report(self, get_conn_mock): + return_value = "TEST" + get_conn_mock.return_value.reports.return_value.update.return_value.execute.return_value = ( + return_value + ) + + result = self.hook.update_report(profile_id=PROFILE_ID, report_id=REPORT_ID) + + get_conn_mock.return_value.reports.return_value.update.assert_called_once_with( + profileId=PROFILE_ID, reportId=REPORT_ID + ) + + self.assertEqual(return_value, result) + + @mock.patch( + "airflow.providers.google.marketing_platform." + "hooks.campaign_manager.GoogleCampaignManagerHook.get_conn" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.hooks.campaign_manager.GoogleCampaignManagerHook" + "._conversions_batch_request" + ) + def test_conversion_batch_insert(self, batch_request_mock, get_conn_mock): + conversions = [{"conversions1": "value"}, {"conversions2": "value"}] + + return_value = {'hasFailures': False} + get_conn_mock.return_value.conversions.return_value.batchinsert.return_value.execute.return_value = ( + return_value + ) + + batch_request_mock.return_value = "batch_request_mock" + + result = self.hook.conversions_batch_insert( + profile_id=PROFILE_ID, + conversions=conversions, + encryption_entity_id=ENCRYPTION_ENTITY_ID, + encryption_entity_type=ENCRYPTION_ENTITY_TYPE, + encryption_source=ENCRYPTION_SOURCE, + ) + + batch_request_mock.assert_called_once_with( + conversions=conversions, + encryption_entity_id=ENCRYPTION_ENTITY_ID, + encryption_entity_type=ENCRYPTION_ENTITY_TYPE, + encryption_source=ENCRYPTION_SOURCE, + kind="dfareporting#conversionsBatchInsertRequest", + ) + get_conn_mock.return_value.conversions.return_value.batchinsert.assert_called_once_with( + profileId=PROFILE_ID, body=batch_request_mock.return_value + ) + + self.assertEqual(return_value, result) + @mock.patch( "airflow.providers.google.marketing_platform.hooks." - "campaign_manager.CloudBaseHook.__init__" + "campaign_manager.GoogleCampaignManagerHook.get_conn" ) - def test_update_report(self, mock_base_hook, get_conn_mock): - profile_id = "PROFILE_ID" - report_id = "REPORT_ID" + @mock.patch( + "airflow.providers.google.marketing_platform.hooks.campaign_manager.GoogleCampaignManagerHook" + "._conversions_batch_request" + ) + def test_conversions_batch_update(self, batch_request_mock, get_conn_mock): + conversions = [{"conversions1": "value"}, {"conversions2": "value"}] - return_value = "TEST" - get_conn_mock.return_value.reports.return_value.update.return_value.execute.return_value = ( + return_value = {'hasFailures': False} + get_conn_mock.return_value.conversions.return_value.batchupdate.return_value.execute.return_value = ( return_value ) - result = self.hook.update_report(profile_id=profile_id, report_id=report_id) + batch_request_mock.return_value = "batch_request_mock" - get_conn_mock.return_value.reports.return_value.update.assert_called_once_with( - profileId=profile_id, reportId=report_id + result = self.hook.conversions_batch_update( + profile_id=PROFILE_ID, + conversions=conversions, + encryption_entity_id=ENCRYPTION_ENTITY_ID, + encryption_entity_type=ENCRYPTION_ENTITY_TYPE, + encryption_source=ENCRYPTION_SOURCE, + ) + + batch_request_mock.assert_called_once_with( + conversions=conversions, + encryption_entity_id=ENCRYPTION_ENTITY_ID, + encryption_entity_type=ENCRYPTION_ENTITY_TYPE, + encryption_source=ENCRYPTION_SOURCE, + kind="dfareporting#conversionsBatchUpdateRequest", + ) + get_conn_mock.return_value.conversions.return_value.batchupdate.assert_called_once_with( + profileId=PROFILE_ID, body=batch_request_mock.return_value ) self.assertEqual(return_value, result) diff --git a/tests/providers/google/marketing_platform/operators/test_campaign_manager.py b/tests/providers/google/marketing_platform/operators/test_campaign_manager.py index 5addcd2bc818b..bc2bd899bce4e 100644 --- a/tests/providers/google/marketing_platform/operators/test_campaign_manager.py +++ b/tests/providers/google/marketing_platform/operators/test_campaign_manager.py @@ -18,6 +18,7 @@ from unittest import TestCase, mock from airflow.providers.google.marketing_platform.operators.campaign_manager import ( + GoogleCampaignManagerBatchInsertConversionsOperator, GoogleCampaignManagerBatchUpdateConversionsOperator, GoogleCampaignManagerDeleteReportOperator, GoogleCampaignManagerDownloadReportOperator, GoogleCampaignManagerInsertReportOperator, GoogleCampaignManagerRunReportOperator, ) @@ -25,6 +26,21 @@ API_VERSION = "api_version" GCP_CONN_ID = "google_cloud_default" +CONVERSION = { + "kind": "dfareporting#conversion", + "floodlightActivityId": 1234, + "floodlightConfigurationId": 1234, + "gclid": "971nc2849184c1914019v1c34c14", + "ordinal": "0", + "customVariables": [ + { + "kind": "dfareporting#customFloodlightVariable", + "type": "U10", + "value": "value", + } + ], +} + class TestGoogleCampaignManagerDeleteReportOperator(TestCase): @mock.patch( @@ -204,3 +220,63 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock): profile_id=profile_id, report_id=report_id, synchronous=synchronous ) xcom_mock.assert_called_once_with(None, key="file_id", value=file_id) + + +class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase): + @mock.patch( + "airflow.providers.google.marketing_platform.operators." + "campaign_manager.GoogleCampaignManagerHook" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.operators." + "campaign_manager.BaseOperator" + ) + def test_execute(self, mock_base_op, hook_mock): + profile_id = "PROFILE_ID" + op = GoogleCampaignManagerBatchInsertConversionsOperator( + task_id="insert_conversion", + profile_id=profile_id, + conversions=[CONVERSION], + encryption_source="AD_SERVING", + encryption_entity_type="DCM_ADVERTISER", + encryption_entity_id=123456789, + ) + op.execute(None) + hook_mock.return_value.conversions_batch_insert.assert_called_once_with( + profile_id=profile_id, + conversions=[CONVERSION], + encryption_source="AD_SERVING", + encryption_entity_type="DCM_ADVERTISER", + encryption_entity_id=123456789, + max_failed_inserts=0, + ) + + +class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase): + @mock.patch( + "airflow.providers.google.marketing_platform.operators." + "campaign_manager.GoogleCampaignManagerHook" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.operators." + "campaign_manager.BaseOperator" + ) + def test_execute(self, mock_base_op, hook_mock): + profile_id = "PROFILE_ID" + op = GoogleCampaignManagerBatchUpdateConversionsOperator( + task_id="update_conversion", + profile_id=profile_id, + conversions=[CONVERSION], + encryption_source="AD_SERVING", + encryption_entity_type="DCM_ADVERTISER", + encryption_entity_id=123456789, + ) + op.execute(None) + hook_mock.return_value.conversions_batch_update.assert_called_once_with( + profile_id=profile_id, + conversions=[CONVERSION], + encryption_source="AD_SERVING", + encryption_entity_type="DCM_ADVERTISER", + encryption_entity_id=123456789, + max_failed_updates=0, + ) diff --git a/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py b/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py index adb4817c3cc4b..1de39a840b00f 100644 --- a/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py +++ b/tests/providers/google/marketing_platform/operators/test_campaign_manager_system.py @@ -20,7 +20,7 @@ from tests.providers.google.marketing_platform.operators.test_campaign_manager_system_helper import ( GoogleCampaignManagerTestHelper, ) -from tests.test_utils.gcp_system_helpers import provide_gcp_context, skip_gcp_system +from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, provide_gcp_context, skip_gcp_system from tests.test_utils.system_tests_class import SystemTest # Required scopes @@ -47,4 +47,4 @@ def tearDown(self): @provide_gcp_context(GOOGLE_CAMPAIGN_MANAGER_KEY, scopes=SCOPES) def test_run_example_dag(self): - self.run_dag('example_campaign_manager', "airflow/providers/google/marketing_platform/example_dags") + self.run_dag('example_campaign_manager', MARKETING_DAG_FOLDER) diff --git a/tests/test_utils/gcp_system_helpers.py b/tests/test_utils/gcp_system_helpers.py index df1fec96825e9..cee79c696b9ef 100644 --- a/tests/test_utils/gcp_system_helpers.py +++ b/tests/test_utils/gcp_system_helpers.py @@ -27,6 +27,9 @@ CLOUD_DAG_FOLDER = os.path.join( AIRFLOW_MAIN_FOLDER, "airflow", "providers", "google", "cloud", "example_dags" ) +MARKETING_DAG_FOLDER = os.path.join( + AIRFLOW_MAIN_FOLDER, "airflow", "providers", "google", "marketing_platform", "example_dags" +) POSTGRES_LOCAL_EXECUTOR = os.path.join( AIRFLOW_MAIN_FOLDER, "tests", "test_utils", "postgres_local_executor.cfg" )