diff --git a/tap_exacttarget/client.py b/tap_exacttarget/client.py
index 662e589..9126ff0 100644
--- a/tap_exacttarget/client.py
+++ b/tap_exacttarget/client.py
@@ -6,14 +6,14 @@
 
 LOGGER = singer.get_logger()
 
-
-def _get_response_items(response):
+# logs the number of records fetched from the given endpoint
+def _get_response_items(response, name):
     items = response.results
 
     if 'count' in response.results:
-        LOGGER.info('Got {} results.'.format(response.results.get('count')))
         items = response.results.get('items')
 
+    LOGGER.info('Got %s results from %s endpoint.', len(items), name)
     return items
 
 
@@ -109,6 +109,9 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz
     """
     cursor = selector()
     cursor.auth_stub = auth_stub
+    # set the batch size, i.e. the page size defined by the user;
+    # FuelSDK supports setting the page size via the "BatchSize" key of the "options" parameter
+    cursor.options = {"BatchSize": batch_size}
 
     if props is not None:
         cursor.props = props
@@ -144,22 +147,26 @@ def request_from_cursor(name, cursor, batch_size):
         raise RuntimeError("Request failed with '{}'"
                            .format(response.message))
 
-    for item in _get_response_items(response):
+    for item in _get_response_items(response, name):
         yield item
 
     while response.more_results:
         LOGGER.info("Getting more results from '{}' endpoint".format(name))
 
-        # Override call to getMoreResults to add a batch_size parameter
-        # response = cursor.getMoreResults()
-        response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
-        LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name))
+        if isinstance(cursor, FuelSDK.ET_Campaign):
+            # use 'getMoreResults' for campaigns as it does not use
+            # batch_size; instead it pages with $page and $pageSize via the REST API
+            response = cursor.getMoreResults()
+        else:
+            # Override call to getMoreResults to add a batch_size parameter
+            # response = cursor.getMoreResults()
+            response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
 
         if not response.status:
             raise RuntimeError("Request failed with '{}'"
                                .format(response.message))
 
-        for item in _get_response_items(response):
+        for item in _get_response_items(response, name):
             yield item
 
     LOGGER.info("Done retrieving results from '{}' endpoint".format(name))
diff --git a/tap_exacttarget/dao.py b/tap_exacttarget/dao.py
index 26651db..d2372d4 100644
--- a/tap_exacttarget/dao.py
+++ b/tap_exacttarget/dao.py
@@ -51,6 +51,8 @@ def __init__(self, config, state, auth_stub, catalog):
         self.state = state.copy()
         self.catalog = catalog
         self.auth_stub = auth_stub
+        # initialize the batch size from the config, defaulting to 2500
+        self.batch_size = int(self.config.get('batch_size', 2500))
 
     @classmethod
     def matches_catalog(cls, catalog):
diff --git a/tap_exacttarget/endpoints/campaigns.py b/tap_exacttarget/endpoints/campaigns.py
index 6a71531..5767698 100644
--- a/tap_exacttarget/endpoints/campaigns.py
+++ b/tap_exacttarget/endpoints/campaigns.py
@@ -19,7 +19,10 @@ def sync_data(self):
         cursor = request(
             'Campaign',
             FuelSDK.ET_Campaign,
-            self.auth_stub)
+            self.auth_stub,
+            # use $pageSize and $page in the props for
+            # this stream as it is fetched via the REST API
+            props={"$pageSize": self.batch_size, "$page": 1, "page": 1})
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/content_areas.py b/tap_exacttarget/endpoints/content_areas.py
index 4666098..37504c3 100644
--- a/tap_exacttarget/endpoints/content_areas.py
+++ b/tap_exacttarget/endpoints/content_areas.py
@@ -38,7 +38,8 @@ def sync_data(self):
         stream = request('ContentAreaDataAccessObject',
                          selector,
                          self.auth_stub,
-                         search_filter)
+                         search_filter,
+                         batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/data_extensions.py b/tap_exacttarget/endpoints/data_extensions.py
index 9dd7433..63e762e 100644
--- a/tap_exacttarget/endpoints/data_extensions.py
+++ b/tap_exacttarget/endpoints/data_extensions.py
@@ -11,6 +11,8 @@
 from tap_exacttarget.state import incorporate, save_state, \
     get_last_record_value_for_table
 from tap_exacttarget.util import sudsobj_to_dict
+from tap_exacttarget.fuel_overrides import TapExacttarget__ET_DataExtension_Row, \
+    TapExacttarget__ET_DataExtension_Column
 
 LOGGER = singer.get_logger()  # noqa
 
@@ -52,7 +54,7 @@ def _get_extensions(self):
             FuelSDK.ET_DataExtension,
             self.auth_stub,
             props=['CustomerKey', 'Name'],
-            batch_size=int(self.config.get('batch_size', 2500))
+            batch_size=self.batch_size
         )
 
         to_return = {}
@@ -110,8 +112,10 @@ def _get_fields(self, extensions): # pylint: disable=too-many-branches
 
         result = request(
             'DataExtensionField',
-            FuelSDK.ET_DataExtension_Column,
-            self.auth_stub)
+            # use custom class to apply 'batch_size'
+            TapExacttarget__ET_DataExtension_Column,
+            self.auth_stub,
+            batch_size=self.batch_size)
 
         # iterate through all the fields and determine if it is primary key
         # or replication key and update the catalog file accordingly:
@@ -276,19 +280,20 @@ def _replicate(self, customer_key, keys,
         LOGGER.info("Fetching {} from {} to {}"
                     .format(table, start, end))
 
-        cursor = FuelSDK.ET_DataExtension_Row()
+        # use custom class to apply 'batch_size'
+        cursor = TapExacttarget__ET_DataExtension_Row()
         cursor.auth_stub = self.auth_stub
         cursor.CustomerKey = customer_key
         cursor.props = keys
+        cursor.options = {"BatchSize": self.batch_size}
 
         if partial:
             cursor.search_filter = get_date_page(replication_key,
                                                  start,
                                                  unit)
 
-        batch_size = int(self.config.get('batch_size', 2500))
         result = request_from_cursor('DataExtensionObject', cursor,
-                                     batch_size=batch_size)
+                                     batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
@@ -350,7 +355,8 @@ def sync_data(self):
                 'SimpleOperator': 'equals',
                 'Value': customer_key,
             },
-            props=['CustomerKey', 'CategoryID'])
+            props=['CustomerKey', 'CategoryID'],
+            batch_size=self.batch_size)
 
         parent_extension = next(parent_result)
         parent_category_id = parent_extension.CategoryID
diff --git a/tap_exacttarget/endpoints/emails.py b/tap_exacttarget/endpoints/emails.py
index e9c20d6..2408d4c 100644
--- a/tap_exacttarget/endpoints/emails.py
+++ b/tap_exacttarget/endpoints/emails.py
@@ -48,7 +48,8 @@ def sync_data(self):
         stream = request('Email',
                          selector,
                          self.auth_stub,
-                         search_filter)
+                         search_filter,
+                         batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/events.py b/tap_exacttarget/endpoints/events.py
index 7ecbfd5..7492c16 100644
--- a/tap_exacttarget/endpoints/events.py
+++ b/tap_exacttarget/endpoints/events.py
@@ -58,7 +58,8 @@ def sync_data(self):
         stream = request(event_name,
                          selector,
                          self.auth_stub,
-                         search_filter)
+                         search_filter,
+                         batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/folders.py b/tap_exacttarget/endpoints/folders.py
index 669bd2d..3812120 100644
--- a/tap_exacttarget/endpoints/folders.py
+++ b/tap_exacttarget/endpoints/folders.py
@@ -45,7 +45,8 @@ def sync_data(self):
         stream = request('Folder',
                          selector,
                          self.auth_stub,
-                         search_filter)
+                         search_filter,
+                         batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/list_sends.py b/tap_exacttarget/endpoints/list_sends.py
index faaf72b..c9ff519 100644
--- a/tap_exacttarget/endpoints/list_sends.py
+++ b/tap_exacttarget/endpoints/list_sends.py
@@ -30,7 +30,8 @@ def sync_data(self):
         # here: https://salesforce.stackexchange.com/questions/354332/not-getting-modifieddate-for-listsend-endpoint
         stream = request('ListSend',
                          selector,
-                         self.auth_stub)
+                         self.auth_stub,
+                         batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/list_subscribers.py b/tap_exacttarget/endpoints/list_subscribers.py
index 758d449..07e4d42 100644
--- a/tap_exacttarget/endpoints/list_subscribers.py
+++ b/tap_exacttarget/endpoints/list_subscribers.py
@@ -54,7 +54,7 @@ def _get_all_subscribers_list(self):
             'Property': 'ListName',
             'SimpleOperator': 'equals',
             'Value': 'All Subscribers',
-        })
+        }, batch_size=self.batch_size)
 
         lists = list(result)
 
@@ -94,7 +94,8 @@ def sync_data(self):
             self.auth_stub,
             _get_list_subscriber_filter(
                 all_subscribers_list,
-                start, unit))
+                start, unit),
+            batch_size=self.batch_size)
 
         batch_size = 100
 
diff --git a/tap_exacttarget/endpoints/lists.py b/tap_exacttarget/endpoints/lists.py
index e70f843..72cab26 100644
--- a/tap_exacttarget/endpoints/lists.py
+++ b/tap_exacttarget/endpoints/lists.py
@@ -38,7 +38,8 @@ def sync_data(self):
         stream = request('List',
                          selector,
                          self.auth_stub,
-                         search_filter)
+                         search_filter,
+                         batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/sends.py b/tap_exacttarget/endpoints/sends.py
index 39f0b6d..09f56be 100644
--- a/tap_exacttarget/endpoints/sends.py
+++ b/tap_exacttarget/endpoints/sends.py
@@ -44,7 +44,8 @@ def sync_data(self):
         stream = request('Send',
                          selector,
                          self.auth_stub,
-                         search_filter)
+                         search_filter,
+                         batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/endpoints/subscribers.py b/tap_exacttarget/endpoints/subscribers.py
index 4dfbf19..bbcea91 100644
--- a/tap_exacttarget/endpoints/subscribers.py
+++ b/tap_exacttarget/endpoints/subscribers.py
@@ -61,7 +61,7 @@ def pull_subscribers_batch(self, subscriber_keys):
             return
 
         stream = request(
-            'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter)
+            'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter, batch_size=self.batch_size)
 
         catalog_copy = copy.deepcopy(self.catalog)
 
diff --git a/tap_exacttarget/fuel_overrides.py b/tap_exacttarget/fuel_overrides.py
index 594f5b4..16757c0 100644
--- a/tap_exacttarget/fuel_overrides.py
+++ b/tap_exacttarget/fuel_overrides.py
@@ -34,3 +34,71 @@ def tap_exacttarget__getMoreResults(cursor, batch_size=2500):
     cursor.last_request_id = obj.request_id
 
     return obj
+
+# extend 'get' from 'ET_DataExtension_Row' and add the 'options' parameter to set 'batch_size'
+class TapExacttarget__ET_DataExtension_Row(FuelSDK.ET_DataExtension_Row):
+
+    def get(self):
+        self.getName()
+        '''
+        if props and props.is_a? Array then
+            @props = props
+        end
+        '''
+
+        if self.props is not None and type(self.props) is dict: # pylint:disable=unidiomatic-typecheck
+            self.props = self.props.keys()
+
+        '''
+        if filter and filter.is_a? Hash then
+            @filter = filter
+        end
+        '''
+
+        # add the 'options' parameter to set 'batch_size'
+        obj = FuelSDK.ET_Get(self.auth_stub, "DataExtensionObject[{0}]".format(self.Name), self.props, self.search_filter, self.options)
+        self.last_request_id = obj.request_id
+
+        return obj
+
+# extend 'get' from 'ET_DataExtension_Column' and add the 'options' parameter to set 'batch_size'
+class TapExacttarget__ET_DataExtension_Column(FuelSDK.ET_DataExtension_Column):
+
+    def get(self):
+        '''
+        if props and props.is_a? Array then
+            @props = props
+        end
+        '''
+
+        if self.props is not None and type(self.props) is dict: # pylint:disable=unidiomatic-typecheck
+            self.props = self.props.keys()
+
+        '''
+        if filter and filter.is_a? Hash then
+            @filter = filter
+        end
+        '''
+
+        '''
+        fixCustomerKey = False
+        if filter and filter.is_a? Hash then
+            @filter = filter
+            if @filter.has_key?("Property") && @filter["Property"] == "CustomerKey" then
+                @filter["Property"] = "DataExtension.CustomerKey"
+                fixCustomerKey = true
+            end
+        end
+        '''
+
+        # add the 'options' parameter to set 'batch_size'
+        obj = FuelSDK.ET_Get(self.auth_stub, self.obj, self.props, self.search_filter, self.options)
+        self.last_request_id = obj.request_id
+
+        '''
+        if fixCustomerKey then
+            @filter["Property"] = "CustomerKey"
+        end
+        '''
+
+        return obj
diff --git a/tests/base.py b/tests/base.py
index f190f5b..ef13703 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -60,6 +60,8 @@ def get_properties(self, original: bool = True):
         return return_value
 
     def expected_metadata(self):
+        # Note: Custom streams failed on our account with an error on
+        # `_CustomObjectKey` not being valid
         return {
             "campaign": {
                 self.PRIMARY_KEYS: {"id"},
diff --git a/tests/test_exacttarget_pagination.py b/tests/test_exacttarget_pagination.py
new file mode 100644
index 0000000..6b6201f
--- /dev/null
+++ b/tests/test_exacttarget_pagination.py
@@ -0,0 +1,62 @@
+from base import ExactTargetBase
+import tap_tester.connections as connections
+import tap_tester.menagerie as menagerie
+import tap_tester.runner as runner
+
+class ExactTargetPagination(ExactTargetBase):
+
+    def name(self):
+        return "tap_tester_exacttarget_pagination"
+
+    def get_properties(self, *args, **kwargs):
+        props = super().get_properties(*args, **kwargs)
+        props['batch_size'] = '1'
+        return props
+
+    def test_run(self):
+        page_size = 1
+        expected_streams = self.streams_to_select()
+
+        conn_id = connections.ensure_connection(self)
+        runner.run_check_mode(self, conn_id)
+
+        found_catalogs = menagerie.get_catalogs(conn_id)
+        self.select_found_catalogs(conn_id, found_catalogs, only_streams=expected_streams)
+
+        sync_record_count = self.run_and_verify_sync(conn_id)
+
+        synced_records = runner.get_records_from_target_output()
+
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+                # expected values
+                expected_primary_keys = self.expected_primary_keys()[stream]
+
+                # collect information for assertions from the sync based on expected values
+                record_count_sync = sync_record_count.get(stream, 0)
+                primary_keys_list = [tuple([message.get('data').get(expected_pk) for expected_pk in expected_primary_keys])
+                                     for message in synced_records.get(stream).get('messages')
+                                     if message.get('action') == 'upsert']
+
+                sync_messages = synced_records.get(stream, {'messages': []}).get('messages')
+
+                # verify the record count is greater than the page size, i.e. more than one page was fetched
+                self.assertGreater(record_count_sync, page_size)
+
+                if record_count_sync > page_size:
+                    primary_keys_list_1 = primary_keys_list[:page_size]
+                    primary_keys_list_2 = primary_keys_list[page_size:2*page_size]
+
+                    primary_keys_page_1 = set(primary_keys_list_1)
+                    primary_keys_page_2 = set(primary_keys_list_2)
+
+                    # Verify by primary keys that the data is unique for each page
+                    self.assertTrue(primary_keys_page_1.isdisjoint(primary_keys_page_2))
+
+                # Verify we did not duplicate any records across pages
+                records_pks_set = {tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
+                                   for message in sync_messages}
+                records_pks_list = [tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
+                                    for message in sync_messages]
+                self.assertCountEqual(records_pks_set, records_pks_list,
+                                      msg="We have duplicate records for {}".format(stream))
diff --git a/tests/unittests/test_backoff.py b/tests/unittests/test_backoff.py
index e0a0c63..65f3e43 100644
--- a/tests/unittests/test_backoff.py
+++ b/tests/unittests/test_backoff.py
@@ -134,7 +134,7 @@ def test_connection_reset_error_occurred__data_extension_get_extensions(self, mo
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_get.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Column.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Column.get")
     def test_connection_reset_error_occurred__data_extension_get_fields(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.error(104, 'Connection reset by peer')
@@ -148,7 +148,7 @@ def test_connection_reset_error_occurred__data_extension_get_fields(self, mocked
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_data_ext_column.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Row.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Row.get")
     def test_connection_reset_error_occurred__data_extension_replicate(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.error(104, 'Connection reset by peer')
@@ -530,7 +530,7 @@ def test_socket_timeout_error_occurred__data_extension_get_extensions(self, mock
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_get.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Column.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Column.get")
     def test_socket_timeout_error_occurred__data_extension_get_fields(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.timeout("The read operation timed out")
@@ -544,7 +544,7 @@ def test_socket_timeout_error_occurred__data_extension_get_fields(self, mocked_d
         # verify the code backed off and requested for 5 times
         self.assertEquals(mocked_data_ext_column.call_count, 5)
 
-    @mock.patch("FuelSDK.objects.ET_DataExtension_Row.get")
+    @mock.patch("tap_exacttarget.fuel_overrides.TapExacttarget__ET_DataExtension_Row.get")
     def test_socket_timeout_error_occurred__data_extension_replicate(self, mocked_data_ext_column, mocked_sleep):
         # mock 'get' and raise error
         mocked_data_ext_column.side_effect = socket.timeout("The read operation timed out")
diff --git a/tests/unittests/test_get_response_items.py b/tests/unittests/test_get_response_items.py
new file mode 100644
index 0000000..5d0e940
--- /dev/null
+++ b/tests/unittests/test_get_response_items.py
@@ -0,0 +1,40 @@
+import unittest
+from unittest import mock
+import tap_exacttarget.client as _client
+
+class Mockresponse:
+    def __init__(self, json):
+        self.results = json
+        self.more_results = False
+
+class TestGetResponseItems(unittest.TestCase):
+
+    @mock.patch("tap_exacttarget.client.LOGGER.info")
+    def test_result_without_count(self, mocked_logger):
+        json = [{
+            "key1": "value1",
+            "key2": "value2"
+        },{
+            "key3": "value3",
+            "key4": "value4"
+        }]
+        items = _client._get_response_items(Mockresponse(json), "TestDataAccessObject")
+        self.assertEquals(items, json)
+        mocked_logger.assert_called_with("Got %s results from %s endpoint.", 2, "TestDataAccessObject")
+
+    @mock.patch("tap_exacttarget.client.LOGGER.info")
+    def test_result_with_count(self, mocked_logger):
+        json = {
+            "count": 2,
+            "items": [{
+                "key1": "value1",
+                "key2": "value2"
+            },{
+                "key3": "value3",
+                "key4": "value4"
+            }
+            ]
+        }
+        items = _client._get_response_items(Mockresponse(json), "TestDataAccessObject")
+        self.assertEquals(items, json.get("items"))
+        mocked_logger.assert_called_with("Got %s results from %s endpoint.", 2, "TestDataAccessObject")
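
Reviewer note: below is a minimal, self-contained sketch (not part of the patch; the helper names and config values are illustrative) of how the user-facing 'batch_size' setting is wired through the tap by this change: dao.py coerces it to an int with a 2500 default, the SOAP-based streams hand it to FuelSDK as options["BatchSize"], and the campaign stream, which goes through the REST API, passes it as $pageSize.

# sketch only: these helper functions do not exist in the tap
def resolve_batch_size(config, default=2500):
    # mirrors DataAccessObject.__init__: the config value may arrive as a string
    return int(config.get('batch_size', default))

def soap_cursor_options(batch_size):
    # SOAP-based streams: FuelSDK reads the page size from options["BatchSize"]
    return {"BatchSize": batch_size}

def campaign_props(batch_size):
    # the campaign stream pages over REST with $pageSize and $page
    return {"$pageSize": batch_size, "$page": 1, "page": 1}

if __name__ == "__main__":
    config = {"batch_size": "100"}      # illustrative tap config
    size = resolve_batch_size(config)
    print(soap_cursor_options(size))    # {'BatchSize': 100}
    print(campaign_props(size))         # {'$pageSize': 100, '$page': 1, 'page': 1}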