TDL-14896: Make batch_size apply to the first page and TDL-14895: Use the pagination__list_subscriber_interval_quantity config value correctly #78

Merged · 19 commits · Oct 13, 2021
25 changes: 16 additions & 9 deletions tap_exacttarget/client.py
@@ -6,14 +6,14 @@

LOGGER = singer.get_logger()


def _get_response_items(response):
# logs the number of records fetched from the given endpoint
def _get_response_items(response, name):
items = response.results

if 'count' in response.results:
LOGGER.info('Got {} results.'.format(response.results.get('count')))
items = response.results.get('items')

LOGGER.info('Got %s results from %s endpoint.', len(items), name)
return items


@@ -109,6 +109,9 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_size
"""
cursor = selector()
cursor.auth_stub = auth_stub
# set the batch size, i.e. the page size defined by the user, as the
# FuelSDK supports setting the page size via the "BatchSize" key in the "options" parameter
cursor.options = {"BatchSize": batch_size}

if props is not None:
cursor.props = props
@@ -144,22 +147,26 @@ def request_from_cursor(name, cursor, batch_size):
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

while response.more_results:
LOGGER.info("Getting more results from '{}' endpoint".format(name))

# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name))
if isinstance(cursor, FuelSDK.ET_Campaign):
# use the default 'getMoreResults' for campaigns as it does not use
# batch_size; it paginates via $page and $pageSize over a REST call
response = cursor.getMoreResults()
else:
# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)

if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

LOGGER.info("Done retrieving results from '{}' endpoint".format(name))
2 changes: 2 additions & 0 deletions tap_exacttarget/dao.py
@@ -51,6 +51,8 @@ def __init__(self, config, state, auth_stub, catalog):
self.state = state.copy()
self.catalog = catalog
self.auth_stub = auth_stub
# initialize the page size from the config, defaulting to 2500
self.batch_size = int(self.config.get('batch_size', 2500))

@classmethod
def matches_catalog(cls, catalog):
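Since `batch_size` comes from the JSON config it may arrive as a string (the pagination test below passes `'1'`), so the `int(...)` cast normalizes it once here and every stream reuses `self.batch_size`. A quick illustration of the assumed parsing behavior:

```python
config = {'batch_size': '1'}                    # config values may be strings
assert int(config.get('batch_size', 2500)) == 1
assert int({}.get('batch_size', 2500)) == 2500  # missing key falls back to 2500
```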
5 changes: 4 additions & 1 deletion tap_exacttarget/endpoints/campaigns.py
@@ -19,7 +19,10 @@ def sync_data(self):
cursor = request(
'Campaign',
FuelSDK.ET_Campaign,
self.auth_stub)
self.auth_stub,
# use $pageSize and $page in the props for
# this stream as it is fetched via the REST API
props={"$pageSize": self.batch_size, "$page": 1, "page": 1})

catalog_copy = copy.deepcopy(self.catalog)

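Campaigns go over the REST API, so the page size is expressed through the `$pageSize`/`$page` props rather than the SOAP `BatchSize` option, and FuelSDK advances the page on each `getMoreResults()` call. A hedged sketch of the request shape this implies (the endpoint path is an assumption, not something this diff confirms):

```python
batch_size = 2500
# page 1 is requested explicitly; subsequent pages come from getMoreResults()
props = {"$pageSize": batch_size, "$page": 1, "page": 1}
# roughly: GET .../hub/v1/campaigns?$pageSize=2500&$page=1
```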
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/content_areas.py
@@ -38,7 +38,8 @@ def sync_data(self):
stream = request('ContentAreaDataAccessObject',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

20 changes: 13 additions & 7 deletions tap_exacttarget/endpoints/data_extensions.py
@@ -11,6 +11,8 @@
from tap_exacttarget.state import incorporate, save_state, \
get_last_record_value_for_table
from tap_exacttarget.util import sudsobj_to_dict
from tap_exacttarget.fuel_overrides import TapExacttarget__ET_DataExtension_Row, \
TapExacttarget__ET_DataExtension_Column

LOGGER = singer.get_logger() # noqa

@@ -52,7 +54,7 @@ def _get_extensions(self):
FuelSDK.ET_DataExtension,
self.auth_stub,
props=['CustomerKey', 'Name'],
batch_size=int(self.config.get('batch_size', 2500))
batch_size=self.batch_size
)

to_return = {}
@@ -110,8 +112,10 @@ def _get_fields(self, extensions): # pylint: disable=too-many-branches

result = request(
'DataExtensionField',
FuelSDK.ET_DataExtension_Column,
self.auth_stub)
# use custom class to apply 'batch_size'
TapExacttarget__ET_DataExtension_Column,
self.auth_stub,
batch_size=self.batch_size)

# iterate through all the fields and determine if it is primary key
# or replication key and update the catalog file accordingly:
@@ -276,19 +280,20 @@ def _replicate(self, customer_key, keys,
LOGGER.info("Fetching {} from {} to {}"
.format(table, start, end))

cursor = FuelSDK.ET_DataExtension_Row()
# use custom class to apply 'batch_size'
cursor = TapExacttarget__ET_DataExtension_Row()
cursor.auth_stub = self.auth_stub
cursor.CustomerKey = customer_key
cursor.props = keys
cursor.options = {"BatchSize": self.batch_size}

if partial:
cursor.search_filter = get_date_page(replication_key,
start,
unit)

batch_size = int(self.config.get('batch_size', 2500))
result = request_from_cursor('DataExtensionObject', cursor,
batch_size=batch_size)
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

@@ -350,7 +355,8 @@ def sync_data(self):
'SimpleOperator': 'equals',
'Value': customer_key,
},
props=['CustomerKey', 'CategoryID'])
props=['CustomerKey', 'CategoryID'],
batch_size=self.batch_size)

parent_extension = next(parent_result)
parent_category_id = parent_extension.CategoryID
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/emails.py
@@ -48,7 +48,8 @@ def sync_data(self):
stream = request('Email',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/events.py
@@ -58,7 +58,8 @@ def sync_data(self):
stream = request(event_name,
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/folders.py
@@ -45,7 +45,8 @@ def sync_data(self):
stream = request('Folder',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/list_sends.py
@@ -30,7 +30,8 @@ def sync_data(self):
# here: https://salesforce.stackexchange.com/questions/354332/not-getting-modifieddate-for-listsend-endpoint
stream = request('ListSend',
selector,
self.auth_stub)
self.auth_stub,
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

5 changes: 3 additions & 2 deletions tap_exacttarget/endpoints/list_subscribers.py
@@ -54,7 +54,7 @@ def _get_all_subscribers_list(self):
'Property': 'ListName',
'SimpleOperator': 'equals',
'Value': 'All Subscribers',
})
}, batch_size=self.batch_size)

lists = list(result)

@@ -94,7 +94,8 @@ def sync_data(self):
self.auth_stub,
_get_list_subscriber_filter(
all_subscribers_list,
start, unit))
start, unit),
batch_size=self.batch_size)

batch_size = 100

3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/lists.py
@@ -38,7 +38,8 @@ def sync_data(self):
stream = request('List',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/sends.py
@@ -44,7 +44,8 @@ def sync_data(self):
stream = request('Send',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

2 changes: 1 addition & 1 deletion tap_exacttarget/endpoints/subscribers.py
@@ -61,7 +61,7 @@ def pull_subscribers_batch(self, subscriber_keys):
return

stream = request(
'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter)
'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter, batch_size=self.batch_size)

catalog_copy = copy.deepcopy(self.catalog)

68 changes: 68 additions & 0 deletions tap_exacttarget/fuel_overrides.py
@@ -34,3 +34,71 @@ def tap_exacttarget__getMoreResults(cursor, batch_size=2500):
cursor.last_request_id = obj.request_id

return obj

# extend 'get' from 'ET_DataExtension_Row' and add 'options' parameter to set 'batch_size'
class TapExacttarget__ET_DataExtension_Row(FuelSDK.ET_DataExtension_Row):

def get(self):
self.getName()
'''
if props and props.is_a? Array then
@props = props
end
'''

if self.props is not None and type(self.props) is dict: # pylint:disable=unidiomatic-typecheck
self.props = self.props.keys()

'''
if filter and filter.is_a? Hash then
@filter = filter
end
'''

# add 'options' parameter to set 'batch_size'
obj = FuelSDK.ET_Get(self.auth_stub, "DataExtensionObject[{0}]".format(self.Name), self.props, self.search_filter, self.options)
self.last_request_id = obj.request_id

return obj

# extend 'get' from 'ET_DataExtension_Column' and add 'options' parameter to set 'batch_size'
class TapExacttarget__ET_DataExtension_Column(FuelSDK.ET_DataExtension_Column):

def get(self):
'''
if props and props.is_a? Array then
@props = props
end
'''

if self.props is not None and type(self.props) is dict: # pylint:disable=unidiomatic-typecheck
self.props = self.props.keys()

'''
if filter and filter.is_a? Hash then
@filter = filter
end
'''

'''
fixCustomerKey = False
if filter and filter.is_a? Hash then
@filter = filter
if @filter.has_key?("Property") && @filter["Property"] == "CustomerKey" then
@filter["Property"] = "DataExtension.CustomerKey"
fixCustomerKey = true
end
end
'''

# add 'options' parameter to set 'batch_size'
obj = FuelSDK.ET_Get(self.auth_stub, self.obj, self.props, self.search_filter, self.options)
self.last_request_id = obj.request_id

'''
if fixCustomerKey then
@filter["Property"] = "CustomerKey"
end
'''

return obj
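
For reference, the data-extension streams drive these subclasses exactly like their FuelSDK counterparts, with `options` carrying the page size; only the overridden `get()` differs, by forwarding `options` into `ET_Get`. A minimal usage sketch (the auth stub, customer key, and props here are hypothetical):

```python
from tap_exacttarget.fuel_overrides import TapExacttarget__ET_DataExtension_Row

cursor = TapExacttarget__ET_DataExtension_Row()
cursor.auth_stub = auth_stub              # an authenticated FuelSDK.ET_Client
cursor.CustomerKey = 'my-extension-key'   # hypothetical data extension key
cursor.props = ['Field1', 'Field2']       # hypothetical column names
cursor.options = {"BatchSize": 2500}      # forwarded to ET_Get by the override
response = cursor.get()
```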
2 changes: 2 additions & 0 deletions tests/base.py
@@ -60,6 +60,8 @@ def get_properties(self, original: bool = True):
return return_value

def expected_metadata(self):
# Note: Custom streams failed on our account with an error on
# `_CustomObjectKey` not being valid
return {
"campaign": {
self.PRIMARY_KEYS: {"id"},
62 changes: 62 additions & 0 deletions tests/test_exacttarget_pagination.py
@@ -0,0 +1,62 @@
from base import ExactTargetBase
import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner

class ExactTargetPagination(ExactTargetBase):

def name(self):
return "tap_tester_exacttarget_pagination"

def get_properties(self, *args, **kwargs):
props = super().get_properties(*args, **kwargs)
props['batch_size'] = '1'
return props

def test_run(self):
page_size = 1
expected_streams = self.streams_to_select()

conn_id = connections.ensure_connection(self)
runner.run_check_mode(self, conn_id)

found_catalogs = menagerie.get_catalogs(conn_id)
self.select_found_catalogs(conn_id, found_catalogs, only_streams=expected_streams)

sync_record_count = self.run_and_verify_sync(conn_id)

synced_records = runner.get_records_from_target_output()

for stream in expected_streams:
with self.subTest(stream=stream):
# expected values
expected_primary_keys = self.expected_primary_keys()[stream]

# collect information for assertions from sync based on expected values
record_count_sync = sync_record_count.get(stream, 0)
primary_keys_list = [tuple([message.get('data').get(expected_pk) for expected_pk in expected_primary_keys])
for message in synced_records.get(stream).get('messages')
if message.get('action') == 'upsert']

sync_messages = synced_records.get(stream, {'messages': []}).get('messages')

# verify the record count is greater than the page size, i.e. multiple pages were fetched
self.assertGreater(record_count_sync, page_size)

if record_count_sync > page_size:
primary_keys_list_1 = primary_keys_list[:page_size]
primary_keys_list_2 = primary_keys_list[page_size:2*page_size]

primary_keys_page_1 = set(primary_keys_list_1)
primary_keys_page_2 = set(primary_keys_list_2)

# Verify by primary keys that each page contains distinct data
self.assertTrue(primary_keys_page_1.isdisjoint(primary_keys_page_2))

# Verify we did not duplicate any records across pages
records_pks_set = {tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
for message in sync_messages}
records_pks_list = [tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
for message in sync_messages]
self.assertCountEqual(records_pks_set, records_pks_list,
msg="We have duplicate records for {}".format(stream))