From 6c25a283aa0b050d728ce8bb4d53e6ba8d7f4179 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 23 Aug 2021 17:26:44 +0530 Subject: [PATCH 01/18] added code change to respect batch size for 1st page sync --- tap_exacttarget/client.py | 18 +- tap_exacttarget/endpoints/campaigns.py | 7 +- tap_exacttarget/endpoints/content_areas.py | 3 +- tap_exacttarget/endpoints/data_extensions.py | 6 +- tap_exacttarget/endpoints/emails.py | 3 +- tap_exacttarget/endpoints/events.py | 3 +- tap_exacttarget/endpoints/folders.py | 3 +- tap_exacttarget/endpoints/list_sends.py | 3 +- tap_exacttarget/endpoints/list_subscribers.py | 7 +- tap_exacttarget/endpoints/lists.py | 3 +- tap_exacttarget/endpoints/sends.py | 3 +- tap_exacttarget/endpoints/subscribers.py | 3 +- tests/base.py | 177 ++++++++++++++++++ tests/test_exacttarget_base.py | 95 ---------- tests/test_exacttarget_discover.py | 101 ---------- tests/test_exacttarget_pagination.py | 52 +++++ 16 files changed, 273 insertions(+), 214 deletions(-) create mode 100644 tests/base.py delete mode 100644 tests/test_exacttarget_base.py delete mode 100644 tests/test_exacttarget_discover.py create mode 100644 tests/test_exacttarget_pagination.py diff --git a/tap_exacttarget/client.py b/tap_exacttarget/client.py index 701985c..e892724 100644 --- a/tap_exacttarget/client.py +++ b/tap_exacttarget/client.py @@ -107,6 +107,8 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz """ cursor = selector() cursor.auth_stub = auth_stub + # set batch size + cursor.options = {"BatchSize": batch_size} if props is not None: cursor.props = props @@ -119,6 +121,8 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz .format(name, search_filter)) else: + cursor.search_filter = {} + LOGGER.info( "Making RETRIEVE call to '{}' endpoint with no filters." 
.format(name)) @@ -148,10 +152,16 @@ def request_from_cursor(name, cursor, batch_size): while response.more_results: LOGGER.info("Getting more results from '{}' endpoint".format(name)) - # Override call to getMoreResults to add a batch_size parameter - # response = cursor.getMoreResults() - response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size) - LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name)) + if isinstance(cursor, FuelSDK.ET_Campaign): + # use 'getMoreResults' for campaigns as it does not use + # batch_size, rather it uses $page and $pageSize and REST Call + response = cursor.getMoreResults() + LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results.get('items')), name)) + else: + # Override call to getMoreResults to add a batch_size parameter + # response = cursor.getMoreResults() + response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size) + LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name)) if not response.status: raise RuntimeError("Request failed with '{}'" diff --git a/tap_exacttarget/endpoints/campaigns.py b/tap_exacttarget/endpoints/campaigns.py index 6c3d264..12d642f 100644 --- a/tap_exacttarget/endpoints/campaigns.py +++ b/tap_exacttarget/endpoints/campaigns.py @@ -38,10 +38,15 @@ class CampaignDataAccessObject(DataAccessObject): KEY_PROPERTIES = ['id'] def sync_data(self): + batch_size = int(self.config.get('batch_size', 2500)) + cursor = request( 'Campaign', FuelSDK.ET_Campaign, - self.auth_stub) + self.auth_stub, + # use $pageSize and $page in the props for + # this stream as it calls using REST API + props={"$pageSize": batch_size, "$page": 1, "page": 1}) for campaign in cursor: campaign = self.filter_keys_and_parse(campaign) diff --git a/tap_exacttarget/endpoints/content_areas.py b/tap_exacttarget/endpoints/content_areas.py index e44f6a9..8c79844 100644 --- a/tap_exacttarget/endpoints/content_areas.py +++ b/tap_exacttarget/endpoints/content_areas.py @@ -122,7 +122,8 @@ def sync_data(self): stream = request('ContentAreaDataAccessObject', selector, self.auth_stub, - search_filter) + search_filter, + batch_size=int(self.config.get('batch_size', 2500))) for content_area in stream: content_area = self.filter_keys_and_parse(content_area) diff --git a/tap_exacttarget/endpoints/data_extensions.py b/tap_exacttarget/endpoints/data_extensions.py index 8479585..6636275 100644 --- a/tap_exacttarget/endpoints/data_extensions.py +++ b/tap_exacttarget/endpoints/data_extensions.py @@ -94,7 +94,8 @@ def _get_fields(self, extensions): result = request( 'DataExtensionField', FuelSDK.ET_DataExtension_Column, - self.auth_stub) + self.auth_stub, + batch_size=int(self.config.get('batch_size', 2500))) for field in result: extension_id = field.DataExtension.CustomerKey @@ -265,7 +266,8 @@ def sync_data(self): 'SimpleOperator': 'equals', 'Value': customer_key, }, - props=['CustomerKey', 'CategoryID']) + props=['CustomerKey', 'CategoryID'], + batch_size=int(self.config.get('batch_size', 2500))) parent_extension = next(parent_result) parent_category_id = parent_extension.CategoryID diff --git a/tap_exacttarget/endpoints/emails.py b/tap_exacttarget/endpoints/emails.py index 4c0c089..2a906fc 100644 --- a/tap_exacttarget/endpoints/emails.py +++ b/tap_exacttarget/endpoints/emails.py @@ -137,7 +137,8 @@ def sync_data(self): stream = request('Email', selector, self.auth_stub, - search_filter) + search_filter, + batch_size=int(self.config.get('batch_size', 2500))) for 
email in stream: email = self.filter_keys_and_parse(email) diff --git a/tap_exacttarget/endpoints/events.py b/tap_exacttarget/endpoints/events.py index 8ed64d5..86d1d14 100644 --- a/tap_exacttarget/endpoints/events.py +++ b/tap_exacttarget/endpoints/events.py @@ -85,7 +85,8 @@ def sync_data(self): stream = request(event_name, selector, self.auth_stub, - search_filter) + search_filter, + batch_size=int(self.config.get('batch_size', 2500))) for event in stream: event = self.filter_keys_and_parse(event) diff --git a/tap_exacttarget/endpoints/folders.py b/tap_exacttarget/endpoints/folders.py index e52247a..49a8ce4 100644 --- a/tap_exacttarget/endpoints/folders.py +++ b/tap_exacttarget/endpoints/folders.py @@ -78,7 +78,8 @@ def sync_data(self): stream = request('Folder', selector, self.auth_stub, - search_filter) + search_filter, + batch_size=int(self.config.get('batch_size', 2500))) for folder in stream: folder = self.filter_keys_and_parse(folder) diff --git a/tap_exacttarget/endpoints/list_sends.py b/tap_exacttarget/endpoints/list_sends.py index 136697a..0714d3c 100644 --- a/tap_exacttarget/endpoints/list_sends.py +++ b/tap_exacttarget/endpoints/list_sends.py @@ -119,7 +119,8 @@ def sync_data(self): stream = request('ListSend', selector, self.auth_stub, - search_filter) + search_filter, + batch_size=int(self.config.get('batch_size', 2500))) for list_send in stream: list_send = self.filter_keys_and_parse(list_send) diff --git a/tap_exacttarget/endpoints/list_subscribers.py b/tap_exacttarget/endpoints/list_subscribers.py index 5a7d23f..3405c88 100644 --- a/tap_exacttarget/endpoints/list_subscribers.py +++ b/tap_exacttarget/endpoints/list_subscribers.py @@ -71,7 +71,7 @@ def _get_all_subscribers_list(self): 'Property': 'ListName', 'SimpleOperator': 'equals', 'Value': 'All Subscribers', - }) + }, batch_size=int(self.config.get('batch_size', 2500))) lists = list(result) @@ -98,7 +98,7 @@ def sync_data(self): pagination_unit = self.config.get( 'pagination__list_subscriber_interval_unit', 'days') pagination_quantity = self.config.get( - 'pagination__list_subsctiber_interval_quantity', 1) + 'pagination__list_subscriber_interval_quantity', 1) unit = {pagination_unit: int(pagination_quantity)} @@ -112,7 +112,8 @@ def sync_data(self): self.auth_stub, _get_list_subscriber_filter( all_subscribers_list, - start, unit)) + start, unit), + batch_size=int(self.config.get('batch_size', 2500))) batch_size = 100 diff --git a/tap_exacttarget/endpoints/lists.py b/tap_exacttarget/endpoints/lists.py index b9f2dbf..2dd139e 100644 --- a/tap_exacttarget/endpoints/lists.py +++ b/tap_exacttarget/endpoints/lists.py @@ -67,7 +67,8 @@ def sync_data(self): stream = request('List', selector, self.auth_stub, - search_filter) + search_filter, + batch_size=int(self.config.get('batch_size', 2500))) for _list in stream: _list = self.filter_keys_and_parse(_list) diff --git a/tap_exacttarget/endpoints/sends.py b/tap_exacttarget/endpoints/sends.py index 5129197..e895f67 100644 --- a/tap_exacttarget/endpoints/sends.py +++ b/tap_exacttarget/endpoints/sends.py @@ -106,7 +106,8 @@ def sync_data(self): stream = request('Send', selector, self.auth_stub, - search_filter) + search_filter, + batch_size=int(self.config.get('batch_size', 2500))) for send in stream: send = self.filter_keys_and_parse(send) diff --git a/tap_exacttarget/endpoints/subscribers.py b/tap_exacttarget/endpoints/subscribers.py index 9a846a2..40deea5 100644 --- a/tap_exacttarget/endpoints/subscribers.py +++ b/tap_exacttarget/endpoints/subscribers.py @@ -149,7 +149,8 @@ def 
pull_subscribers_batch(self, subscriber_keys): return stream = request( - 'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter) + 'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter, + batch_size=int(self.config.get('batch_size', 2500))) for subscriber in stream: subscriber = self.filter_keys_and_parse(subscriber) diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..b91c186 --- /dev/null +++ b/tests/base.py @@ -0,0 +1,177 @@ +import tap_tester.connections as connections +import tap_tester.menagerie as menagerie +import tap_tester.runner as runner +import os +import unittest +from datetime import datetime as dt +import time + +class ExactTargetBase(unittest.TestCase): + START_DATE = "" + DATETIME_FMT = { + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%dT%H:%M:%S.%fZ" + } + + def name(self): + return "tap_tester_exacttarget_base" + + def tap_name(self): + return "tap-exacttarget" + + def setUp(self): + required_env = { + "TAP_EXACTTARGET_CLIENT_ID", + "TAP_EXACTTARGET_CLIENT_SECRET", + "TAP_EXACTTARGET_TENANT_SUBDOMAIN", + "TAP_EXACTTARGET_V2_CLIENT_ID", + "TAP_EXACTTARGET_V2_CLIENT_SECRET", + "TAP_EXACTTARGET_V2_TENANT_SUBDOMAIN", + } + missing_envs = [v for v in required_env if not os.getenv(v)] + if missing_envs: + raise Exception("set " + ", ".join(missing_envs)) + + def get_type(self): + return "platform.exacttarget" + + def get_credentials(self): + return { + 'client_secret': os.getenv('TAP_EXACTTARGET_CLIENT_SECRET') + } + + def get_properties(self, original: bool = True): + return_value = { + 'start_date': '2014-01-01T00:00:00Z', + 'client_id': os.getenv('TAP_EXACTTARGET_CLIENT_ID'), + 'tenant_subdomain': os.getenv('TAP_EXACTTARGET_TENANT_SUBDOMAIN') + } + if original: + return return_value + + # Reassign start date + return_value["start_date"] = self.START_DATE + return return_value + + def expected_metadata(self): + # Note: Custom streams failed on our account with an error on + # `_CustomObjectKey` not being valid + return { + "campaign": { + "pk": {"id"}, + "replication": "FULL" + }, + "content_area":{ + "pk": {"ID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + }, + "email":{ + "pk": {"ID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + }, + "event": { + "pk": {"SendID", "EventType", "SubscriberKey", "EventDate"}, + "replication": "INCREMENTAL", + "replication-key": {"EventDate"}, + }, + "folder":{ + "pk": {"ID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + }, + "list":{ + "pk": {"ID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + }, + "list_send":{ + "pk": {"ListID", "SendID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + }, + "list_subscriber":{ + "pk": {"SubscriberKey", "ListID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + }, + "send":{ + "pk": {"ID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + }, + "subscriber":{ + "pk": {"ID"}, + "replication": "INCREMENTAL", + "replication-key": {"ModifiedDate"}, + } + } + + def streams_to_select(self): + return set(self.expected_metadata().keys()) - {'event', 'list_subscriber', 'subscriber', 'list_send'} + + def expected_replication_keys(self): + return {table: properties.get("replication-key", set()) + for table, properties + in self.expected_metadata().items()} + + def expected_primary_keys(self): + return {table: properties.get("pk", set()) + for table, properties + in 
self.expected_metadata().items()} + + def expected_replication_method(self): + return {table: properties.get("replication", set()) + for table, properties + in self.expected_metadata().items()} + + def select_found_catalogs(self, conn_id, catalogs, only_streams=None, deselect_all_fields: bool = False, non_selected_props=[]): + """Select all streams and all fields within streams""" + for catalog in catalogs: + if only_streams and catalog["tap_stream_id"] not in only_streams: + continue + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + + non_selected_properties = non_selected_props if not deselect_all_fields else [] + if deselect_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = schema.get('annotated-schema', {}).get( + 'properties', {}) + non_selected_properties = non_selected_properties.keys() + additional_md = [] + + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, additional_md=additional_md, + non_selected_fields=non_selected_properties + ) + + def run_and_verify_sync(self, conn_id): + sync_job_name = runner.run_sync_mode(self, conn_id) + + # verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + sync_record_count = runner.examine_target_output_file( + self, conn_id, self.streams_to_select(), self.expected_primary_keys()) + + self.assertGreater( + sum(sync_record_count.values()), 0, + msg="failed to replicate any data: {}".format(sync_record_count) + ) + print("total replicated row count: {}".format(sum(sync_record_count.values()))) + + return sync_record_count + + def dt_to_ts(self, dtime): + for date_format in self.DATETIME_FMT: + try: + date_stripped = int(time.mktime(dt.strptime(dtime, date_format).timetuple())) + return date_stripped + except ValueError: + continue + + def is_incremental(self, stream): + return self.expected_metadata()[stream]["replication"] == "INCREMENTAL" diff --git a/tests/test_exacttarget_base.py b/tests/test_exacttarget_base.py deleted file mode 100644 index 3c70772..0000000 --- a/tests/test_exacttarget_base.py +++ /dev/null @@ -1,95 +0,0 @@ -from tap_tester.scenario import SCENARIOS - -import datetime -import tap_tester.connections as connections -import tap_tester.menagerie as menagerie -import tap_tester.runner as runner -import os -import unittest -import pdb -import json -import requests - - -class ExactTargetBase(unittest.TestCase): - - def name(self): - return "tap_tester_exacttarget_base" - - def tap_name(self): - return "tap-exacttarget" - - def setUp(self): - required_env = { - "client_id": "TAP_EXACTTARGET_CLIENT_ID", - "client_secret": "TAP_EXACTTARGET_CLIENT_SECRET", - } - missing_envs = [v for v in required_env.values() if not os.getenv(v)] - if missing_envs: - raise Exception("set " + ", ".join(missing_envs)) - - def get_type(self): - return "platform.exacttarget" - - def get_credentials(self): - return { - 'client_secret': os.getenv('TAP_EXACTTARGET_CLIENT_SECRET') - } - - def get_properties(self): - yesterday = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1) - return { - 'start_date': yesterday.strftime("%Y-%m-%dT%H:%M:%SZ"), - 'client_id': os.getenv('TAP_EXACTTARGET_CLIENT_ID') - } - - def streams_to_select(self): - # Note: Custom streams failed on our account with an error on - # `_CustomObjectKey` not being valid - return ["campaign", - "content_area", - "email", - "event", - "folder", - "list", 
- "list_send", - "list_subscriber", - "send", - "subscriber"] - - def select_found_catalogs(self, conn_id, found_catalogs, only_streams=None): - selected = [] - for catalog in found_catalogs: - if only_streams and catalog["tap_stream_id"] not in only_streams: - continue - schema = menagerie.select_catalog(conn_id, catalog) - - selected.append({ - "key_properties": catalog.get("key_properties"), - "schema": schema, - "tap_stream_id": catalog.get("tap_stream_id"), - "replication_method": catalog.get("replication_method"), - "replication_key": catalog.get("replication_key"), - }) - - for catalog_entry in selected: - connections.select_catalog_and_fields_via_metadata( - conn_id, - catalog_entry, - {"annotated-schema": catalog_entry['schema']} - ) - - def test_run(self): - conn_id = connections.ensure_connection(self) - runner.run_check_mode(self, conn_id) - - found_catalogs = menagerie.get_catalogs(conn_id) - self.select_found_catalogs(conn_id, found_catalogs, only_streams=self.streams_to_select()) - sync_job_name = runner.run_sync_mode(self, conn_id) - - # verify tap and target exit codes - exit_status = menagerie.get_exit_status(conn_id, sync_job_name) - menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) - - -SCENARIOS.add(ExactTargetBase) diff --git a/tests/test_exacttarget_discover.py b/tests/test_exacttarget_discover.py deleted file mode 100644 index b183e17..0000000 --- a/tests/test_exacttarget_discover.py +++ /dev/null @@ -1,101 +0,0 @@ -import datetime -import tap_tester.connections as connections -import tap_tester.menagerie as menagerie -import tap_tester.runner as runner -import os -import unittest -import pdb -import json -import requests - - -class ExactTargetDiscover(unittest.TestCase): - - def name(self): - return "tap_tester_exacttarget_discover_v1" - - def tap_name(self): - return "tap-exacttarget" - - def setUp(self): - required_env = { - "TAP_EXACTTARGET_CLIENT_ID", - "TAP_EXACTTARGET_CLIENT_SECRET", - "TAP_EXACTTARGET_TENANT_SUBDOMAIN", - "TAP_EXACTTARGET_V2_CLIENT_ID", - "TAP_EXACTTARGET_V2_CLIENT_SECRET", - "TAP_EXACTTARGET_V2_TENANT_SUBDOMAIN", - } - missing_envs = [v for v in required_env if not os.getenv(v)] - if missing_envs: - raise Exception("set " + ", ".join(missing_envs)) - - def get_type(self): - return "platform.exacttarget" - - def get_credentials(self): - return { - 'client_secret': os.getenv('TAP_EXACTTARGET_CLIENT_SECRET') - } - - def get_properties(self): - yesterday = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1) - return { - 'start_date': yesterday.strftime("%Y-%m-%dT%H:%M:%SZ"), - 'client_id': os.getenv('TAP_EXACTTARGET_CLIENT_ID') - } - - def test_run(self): - conn_id = connections.ensure_connection(self) - runner.run_check_mode(self, conn_id) - - found_catalog = menagerie.get_catalog(conn_id) - for catalog_entry in found_catalog['streams']: - field_names_in_schema = set([ k for k in catalog_entry['schema']['properties'].keys()]) - field_names_in_breadcrumbs = set([x['breadcrumb'][1] for x in catalog_entry['metadata'] if len(x['breadcrumb']) == 2]) - self.assertEqual(field_names_in_schema, field_names_in_breadcrumbs) - - inclusions_set = set([(x['breadcrumb'][1], x['metadata']['inclusion']) - for x in catalog_entry['metadata'] - if len(x['breadcrumb']) == 2]) - # Validate that all fields are in metadata - self.assertEqual(len(inclusions_set), len(field_names_in_schema)) - self.assertEqual(set([i[0] for i in inclusions_set]), field_names_in_schema) - # Validate that all metadata['inclusion'] are 'available' - 
unique_inclusions = set([i[1] for i in inclusions_set])
-        self.assertTrue(len(unique_inclusions) == 1 and 'available' in unique_inclusions)
-
-class ExactTargetDiscover2(ExactTargetDiscover):
-    def name(self):
-        return "tap_tester_exacttarget_discover_v1_with_subdomain"
-
-    def get_credentials(self):
-        return {
-            'client_secret': os.getenv('TAP_EXACTTARGET_CLIENT_SECRET')
-        }
-
-    def get_properties(self):
-        yesterday = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)
-        return {
-            'start_date': yesterday.strftime("%Y-%m-%dT%H:%M:%SZ"),
-            'client_id': os.getenv('TAP_EXACTTARGET_CLIENT_ID'),
-            'tenant_subdomain': os.getenv('TAP_EXACTTARGET_TENANT_SUBDOMAIN')
-        }
-
-
-class ExactTargetDiscover3(ExactTargetDiscover):
-    def name(self):
-        return "tap_tester_exacttarget_discover_v2_with_subdomain"
-
-    def get_credentials(self):
-        return {
-            'client_secret': os.getenv('TAP_EXACTTARGET_V2_CLIENT_SECRET')
-        }
-
-    def get_properties(self):
-        yesterday = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)
-        return {
-            'start_date': yesterday.strftime("%Y-%m-%dT%H:%M:%SZ"),
-            'client_id': os.getenv('TAP_EXACTTARGET_V2_CLIENT_ID'),
-            'tenant_subdomain': os.getenv('TAP_EXACTTARGET_V2_TENANT_SUBDOMAIN')
-        }
diff --git a/tests/test_exacttarget_pagination.py b/tests/test_exacttarget_pagination.py
new file mode 100644
index 0000000..c3f3da4
--- /dev/null
+++ b/tests/test_exacttarget_pagination.py
@@ -0,0 +1,52 @@
+from base import ExactTargetBase
+import tap_tester.connections as connections
+import tap_tester.menagerie as menagerie
+import tap_tester.runner as runner
+
+class ExactTargetPagination(ExactTargetBase):
+
+    def name(self):
+        return "tap_tester_exacttarget_pagination"
+
+    def get_properties(self, *args, **kwargs):
+        props = super().get_properties(*args, **kwargs)
+        props['batch_size'] = '1'
+        return props
+
+    def test_run(self):
+        page_size = 1
+        expected_streams = self.streams_to_select()
+
+        conn_id = connections.ensure_connection(self)
+        runner.run_check_mode(self, conn_id)
+
+        found_catalogs = menagerie.get_catalogs(conn_id)
+        self.select_found_catalogs(conn_id, found_catalogs, only_streams=expected_streams)
+
+        sync_record_count = self.run_and_verify_sync(conn_id)
+
+        synced_records = runner.get_records_from_target_output()
+
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+                # expected values
+                expected_primary_keys = self.expected_primary_keys()
+
+                # collect information for assertions from sync based on expected values
+                record_count_sync = sync_record_count.get(stream, 0)
+                primary_keys_list = [(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                     for message in synced_records.get(stream).get('messages')
+                                     if message.get('action') == 'upsert']
+
+                # verify the record count exceeds the page size, i.e. pagination is working
+                self.assertGreater(record_count_sync, page_size)
+
+                if record_count_sync > page_size:
+                    primary_keys_list_1 = primary_keys_list[:page_size]
+                    primary_keys_list_2 = primary_keys_list[page_size:2*page_size]
+
+                    primary_keys_page_1 = set(primary_keys_list_1)
+                    primary_keys_page_2 = set(primary_keys_list_2)
+
+                    # Verify by primary keys that data is unique for each page
+                    self.assertTrue(primary_keys_page_1.isdisjoint(primary_keys_page_2))
From 623d248099da8a1a35334a4c09db3c072da4c292 Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Fri, 27 Aug 2021 14:52:04 +0530
Subject: [PATCH 02/18] updated code according to comment

---
 tap_exacttarget/dao.py | 1 +
tap_exacttarget/endpoints/campaigns.py | 2 +- tap_exacttarget/endpoints/content_areas.py | 2 +- tap_exacttarget/endpoints/data_extensions.py | 9 ++++----- tap_exacttarget/endpoints/emails.py | 2 +- tap_exacttarget/endpoints/events.py | 2 +- tap_exacttarget/endpoints/folders.py | 2 +- tap_exacttarget/endpoints/list_sends.py | 2 +- tap_exacttarget/endpoints/list_subscribers.py | 4 ++-- tap_exacttarget/endpoints/lists.py | 2 +- tap_exacttarget/endpoints/sends.py | 2 +- tap_exacttarget/endpoints/subscribers.py | 2 +- 12 files changed, 16 insertions(+), 16 deletions(-) diff --git a/tap_exacttarget/dao.py b/tap_exacttarget/dao.py index ec7224d..27e917f 100644 --- a/tap_exacttarget/dao.py +++ b/tap_exacttarget/dao.py @@ -19,6 +19,7 @@ def __init__(self, config, state, auth_stub, catalog): self.state = state.copy() self.catalog = catalog self.auth_stub = auth_stub + self.batch_size = int(self.config.get('batch_size', 2500)) @classmethod def matches_catalog(cls, catalog): diff --git a/tap_exacttarget/endpoints/campaigns.py b/tap_exacttarget/endpoints/campaigns.py index 12d642f..a29a54f 100644 --- a/tap_exacttarget/endpoints/campaigns.py +++ b/tap_exacttarget/endpoints/campaigns.py @@ -46,7 +46,7 @@ def sync_data(self): self.auth_stub, # use $pageSize and $page in the props for # this stream as it calls using REST API - props={"$pageSize": batch_size, "$page": 1, "page": 1}) + props={"$pageSize": self.batch_size, "$page": 1, "page": 1}) for campaign in cursor: campaign = self.filter_keys_and_parse(campaign) diff --git a/tap_exacttarget/endpoints/content_areas.py b/tap_exacttarget/endpoints/content_areas.py index 8c79844..ec55074 100644 --- a/tap_exacttarget/endpoints/content_areas.py +++ b/tap_exacttarget/endpoints/content_areas.py @@ -123,7 +123,7 @@ def sync_data(self): selector, self.auth_stub, search_filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for content_area in stream: content_area = self.filter_keys_and_parse(content_area) diff --git a/tap_exacttarget/endpoints/data_extensions.py b/tap_exacttarget/endpoints/data_extensions.py index 6636275..268d88e 100644 --- a/tap_exacttarget/endpoints/data_extensions.py +++ b/tap_exacttarget/endpoints/data_extensions.py @@ -50,7 +50,7 @@ def _get_extensions(self): FuelSDK.ET_DataExtension, self.auth_stub, props=['CustomerKey', 'Name'], - batch_size=int(self.config.get('batch_size', 2500)) + batch_size=self.batch_size ) to_return = {} @@ -95,7 +95,7 @@ def _get_fields(self, extensions): 'DataExtensionField', FuelSDK.ET_DataExtension_Column, self.auth_stub, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for field in result: extension_id = field.DataExtension.CustomerKey @@ -203,9 +203,8 @@ def _replicate(self, customer_key, keys, start, unit) - batch_size = int(self.config.get('batch_size', 2500)) result = request_from_cursor('DataExtensionObject', cursor, - batch_size=batch_size) + batch_size=self.batch_size) for row in result: row = self.filter_keys_and_parse(row) @@ -267,7 +266,7 @@ def sync_data(self): 'Value': customer_key, }, props=['CustomerKey', 'CategoryID'], - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) parent_extension = next(parent_result) parent_category_id = parent_extension.CategoryID diff --git a/tap_exacttarget/endpoints/emails.py b/tap_exacttarget/endpoints/emails.py index 2a906fc..487f871 100644 --- a/tap_exacttarget/endpoints/emails.py +++ b/tap_exacttarget/endpoints/emails.py @@ -138,7 +138,7 @@ def 
sync_data(self): selector, self.auth_stub, search_filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for email in stream: email = self.filter_keys_and_parse(email) diff --git a/tap_exacttarget/endpoints/events.py b/tap_exacttarget/endpoints/events.py index 86d1d14..ab022ab 100644 --- a/tap_exacttarget/endpoints/events.py +++ b/tap_exacttarget/endpoints/events.py @@ -86,7 +86,7 @@ def sync_data(self): selector, self.auth_stub, search_filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for event in stream: event = self.filter_keys_and_parse(event) diff --git a/tap_exacttarget/endpoints/folders.py b/tap_exacttarget/endpoints/folders.py index 49a8ce4..e038c3a 100644 --- a/tap_exacttarget/endpoints/folders.py +++ b/tap_exacttarget/endpoints/folders.py @@ -79,7 +79,7 @@ def sync_data(self): selector, self.auth_stub, search_filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for folder in stream: folder = self.filter_keys_and_parse(folder) diff --git a/tap_exacttarget/endpoints/list_sends.py b/tap_exacttarget/endpoints/list_sends.py index 0714d3c..4487891 100644 --- a/tap_exacttarget/endpoints/list_sends.py +++ b/tap_exacttarget/endpoints/list_sends.py @@ -120,7 +120,7 @@ def sync_data(self): selector, self.auth_stub, search_filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for list_send in stream: list_send = self.filter_keys_and_parse(list_send) diff --git a/tap_exacttarget/endpoints/list_subscribers.py b/tap_exacttarget/endpoints/list_subscribers.py index 3405c88..b6d71a5 100644 --- a/tap_exacttarget/endpoints/list_subscribers.py +++ b/tap_exacttarget/endpoints/list_subscribers.py @@ -71,7 +71,7 @@ def _get_all_subscribers_list(self): 'Property': 'ListName', 'SimpleOperator': 'equals', 'Value': 'All Subscribers', - }, batch_size=int(self.config.get('batch_size', 2500))) + }, batch_size=self.batch_size) lists = list(result) @@ -113,7 +113,7 @@ def sync_data(self): _get_list_subscriber_filter( all_subscribers_list, start, unit), - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) batch_size = 100 diff --git a/tap_exacttarget/endpoints/lists.py b/tap_exacttarget/endpoints/lists.py index 2dd139e..85f3785 100644 --- a/tap_exacttarget/endpoints/lists.py +++ b/tap_exacttarget/endpoints/lists.py @@ -68,7 +68,7 @@ def sync_data(self): selector, self.auth_stub, search_filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for _list in stream: _list = self.filter_keys_and_parse(_list) diff --git a/tap_exacttarget/endpoints/sends.py b/tap_exacttarget/endpoints/sends.py index e895f67..1ed7a97 100644 --- a/tap_exacttarget/endpoints/sends.py +++ b/tap_exacttarget/endpoints/sends.py @@ -107,7 +107,7 @@ def sync_data(self): selector, self.auth_stub, search_filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for send in stream: send = self.filter_keys_and_parse(send) diff --git a/tap_exacttarget/endpoints/subscribers.py b/tap_exacttarget/endpoints/subscribers.py index 40deea5..9f92167 100644 --- a/tap_exacttarget/endpoints/subscribers.py +++ b/tap_exacttarget/endpoints/subscribers.py @@ -150,7 +150,7 @@ def pull_subscribers_batch(self, subscriber_keys): stream = request( 'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter, - batch_size=int(self.config.get('batch_size', 2500))) + batch_size=self.batch_size) for subscriber in 
stream: subscriber = self.filter_keys_and_parse(subscriber) From 50689ba8512c484588391d6b76ca5321ffbbaebe Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Fri, 27 Aug 2021 14:57:23 +0530 Subject: [PATCH 03/18] resolve pylint error --- tap_exacttarget/endpoints/campaigns.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tap_exacttarget/endpoints/campaigns.py b/tap_exacttarget/endpoints/campaigns.py index a29a54f..c27a975 100644 --- a/tap_exacttarget/endpoints/campaigns.py +++ b/tap_exacttarget/endpoints/campaigns.py @@ -38,7 +38,6 @@ class CampaignDataAccessObject(DataAccessObject): KEY_PROPERTIES = ['id'] def sync_data(self): - batch_size = int(self.config.get('batch_size', 2500)) cursor = request( 'Campaign', From a71197c6f66e00a5b936fdd8b6e100244e99a4e4 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Wed, 1 Sep 2021 16:05:34 +0530 Subject: [PATCH 04/18] updated code based on comments --- tests/base.py | 71 +++++++++++++++------------- tests/test_exacttarget_pagination.py | 2 +- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/tests/base.py b/tests/base.py index b91c186..ea0da77 100644 --- a/tests/base.py +++ b/tests/base.py @@ -13,6 +13,11 @@ class ExactTargetBase(unittest.TestCase): "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S.%fZ" } + PRIMARY_KEYS = "table-key-properties" + REPLICATION_METHOD = "forced-replication-method" + REPLICATION_KEYS = "valid-replication-keys" + FULL_TABLE = "FULL_TABLE" + INCREMENTAL = "INCREMENTAL" def name(self): return "tap_tester_exacttarget_base" @@ -59,53 +64,53 @@ def expected_metadata(self): # `_CustomObjectKey` not being valid return { "campaign": { - "pk": {"id"}, - "replication": "FULL" + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE }, "content_area":{ - "pk": {"ID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + self.PRIMARY_KEYS: {"ID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, }, "email":{ - "pk": {"ID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + self.PRIMARY_KEYS: {"ID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, }, "event": { - "pk": {"SendID", "EventType", "SubscriberKey", "EventDate"}, - "replication": "INCREMENTAL", - "replication-key": {"EventDate"}, + self.PRIMARY_KEYS: {"SendID", "EventType", "SubscriberKey", "EventDate"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"EventDate"}, }, "folder":{ - "pk": {"ID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + self.PRIMARY_KEYS: {"ID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, }, "list":{ - "pk": {"ID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + self.PRIMARY_KEYS: {"ID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, }, "list_send":{ - "pk": {"ListID", "SendID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + self.PRIMARY_KEYS: {"ListID", "SendID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, }, "list_subscriber":{ - "pk": {"SubscriberKey", "ListID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + self.PRIMARY_KEYS: {"SubscriberKey", "ListID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, }, "send":{ - "pk": {"ID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + 
self.PRIMARY_KEYS: {"ID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, }, "subscriber":{ - "pk": {"ID"}, - "replication": "INCREMENTAL", - "replication-key": {"ModifiedDate"}, + self.PRIMARY_KEYS: {"ID"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"ModifiedDate"}, } } @@ -113,17 +118,17 @@ def streams_to_select(self): return set(self.expected_metadata().keys()) - {'event', 'list_subscriber', 'subscriber', 'list_send'} def expected_replication_keys(self): - return {table: properties.get("replication-key", set()) + return {table: properties.get(self.REPLICATION_KEYS, set()) for table, properties in self.expected_metadata().items()} def expected_primary_keys(self): - return {table: properties.get("pk", set()) + return {table: properties.get(self.PRIMARY_KEYS, set()) for table, properties in self.expected_metadata().items()} def expected_replication_method(self): - return {table: properties.get("replication", set()) + return {table: properties.get(self.REPLICATION_METHOD, set()) for table, properties in self.expected_metadata().items()} @@ -174,4 +179,4 @@ def dt_to_ts(self, dtime): continue def is_incremental(self, stream): - return self.expected_metadata()[stream]["replication"] == "INCREMENTAL" + return self.expected_metadata()[stream][self.REPLICATION_METHOD] == self.INCREMENTAL diff --git a/tests/test_exacttarget_pagination.py b/tests/test_exacttarget_pagination.py index c3f3da4..0db8302 100644 --- a/tests/test_exacttarget_pagination.py +++ b/tests/test_exacttarget_pagination.py @@ -34,7 +34,7 @@ def test_run(self): # collect information for assertions from sync based on expected values record_count_sync = sync_record_count.get(stream, 0) - primary_keys_list = [(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + primary_keys_list = [tuple([message.get('data').get(expected_pk) for expected_pk in expected_primary_keys]) for message in synced_records.get(stream).get('messages') if message.get('action') == 'upsert'] From 3e4c22c20dd78638f81e591d74bef0f59f4a701b Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Wed, 1 Sep 2021 16:39:53 +0530 Subject: [PATCH 05/18] resolve test cases failure --- tests/test_exacttarget_pagination.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_exacttarget_pagination.py b/tests/test_exacttarget_pagination.py index 0db8302..74c5841 100644 --- a/tests/test_exacttarget_pagination.py +++ b/tests/test_exacttarget_pagination.py @@ -30,7 +30,7 @@ def test_run(self): for stream in expected_streams: with self.subTest(stream=stream): # expected values - expected_primary_keys = self.expected_primary_keys() + expected_primary_keys = self.expected_primary_keys()[stream] # collect information for assertions from sync based on expected values record_count_sync = sync_record_count.get(stream, 0) From f069f960aaa96bf494495c51cd6746f36e1880c3 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Thu, 2 Sep 2021 14:27:27 +0530 Subject: [PATCH 06/18] added data extension stream in the test --- tests/base.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/base.py b/tests/base.py index ea0da77..8d9f895 100644 --- a/tests/base.py +++ b/tests/base.py @@ -72,6 +72,18 @@ def expected_metadata(self): self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"ModifiedDate"}, }, + "data_extension.test emails":{ + self.PRIMARY_KEYS: {"_CustomObjectKey", "ID"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + }, 
+ "data_extension.This is a test":{ + self.PRIMARY_KEYS: {"_CustomObjectKey", "ID"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + }, + "data_extension.my_test":{ + self.PRIMARY_KEYS: {"_CustomObjectKey", "ID"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + }, "email":{ self.PRIMARY_KEYS: {"ID"}, self.REPLICATION_METHOD: self.INCREMENTAL, @@ -135,7 +147,7 @@ def expected_replication_method(self): def select_found_catalogs(self, conn_id, catalogs, only_streams=None, deselect_all_fields: bool = False, non_selected_props=[]): """Select all streams and all fields within streams""" for catalog in catalogs: - if only_streams and catalog["tap_stream_id"] not in only_streams: + if only_streams and catalog["stream_name"] not in only_streams: continue schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) From 1d23a24d8f064ee94dd854dc455758c5b9e950d8 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Fri, 3 Sep 2021 11:47:13 +0530 Subject: [PATCH 07/18] added batch size for data extension stream --- tap_exacttarget/client.py | 1 - tap_exacttarget/endpoints/data_extensions.py | 5 +++- tap_exacttarget/fuel_overrides.py | 26 ++++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/tap_exacttarget/client.py b/tap_exacttarget/client.py index e892724..eec66f4 100644 --- a/tap_exacttarget/client.py +++ b/tap_exacttarget/client.py @@ -121,7 +121,6 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz .format(name, search_filter)) else: - cursor.search_filter = {} LOGGER.info( "Making RETRIEVE call to '{}' endpoint with no filters." diff --git a/tap_exacttarget/endpoints/data_extensions.py b/tap_exacttarget/endpoints/data_extensions.py index 268d88e..e0abee9 100644 --- a/tap_exacttarget/endpoints/data_extensions.py +++ b/tap_exacttarget/endpoints/data_extensions.py @@ -10,6 +10,7 @@ from tap_exacttarget.state import incorporate, save_state, \ get_last_record_value_for_table from tap_exacttarget.util import sudsobj_to_dict +from tap_exacttarget.fuel_overrides import TapExacttarget__ET_DataExtension_Row LOGGER = singer.get_logger() # noqa @@ -193,10 +194,12 @@ def _replicate(self, customer_key, keys, LOGGER.info("Fetching {} from {} to {}" .format(table, start, end)) - cursor = FuelSDK.ET_DataExtension_Row() + # use custom class to apply 'batch_size' + cursor = TapExacttarget__ET_DataExtension_Row() cursor.auth_stub = self.auth_stub cursor.CustomerKey = customer_key cursor.props = keys + cursor.options = {"BatchSize": self.batch_size} if partial: cursor.search_filter = get_date_page(replication_key, diff --git a/tap_exacttarget/fuel_overrides.py b/tap_exacttarget/fuel_overrides.py index 6266af1..4071133 100644 --- a/tap_exacttarget/fuel_overrides.py +++ b/tap_exacttarget/fuel_overrides.py @@ -34,3 +34,29 @@ def tap_exacttarget__getMoreResults(cursor, batch_size=2500): cursor.last_request_id = obj.request_id return obj + +class TapExacttarget__ET_DataExtension_Row(FuelSDK.ET_DataExtension_Row): + + # extend 'get' from 'ET_DataExtension_Row' + def get(self): + self.getName() + ''' + if props and props.is_a? Array then + @props = props + end + ''' + + if self.props is not None and type(self.props) is dict: + self.props = self.props.keys() + + ''' + if filter and filter.is_a? 
Hash then + @filter = filter + end + ''' + + # add 'options' parameter to set 'batch_size' + obj = FuelSDK.ET_Get(self.auth_stub, "DataExtensionObject[{0}]".format(self.Name), self.props, self.search_filter, self.options) + self.last_request_id = obj.request_id + + return obj From e1acd6a8587b419363a2c05a54b6845fe63aaa46 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Fri, 3 Sep 2021 13:10:43 +0530 Subject: [PATCH 08/18] pylint resolve --- tap_exacttarget/fuel_overrides.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_exacttarget/fuel_overrides.py b/tap_exacttarget/fuel_overrides.py index 4071133..9003bde 100644 --- a/tap_exacttarget/fuel_overrides.py +++ b/tap_exacttarget/fuel_overrides.py @@ -46,7 +46,7 @@ def get(self): end ''' - if self.props is not None and type(self.props) is dict: + if self.props is not None and type(self.props) is dict: # pylint:disable=unidiomatic-typecheck self.props = self.props.keys() ''' From dbeeb6bd57b6ea962072b1b0507289809572eebe Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Fri, 3 Sep 2021 18:57:23 +0530 Subject: [PATCH 09/18] updated logger messages --- tap_exacttarget/client.py | 10 ++--- tap_exacttarget/endpoints/data_extensions.py | 5 ++- tap_exacttarget/fuel_overrides.py | 41 ++++++++++++++++++++ tests/unittests/test_get_response_items.py | 40 +++++++++++++++++++ 4 files changed, 88 insertions(+), 8 deletions(-) create mode 100644 tests/unittests/test_get_response_items.py diff --git a/tap_exacttarget/client.py b/tap_exacttarget/client.py index eec66f4..221c9a3 100644 --- a/tap_exacttarget/client.py +++ b/tap_exacttarget/client.py @@ -7,13 +7,13 @@ LOGGER = singer.get_logger() -def _get_response_items(response): +def _get_response_items(response, name): items = response.results if 'count' in response.results: - LOGGER.info('Got {} results.'.format(response.results.get('count'))) items = response.results.get('items') + LOGGER.info('Got {} results from {} endpoint.'.format(len(items), name)) return items @@ -145,7 +145,7 @@ def request_from_cursor(name, cursor, batch_size): raise RuntimeError("Request failed with '{}'" .format(response.message)) - for item in _get_response_items(response): + for item in _get_response_items(response, name): yield item while response.more_results: @@ -155,18 +155,16 @@ def request_from_cursor(name, cursor, batch_size): # use 'getMoreResults' for campaigns as it does not use # batch_size, rather it uses $page and $pageSize and REST Call response = cursor.getMoreResults() - LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results.get('items')), name)) else: # Override call to getMoreResults to add a batch_size parameter # response = cursor.getMoreResults() response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size) - LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name)) if not response.status: raise RuntimeError("Request failed with '{}'" .format(response.message)) - for item in _get_response_items(response): + for item in _get_response_items(response, name): yield item LOGGER.info("Done retrieving results from '{}' endpoint".format(name)) diff --git a/tap_exacttarget/endpoints/data_extensions.py b/tap_exacttarget/endpoints/data_extensions.py index e0abee9..cffe6fe 100644 --- a/tap_exacttarget/endpoints/data_extensions.py +++ b/tap_exacttarget/endpoints/data_extensions.py @@ -10,7 +10,8 @@ from tap_exacttarget.state import incorporate, save_state, \ get_last_record_value_for_table from tap_exacttarget.util import 
sudsobj_to_dict -from tap_exacttarget.fuel_overrides import TapExacttarget__ET_DataExtension_Row +from tap_exacttarget.fuel_overrides import TapExacttarget__ET_DataExtension_Row, \ + TapExacttarget__ET_DataExtension_Column LOGGER = singer.get_logger() # noqa @@ -94,7 +95,7 @@ def _get_fields(self, extensions): result = request( 'DataExtensionField', - FuelSDK.ET_DataExtension_Column, + TapExacttarget__ET_DataExtension_Column, self.auth_stub, batch_size=self.batch_size) diff --git a/tap_exacttarget/fuel_overrides.py b/tap_exacttarget/fuel_overrides.py index 9003bde..5d9310b 100644 --- a/tap_exacttarget/fuel_overrides.py +++ b/tap_exacttarget/fuel_overrides.py @@ -60,3 +60,44 @@ def get(self): self.last_request_id = obj.request_id return obj + +class TapExacttarget__ET_DataExtension_Column(FuelSDK.ET_DataExtension_Column): + + # extend 'get' from 'ET_DataExtension_Column' + def get(self): + ''' + if props and props.is_a? Array then + @props = props + end + ''' + + if self.props is not None and type(self.props) is dict: + self.props = self.props.keys() + + ''' + if filter and filter.is_a? Hash then + @filter = filter + end + ''' + + ''' + fixCustomerKey = False + if filter and filter.is_a? Hash then + @filter = filter + if @filter.has_key?("Property") && @filter["Property"] == "CustomerKey" then + @filter["Property"] = "DataExtension.CustomerKey" + fixCustomerKey = true + end + end + ''' + + obj = FuelSDK.ET_Get(self.auth_stub, self.obj, self.props, self.search_filter, self.options) + self.last_request_id = obj.request_id + + ''' + if fixCustomerKey then + @filter["Property"] = "CustomerKey" + end + ''' + + return obj diff --git a/tests/unittests/test_get_response_items.py b/tests/unittests/test_get_response_items.py new file mode 100644 index 0000000..52e338e --- /dev/null +++ b/tests/unittests/test_get_response_items.py @@ -0,0 +1,40 @@ +import unittest +from unittest import mock +import tap_exacttarget.client as _client + +class Mockresponse: + def __init__(self, json): + self.results = json + self.more_results = False + +class TestGetResponseItems(unittest.TestCase): + + @mock.patch("tap_exacttarget.client.LOGGER.info") + def test_result_without_count(self, mocked_logger): + json = [{ + "key1": "value1", + "key2": "value2" + },{ + "key3": "value3", + "key4": "value4" + }] + items = _client._get_response_items(Mockresponse(json), "TestDataAccessObject") + self.assertEquals(items, json) + mocked_logger.assert_called_with("Got {} results from {} endpoint.".format(2, "TestDataAccessObject")) + + @mock.patch("tap_exacttarget.client.LOGGER.info") + def test_result_with_count(self, mocked_logger): + json = { + "count": 2, + "items": [{ + "key1": "value1", + "key2": "value2" + },{ + "key3": "value3", + "key4": "value4" + } + ] + } + items = _client._get_response_items(Mockresponse(json), "TestDataAccessObject") + self.assertEquals(items, json.get("items")) + mocked_logger.assert_called_with("Got {} results from {} endpoint.".format(2, "TestDataAccessObject")) From 054acac0c641af0fdb46fcd90617b3e5e3f7cca8 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Fri, 3 Sep 2021 19:00:47 +0530 Subject: [PATCH 10/18] pylint resolve --- tap_exacttarget/fuel_overrides.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_exacttarget/fuel_overrides.py b/tap_exacttarget/fuel_overrides.py index 5d9310b..064a780 100644 --- a/tap_exacttarget/fuel_overrides.py +++ b/tap_exacttarget/fuel_overrides.py @@ -71,7 +71,7 @@ def get(self): end ''' - if self.props is not None and type(self.props) is 
dict:
+        if self.props is not None and type(self.props) is dict:  # pylint:disable=unidiomatic-typecheck
             self.props = self.props.keys()
 
         '''
From 73aeb9543e0cf74df002fef05dedd068a5319c93 Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Mon, 6 Sep 2021 14:55:56 +0530
Subject: [PATCH 11/18] added data extension incremental stream

---
 tests/base.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/tests/base.py b/tests/base.py
index 8d9f895..534cf11 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -48,7 +48,7 @@ def get_credentials(self):
 
     def get_properties(self, original: bool = True):
         return_value = {
-            'start_date': '2014-01-01T00:00:00Z',
+            'start_date': '2019-01-01T00:00:00Z',
             'client_id': os.getenv('TAP_EXACTTARGET_CLIENT_ID'),
             'tenant_subdomain': os.getenv('TAP_EXACTTARGET_TENANT_SUBDOMAIN')
         }
@@ -84,6 +84,11 @@ def expected_metadata(self):
                 self.PRIMARY_KEYS: {"_CustomObjectKey", "ID"},
                 self.REPLICATION_METHOD: self.FULL_TABLE,
             },
+            "data_extension.test 1":{
+                self.PRIMARY_KEYS: {"_CustomObjectKey", "ID"},
+                self.REPLICATION_METHOD: self.INCREMENTAL,
+                self.REPLICATION_KEYS: {"JoinDate"},
+            },
             "email":{
                 self.PRIMARY_KEYS: {"ID"},
                 self.REPLICATION_METHOD: self.INCREMENTAL,
From e353b8501cc6258dd423e99e661ebbfe842b7a77 Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Thu, 16 Sep 2021 12:50:03 +0530
Subject: [PATCH 12/18] added assertion for verifying that we did not duplicate any records across pages

---
 tests/test_exacttarget_pagination.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tests/test_exacttarget_pagination.py b/tests/test_exacttarget_pagination.py
index 74c5841..bed02e6 100644
--- a/tests/test_exacttarget_pagination.py
+++ b/tests/test_exacttarget_pagination.py
@@ -38,6 +38,8 @@ def test_run(self):
                                      for message in synced_records.get(stream).get('messages')
                                      if message.get('action') == 'upsert']
 
+                sync_messages = synced_records.get(stream, {'messages': []}).get('messages')
+
                 # verify the record count exceeds the page size, i.e. pagination is working
                 self.assertGreater(record_count_sync, page_size)
 
@@ -50,3 +52,13 @@ def test_run(self):
                     # Verify by primary keys that data is unique for each page
                     self.assertTrue(primary_keys_page_1.isdisjoint(primary_keys_page_2))
+
+                    # Verify we did not duplicate any records across pages
+                    records_pks_set = {tuple([message.get('data').get(primary_key)
+                                              for primary_key in expected_primary_keys])
+                                       for message in sync_messages}
+                    records_pks_list = [tuple([message.get('data').get(primary_key)
+                                               for primary_key in expected_primary_keys])
+                                        for message in sync_messages]
+                    self.assertCountEqual(records_pks_set, records_pks_list,
+                                          msg=f"We have duplicate records for {stream}")
From 1f56470dfe2ebc3362b558feaa35cf9df51dd74c Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Thu, 16 Sep 2021 15:00:02 +0530
Subject: [PATCH 13/18] updated the code

---
 tests/test_exacttarget_pagination.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_exacttarget_pagination.py b/tests/test_exacttarget_pagination.py
index bed02e6..cb7e50a 100644
--- a/tests/test_exacttarget_pagination.py
+++ b/tests/test_exacttarget_pagination.py
@@ -61,4 +61,4 @@ def test_run(self):
                                                for primary_key in expected_primary_keys])
                                         for message in sync_messages]
                     self.assertCountEqual(records_pks_set, records_pks_list,
-                                          msg=f"We have duplicate records for {stream}")
+                                          msg="We have duplicate records for {}".format(stream))
From 520b0877cd9fe8800c4c5067daf3d17ecf5d722e Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Wed,
22 Sep 2021 13:55:05 +0530 Subject: [PATCH 14/18] updated the code --- tap_exacttarget/client.py | 3 +-- tap_exacttarget/endpoints/subscribers.py | 3 +-- tap_exacttarget/fuel_overrides.py | 5 ++-- tests/base.py | 34 ++++++++++++------------ tests/test_exacttarget_pagination.py | 6 ++--- 5 files changed, 24 insertions(+), 27 deletions(-) diff --git a/tap_exacttarget/client.py b/tap_exacttarget/client.py index 221c9a3..672b414 100644 --- a/tap_exacttarget/client.py +++ b/tap_exacttarget/client.py @@ -13,7 +13,7 @@ def _get_response_items(response, name): if 'count' in response.results: items = response.results.get('items') - LOGGER.info('Got {} results from {} endpoint.'.format(len(items), name)) + LOGGER.info('Got %s results from %s endpoint.', len(items), name) return items @@ -121,7 +121,6 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz .format(name, search_filter)) else: - LOGGER.info( "Making RETRIEVE call to '{}' endpoint with no filters." .format(name)) diff --git a/tap_exacttarget/endpoints/subscribers.py b/tap_exacttarget/endpoints/subscribers.py index 9f92167..9dbfd39 100644 --- a/tap_exacttarget/endpoints/subscribers.py +++ b/tap_exacttarget/endpoints/subscribers.py @@ -149,8 +149,7 @@ def pull_subscribers_batch(self, subscriber_keys): return stream = request( - 'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter, - batch_size=self.batch_size) + 'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter, batch_size=self.batch_size) for subscriber in stream: subscriber = self.filter_keys_and_parse(subscriber) diff --git a/tap_exacttarget/fuel_overrides.py b/tap_exacttarget/fuel_overrides.py index 064a780..576254a 100644 --- a/tap_exacttarget/fuel_overrides.py +++ b/tap_exacttarget/fuel_overrides.py @@ -35,9 +35,9 @@ def tap_exacttarget__getMoreResults(cursor, batch_size=2500): return obj +# extend 'get' from 'ET_DataExtension_Row' and add 'options' parameter to set 'batch_size' class TapExacttarget__ET_DataExtension_Row(FuelSDK.ET_DataExtension_Row): - # extend 'get' from 'ET_DataExtension_Row' def get(self): self.getName() ''' @@ -61,9 +61,9 @@ def get(self): return obj +# extend 'get' from 'ET_DataExtension_Column' and add 'options' parameter to set 'batch_size' class TapExacttarget__ET_DataExtension_Column(FuelSDK.ET_DataExtension_Column): - # extend 'get' from 'ET_DataExtension_Column' def get(self): ''' if props and props.is_a? 
Array then
@@ -91,6 +91,7 @@ def get(self):
         end
         '''
 
+        # add 'options' parameter to set 'batch_size'
         obj = FuelSDK.ET_Get(self.auth_stub, self.obj, self.props, self.search_filter, self.options)
         self.last_request_id = obj.request_id
 
diff --git a/tests/base.py b/tests/base.py
index 534cf11..7213e64 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -132,42 +132,40 @@ def expected_metadata(self):
         }
 
     def streams_to_select(self):
-        return set(self.expected_metadata().keys()) - {'event', 'list_subscriber', 'subscriber', 'list_send'}
+        return set(self.expected_metadata().keys()) - {'event', 'list_subscriber', 'subscriber'}
 
     def expected_replication_keys(self):
         return {table: properties.get(self.REPLICATION_KEYS, set())
-                for table, properties
-                in self.expected_metadata().items()}
+                for table, properties in self.expected_metadata().items()}
 
     def expected_primary_keys(self):
         return {table: properties.get(self.PRIMARY_KEYS, set())
-                for table, properties
-                in self.expected_metadata().items()}
+                for table, properties in self.expected_metadata().items()}
 
     def expected_replication_method(self):
         return {table: properties.get(self.REPLICATION_METHOD, set())
-                for table, properties
-                in self.expected_metadata().items()}
+                for table, properties in self.expected_metadata().items()}
 
     def select_found_catalogs(self, conn_id, catalogs, only_streams=None, deselect_all_fields: bool = False, non_selected_props=[]):
         """Select all streams and all fields within streams"""
         for catalog in catalogs:
             if only_streams and catalog["stream_name"] not in only_streams:
                 continue
+
             schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])
 
             non_selected_properties = non_selected_props if not deselect_all_fields else []
             if deselect_all_fields:
                 # get a list of all properties so that none are selected
-                non_selected_properties = schema.get('annotated-schema', {}).get(
-                    'properties', {})
+                non_selected_properties = schema.get('annotated-schema', {}).get('properties', {})
                 non_selected_properties = non_selected_properties.keys()
-            additional_md = []
 
-            connections.select_catalog_and_fields_via_metadata(
-                conn_id, catalog, schema, additional_md=additional_md,
-                non_selected_fields=non_selected_properties
-            )
+            additional_md = []
+            connections.select_catalog_and_fields_via_metadata(conn_id,
+                                                               catalog,
+                                                               schema,
+                                                               additional_md=additional_md,
+                                                               non_selected_fields=non_selected_properties)
 
     def run_and_verify_sync(self, conn_id):
         sync_job_name = runner.run_sync_mode(self, conn_id)
@@ -176,14 +174,16 @@ def run_and_verify_sync(self, conn_id):
         exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
         menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)
 
-        sync_record_count = runner.examine_target_output_file(
-            self, conn_id, self.streams_to_select(), self.expected_primary_keys())
+        sync_record_count = runner.examine_target_output_file(self,
+                                                              conn_id,
+                                                              self.streams_to_select(),
+                                                              self.expected_primary_keys())
 
         self.assertGreater(
             sum(sync_record_count.values()), 0,
             msg="failed to replicate any data: {}".format(sync_record_count)
         )
-        print("total replicated row count: {}".format(sum(sync_record_count.values())))
+        print("total replicated row count: %s" % sum(sync_record_count.values()))
 
         return sync_record_count
 
diff --git a/tests/test_exacttarget_pagination.py b/tests/test_exacttarget_pagination.py
index cb7e50a..6b6201f 100644
--- a/tests/test_exacttarget_pagination.py
+++ b/tests/test_exacttarget_pagination.py
@@ -54,11 +54,9 @@ def test_run(self):
            self.assertTrue(primary_keys_page_1.isdisjoint(primary_keys_page_2))
 
            # Verify we did not duplicate any records across pages
-            records_pks_set = {tuple([message.get('data').get(primary_key)
-                                      for primary_key in expected_primary_keys])
+            records_pks_set = {tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
                                for message in sync_messages}
-            records_pks_list = [tuple([message.get('data').get(primary_key)
-                                       for primary_key in expected_primary_keys])
+            records_pks_list = [tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
                                for message in sync_messages]
            self.assertCountEqual(records_pks_set, records_pks_list, msg="We have duplicate records for {}".format(stream))

From 58464cc1873cf69f6d78ae8816c5cf177fb85a56 Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Wed, 22 Sep 2021 14:02:21 +0530
Subject: [PATCH 15/18] resolve unittest failure

---
 tests/unittests/test_get_response_items.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/unittests/test_get_response_items.py b/tests/unittests/test_get_response_items.py
index 52e338e..5d0e940 100644
--- a/tests/unittests/test_get_response_items.py
+++ b/tests/unittests/test_get_response_items.py
@@ -20,7 +20,7 @@ def test_result_without_count(self, mocked_logger):
         }]
         items = _client._get_response_items(Mockresponse(json), "TestDataAccessObject")
         self.assertEquals(items, json)
-        mocked_logger.assert_called_with("Got {} results from {} endpoint.".format(2, "TestDataAccessObject"))
+        mocked_logger.assert_called_with("Got %s results from %s endpoint.", 2, "TestDataAccessObject")
 
     @mock.patch("tap_exacttarget.client.LOGGER.info")
     def test_result_with_count(self, mocked_logger):
@@ -37,4 +37,4 @@ def test_result_with_count(self, mocked_logger):
         }
         items = _client._get_response_items(Mockresponse(json), "TestDataAccessObject")
         self.assertEquals(items, json.get("items"))
-        mocked_logger.assert_called_with("Got {} results from {} endpoint.".format(2, "TestDataAccessObject"))
+        mocked_logger.assert_called_with("Got %s results from %s endpoint.", 2, "TestDataAccessObject")

From e38f478fd43dd68b36b6dfc083a146c94628a986 Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Thu, 7 Oct 2021 12:25:03 +0530
Subject: [PATCH 16/18] updated comments

---
 tap_exacttarget/dao.py                       | 1 +
 tap_exacttarget/endpoints/campaigns.py       | 1 -
 tap_exacttarget/endpoints/data_extensions.py | 1 +
 3 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/tap_exacttarget/dao.py b/tap_exacttarget/dao.py
index 27e917f..48996ff 100644
--- a/tap_exacttarget/dao.py
+++ b/tap_exacttarget/dao.py
@@ -19,6 +19,7 @@ def __init__(self, config, state, auth_stub, catalog):
         self.state = state.copy()
         self.catalog = catalog
         self.auth_stub = auth_stub
+        # initialize the batch size (page size) from the config; default to 2500
         self.batch_size = int(self.config.get('batch_size', 2500))
 
     @classmethod
diff --git a/tap_exacttarget/endpoints/campaigns.py b/tap_exacttarget/endpoints/campaigns.py
index c27a975..631db39 100644
--- a/tap_exacttarget/endpoints/campaigns.py
+++ b/tap_exacttarget/endpoints/campaigns.py
@@ -38,7 +38,6 @@ class CampaignDataAccessObject(DataAccessObject):
     KEY_PROPERTIES = ['id']
 
     def sync_data(self):
-
         cursor = request(
             'Campaign',
             FuelSDK.ET_Campaign,
diff --git a/tap_exacttarget/endpoints/data_extensions.py b/tap_exacttarget/endpoints/data_extensions.py
index cffe6fe..1a9059d 100644
--- a/tap_exacttarget/endpoints/data_extensions.py
+++ b/tap_exacttarget/endpoints/data_extensions.py
@@ -95,6 +95,7 @@ def _get_fields(self, extensions):
 
         result = request(
             'DataExtensionField',
+            # use the custom override class so the 'batch_size' option is applied
             TapExacttarget__ET_DataExtension_Column,
             self.auth_stub,
             batch_size=self.batch_size)

From 7a9f28904603020c6decfcc27cf50ba53e481012 Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Wed, 13 Oct 2021 11:20:34 +0530
Subject: [PATCH 17/18] added comments

---
 tap_exacttarget/client.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tap_exacttarget/client.py b/tap_exacttarget/client.py
index 672b414..ce5a52a 100644
--- a/tap_exacttarget/client.py
+++ b/tap_exacttarget/client.py
@@ -6,7 +6,7 @@
 LOGGER = singer.get_logger()
 
-
+# logs the number of records fetched from the given endpoint and returns the items
 def _get_response_items(response, name):
     items = response.results
 
@@ -107,7 +107,8 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz
     """
     cursor = selector()
     cursor.auth_stub = auth_stub
-    # set batch size
+    # set the batch size, i.e. the page size defined by the user, as
+    # FuelSDK supports setting the page size via the "BatchSize" key in the "options" parameter
     cursor.options = {"BatchSize": batch_size}
 
     if props is not None:

From 4cc3fc109f5285deb93030e731525e2929c6422c Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Thu, 14 Oct 2021 00:15:14 +0530
Subject: [PATCH 18/18] resolve unittests failure

---
 tests/unittests/test_backoff.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/unittests/test_backoff.py b/tests/unittests/test_backoff.py
index e0a0c63..65f3e43 100644
--- a/tests/unittests/test_backoff.py
+++ b/tests/unittests/test_backoff.py
@@ -134,7 +134,7 @@ def test_connection_reset_error_occurred__data_extension_get_extensions(self, mo
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_get.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Column.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Column.get")
     def test_connection_reset_error_occurred__data_extension_get_fields(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.error(104, 'Connection reset by peer')
@@ -148,7 +148,7 @@ def test_connection_reset_error_occurred__data_extension_get_fields(self, mocked
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_data_ext_column.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Row.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Row.get")
     def test_connection_reset_error_occurred__data_extension_replicate(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.error(104, 'Connection reset by peer')
@@ -530,7 +530,7 @@ def test_socket_timeout_error_occurred__data_extension_get_extensions(self, mock
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_get.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Column.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Column.get")
     def test_socket_timeout_error_occurred__data_extension_get_fields(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.timeout("The read operation timed out")
@@ -544,7 +544,7 @@ def test_socket_timeout_error_occurred__data_extension_get_fields(self, mocked_d
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_data_ext_column.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Row.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Row.get")
     def test_socket_timeout_error_occurred__data_extension_replicate(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.timeout("The read operation timed out")
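
The pattern this series converges on can be condensed into a short sketch: client.request stores the configured page size in the cursor's options dict under "BatchSize", and the fuel_overrides subclasses forward those options into FuelSDK.ET_Get, which the stock FuelSDK classes do not do. The sketch below is an illustrative reconstruction, not the tap's code verbatim: the names BatchSizeAwareColumn and fetch_first_page are stand-ins, and the override body omits the prop and filter handling the real class carries over from FuelSDK.

    import FuelSDK

    # Stand-in mirroring TapExacttarget__ET_DataExtension_Column: pass
    # self.options (which carries "BatchSize") through to ET_Get, since the
    # stock FuelSDK class issues the RETRIEVE without it.
    class BatchSizeAwareColumn(FuelSDK.ET_DataExtension_Column):
        def get(self):
            # condensed: the real override keeps the stock method's
            # props/search_filter handling ahead of this call
            obj = FuelSDK.ET_Get(self.auth_stub, self.obj, self.props,
                                 self.search_filter, self.options)
            self.last_request_id = obj.request_id
            return obj

    def fetch_first_page(auth_stub, batch_size=2500):
        # the same wiring client.request() performs before the first
        # RETRIEVE call: the page size travels in the cursor's 'options'
        # dict and FuelSDK emits it as the SOAP 'BatchSize' option
        cursor = BatchSizeAwareColumn()
        cursor.auth_stub = auth_stub  # an authenticated FuelSDK.ET_Client
        cursor.options = {"BatchSize": batch_size}
        return cursor.get()

This override is also why [PATCH 18/18] retargets the test mocks: once the tap instantiates the subclass, a patch applied to FuelSDK.objects.ET_DataExtension_Column.get is never reached, because the subclass defines its own get. mock.patch therefore has to name the attribute the code under test actually resolves, tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Column.get.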