Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-14896: Make batch_size apply to the first page and TDL-14895: Use the pagination__list_subscriber_interval_quantity config value correctly #64

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions tap_exacttarget/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
LOGGER = singer.get_logger()


def _get_response_items(response):
def _get_response_items(response, name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments to the changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments

items = response.results

if 'count' in response.results:
LOGGER.info('Got {} results.'.format(response.results.get('count')))
items = response.results.get('items')

LOGGER.info('Got %s results from %s endpoint.', len(items), name)
return items


Expand Down Expand Up @@ -107,6 +107,8 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz
"""
cursor = selector()
cursor.auth_stub = auth_stub
# set batch size
cursor.options = {"BatchSize": batch_size}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments regarding the changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment


if props is not None:
cursor.props = props
Expand Down Expand Up @@ -142,22 +144,26 @@ def request_from_cursor(name, cursor, batch_size):
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

while response.more_results:
LOGGER.info("Getting more results from '{}' endpoint".format(name))

# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name))
if isinstance(cursor, FuelSDK.ET_Campaign):
# use 'getMoreResults' for campaigns as it does not use
# batch_size, rather it uses $page and $pageSize and REST Call
response = cursor.getMoreResults()
else:
# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
Comment on lines +156 to +163
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the above changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the above change is to support pagination in the tap to run the pagination test.


if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

LOGGER.info("Done retrieving results from '{}' endpoint".format(name))
2 changes: 2 additions & 0 deletions tap_exacttarget/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def __init__(self, config, state, auth_stub, catalog):
self.state = state.copy()
self.catalog = catalog
self.auth_stub = auth_stub
# initialize batch size
self.batch_size = int(self.config.get('batch_size', 2500))
Copy link

@dbshah1212 dbshah1212 Aug 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use self.batch_size here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, here the batch size is initialized.


@classmethod
def matches_catalog(cls, catalog):
Expand Down
5 changes: 4 additions & 1 deletion tap_exacttarget/endpoints/campaigns.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ def sync_data(self):
cursor = request(
'Campaign',
FuelSDK.ET_Campaign,
self.auth_stub)
self.auth_stub,
# use $pageSize and $page in the props for
# this stream as it calls using REST API
props={"$pageSize": self.batch_size, "$page": 1, "page": 1})

for campaign in cursor:
campaign = self.filter_keys_and_parse(campaign)
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/content_areas.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def sync_data(self):
stream = request('ContentAreaDataAccessObject',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

for content_area in stream:
content_area = self.filter_keys_and_parse(content_area)
Expand Down
20 changes: 13 additions & 7 deletions tap_exacttarget/endpoints/data_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from tap_exacttarget.state import incorporate, save_state, \
get_last_record_value_for_table
from tap_exacttarget.util import sudsobj_to_dict
from tap_exacttarget.fuel_overrides import TapExacttarget__ET_DataExtension_Row, \
TapExacttarget__ET_DataExtension_Column

LOGGER = singer.get_logger() # noqa

Expand Down Expand Up @@ -50,7 +52,7 @@ def _get_extensions(self):
FuelSDK.ET_DataExtension,
self.auth_stub,
props=['CustomerKey', 'Name'],
batch_size=int(self.config.get('batch_size', 2500))
batch_size=self.batch_size
)

to_return = {}
Expand Down Expand Up @@ -93,8 +95,10 @@ def _get_fields(self, extensions):

result = request(
'DataExtensionField',
FuelSDK.ET_DataExtension_Column,
self.auth_stub)
# use custom class to apply 'batch_size'
TapExacttarget__ET_DataExtension_Column,
self.auth_stub,
batch_size=self.batch_size)

for field in result:
extension_id = field.DataExtension.CustomerKey
Expand Down Expand Up @@ -192,19 +196,20 @@ def _replicate(self, customer_key, keys,
LOGGER.info("Fetching {} from {} to {}"
.format(table, start, end))

cursor = FuelSDK.ET_DataExtension_Row()
# use custom class to apply 'batch_size'
cursor = TapExacttarget__ET_DataExtension_Row()
cursor.auth_stub = self.auth_stub
cursor.CustomerKey = customer_key
cursor.props = keys
cursor.options = {"BatchSize": self.batch_size}

if partial:
cursor.search_filter = get_date_page(replication_key,
start,
unit)

batch_size = int(self.config.get('batch_size', 2500))
result = request_from_cursor('DataExtensionObject', cursor,
batch_size=batch_size)
batch_size=self.batch_size)

for row in result:
row = self.filter_keys_and_parse(row)
Expand Down Expand Up @@ -265,7 +270,8 @@ def sync_data(self):
'SimpleOperator': 'equals',
'Value': customer_key,
},
props=['CustomerKey', 'CategoryID'])
props=['CustomerKey', 'CategoryID'],
batch_size=self.batch_size)

parent_extension = next(parent_result)
parent_category_id = parent_extension.CategoryID
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ def sync_data(self):
stream = request('Email',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

for email in stream:
email = self.filter_keys_and_parse(email)
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def sync_data(self):
stream = request(event_name,
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

for event in stream:
event = self.filter_keys_and_parse(event)
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/folders.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def sync_data(self):
stream = request('Folder',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

for folder in stream:
folder = self.filter_keys_and_parse(folder)
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/list_sends.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ def sync_data(self):
stream = request('ListSend',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

for list_send in stream:
list_send = self.filter_keys_and_parse(list_send)
Expand Down
7 changes: 4 additions & 3 deletions tap_exacttarget/endpoints/list_subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _get_all_subscribers_list(self):
'Property': 'ListName',
'SimpleOperator': 'equals',
'Value': 'All Subscribers',
})
}, batch_size=self.batch_size)

lists = list(result)

Expand All @@ -98,7 +98,7 @@ def sync_data(self):
pagination_unit = self.config.get(
'pagination__list_subscriber_interval_unit', 'days')
pagination_quantity = self.config.get(
'pagination__list_subsctiber_interval_quantity', 1)
'pagination__list_subscriber_interval_quantity', 1)

unit = {pagination_unit: int(pagination_quantity)}

Expand All @@ -112,7 +112,8 @@ def sync_data(self):
self.auth_stub,
_get_list_subscriber_filter(
all_subscribers_list,
start, unit))
start, unit),
batch_size=self.batch_size)

batch_size = 100

Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def sync_data(self):
stream = request('List',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

for _list in stream:
_list = self.filter_keys_and_parse(_list)
Expand Down
3 changes: 2 additions & 1 deletion tap_exacttarget/endpoints/sends.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def sync_data(self):
stream = request('Send',
selector,
self.auth_stub,
search_filter)
search_filter,
batch_size=self.batch_size)

for send in stream:
send = self.filter_keys_and_parse(send)
Expand Down
2 changes: 1 addition & 1 deletion tap_exacttarget/endpoints/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def pull_subscribers_batch(self, subscriber_keys):
return

stream = request(
'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter)
'Subscriber', FuelSDK.ET_Subscriber, self.auth_stub, _filter, batch_size=self.batch_size)

for subscriber in stream:
subscriber = self.filter_keys_and_parse(subscriber)
Expand Down
68 changes: 68 additions & 0 deletions tap_exacttarget/fuel_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,71 @@ def tap_exacttarget__getMoreResults(cursor, batch_size=2500):
cursor.last_request_id = obj.request_id

return obj

# extend 'get' from 'ET_DataExtension_Row' and add 'options' parameter to set 'batch_size'
class TapExacttarget__ET_DataExtension_Row(FuelSDK.ET_DataExtension_Row):

def get(self):
self.getName()
'''
if props and props.is_a? Array then
@props = props
end
'''

if self.props is not None and type(self.props) is dict: # pylint:disable=unidiomatic-typecheck
self.props = self.props.keys()

'''
if filter and filter.is_a? Hash then
@filter = filter
end
'''

# add 'options' parameter to set 'batch_size'
obj = FuelSDK.ET_Get(self.auth_stub, "DataExtensionObject[{0}]".format(self.Name), self.props, self.search_filter, self.options)
self.last_request_id = obj.request_id

return obj

# extend 'get' from 'ET_DataExtension_Column' and add 'options' parameter to set 'batch_size'
class TapExacttarget__ET_DataExtension_Column(FuelSDK.ET_DataExtension_Column):

def get(self):
'''
if props and props.is_a? Array then
@props = props
end
'''

if self.props is not None and type(self.props) is dict: # pylint:disable=unidiomatic-typecheck
self.props = self.props.keys()

'''
if filter and filter.is_a? Hash then
@filter = filter
end
'''

'''
fixCustomerKey = False
if filter and filter.is_a? Hash then
@filter = filter
if @filter.has_key?("Property") && @filter["Property"] == "CustomerKey" then
@filter["Property"] = "DataExtension.CustomerKey"
fixCustomerKey = true
end
end
'''

# add 'options' parameter to set 'batch_size'
obj = FuelSDK.ET_Get(self.auth_stub, self.obj, self.props, self.search_filter, self.options)
self.last_request_id = obj.request_id

'''
if fixCustomerKey then
@filter["Property"] = "CustomerKey"
end
'''

return obj
Loading