Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-14896: Make batch_size apply to the first page and TDL-14895: Use the pagination__list_subscriber_interval_quantity config value correctly #64

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
Expand All @@ -11,7 +11,7 @@ jobs:
python3 -mvenv /usr/local/share/virtualenvs/tap-exacttarget
source /usr/local/share/virtualenvs/tap-exacttarget/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
pip install .[test]
- run:
name: 'unittest'
command: |
Expand All @@ -26,16 +26,10 @@ jobs:
- run:
name: 'Integration Tests'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-exacttarget \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
run-test --tap=tap-exacttarget tests
workflows:
version: 2
commit:
Expand Down
17 changes: 11 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@
py_modules=['tap_exacttarget'],
install_requires=[
'funcy==1.9.1',
'singer-python==5.9.0',
'singer-python==5.12.1',
'python-dateutil==2.6.0',
'voluptuous==0.10.5',
'Salesforce-FuelSDK==1.3.0'
],
extras_require={
'dev': [
'ipdb==0.11',
'pylint==2.1.1',
'astroid==2.1.0',
'test': [
'pylint==2.10.2',
'astroid==2.7.3',
'nose'
],
'dev': [
'ipdb==0.11'
]
},
entry_points='''
[console_scripts]
tap-exacttarget=tap_exacttarget:main
''',
packages=find_packages()
packages=find_packages(),
package_data={
'tap_exacttarget': ['schemas/*.json']
}
)
15 changes: 12 additions & 3 deletions tap_exacttarget/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import argparse
import json

import sys

import singer
from singer import utils
from singer import metadata
Expand Down Expand Up @@ -102,6 +104,13 @@ def do_sync(args):
.format(stream_catalog.get('stream')))
continue

# The 'subscribers' stream is the child stream of 'list_subscribers'.
# When we sync 'list_subscribers', the tap collects the 'SubscriberKey'
# values returned as part of the 'list_subscribers' records and passes
# that list to the 'subscribers' stream, so the 'subscribers' stream only
# syncs records for subscribers present in that list. Hence, for different
# start dates the 'SubscriberKey' list will differ, and thus the
# 'subscribers' records will also differ for different start dates.
if SubscriberDataAccessObject.matches_catalog(stream_catalog):
subscriber_selected = True
subscriber_catalog = stream_catalog
Expand All @@ -123,7 +132,7 @@ def do_sync(args):
LOGGER.fatal('Cannot replicate `subscriber` without '
'`list_subscriber`. Please select `list_subscriber` '
'and try again.')
exit(1)
sys.exit(1)

for stream_accessor in stream_accessors:
if isinstance(stream_accessor, ListSubscriberDataAccessObject) and \
Expand Down Expand Up @@ -161,10 +170,10 @@ def main():

if success:
LOGGER.info("Completed successfully, exiting.")
exit(0)
sys.exit(0)
else:
LOGGER.info("Run failed, exiting.")
exit(1)
sys.exit(1)

if __name__ == '__main__':
main()
31 changes: 20 additions & 11 deletions tap_exacttarget/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

LOGGER = singer.get_logger()


def _get_response_items(response):
# prints the number of records fetched from the passed endpoint
def _get_response_items(response, name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments to the changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments

items = response.results

if 'count' in response.results:
LOGGER.info('Got {} results.'.format(response.results.get('count')))
items = response.results.get('items')

LOGGER.info('Got %s results from %s endpoint.', len(items), name)
return items


Expand Down Expand Up @@ -61,7 +61,8 @@ def get_auth_stub(config):
LOGGER.info('Failed to auth using V1 endpoint')
if not config.get('tenant_subdomain'):
LOGGER.warning('No tenant_subdomain found, will not attempt to auth with V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or try adding the \'tenant_subdomain\'."
raise Exception(message) from None

# Next try V2
# Move to OAuth2: https://help.salesforce.com/articleView?id=mc_rn_january_2019_platform_ip_remove_legacy_package_create_ability.htm&type=5
Expand All @@ -77,7 +78,8 @@ def get_auth_stub(config):
transport=transport)
except Exception as e:
LOGGER.info('Failed to auth using V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or \'tenant_subdomain\'."
raise Exception(message) from None

LOGGER.info("Success.")
return auth_stub
Expand Down Expand Up @@ -107,6 +109,9 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz
"""
cursor = selector()
cursor.auth_stub = auth_stub
# set batch size ie. the page size defined by the user as the
# FuelSDK supports setting page size in the "BatchSize" value in "options" parameter
cursor.options = {"BatchSize": batch_size}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments regarding the changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment


if props is not None:
cursor.props = props
Expand Down Expand Up @@ -142,22 +147,26 @@ def request_from_cursor(name, cursor, batch_size):
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

while response.more_results:
LOGGER.info("Getting more results from '{}' endpoint".format(name))

# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name))
if isinstance(cursor, FuelSDK.ET_Campaign):
# use 'getMoreResults' for campaigns as it does not use
# batch_size, rather it uses $page and $pageSize and REST Call
response = cursor.getMoreResults()
else:
# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
Comment on lines +156 to +163
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the above changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the above change is to support pagination in the tap to run the pagination test.


if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

LOGGER.info("Done retrieving results from '{}' endpoint".format(name))
69 changes: 62 additions & 7 deletions tap_exacttarget/dao.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import backoff
import socket
import functools
import singer
from singer import metadata
import os
from singer import metadata, Transformer, utils

from funcy import project

Expand All @@ -11,6 +15,34 @@
def _get_catalog_schema(catalog):
return catalog.get('schema', {}).get('properties')

def get_abs_path(path):
    # Resolve *path* against the directory that contains this module,
    # so schema files are found regardless of the current working directory.
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)

# load the shared 'definitions' schema that holds the common reference fields
def load_schema_references():
    # map the ref file name (as used in "$ref" pointers) to its parsed JSON
    return {
        "definitions.json": utils.load_json(get_abs_path('schemas/definitions.json'))
    }

# load the JSON schema file for the given stream
def load_schema(stream):
    # schema files are named with the plural of the stream name, e.g.
    # stream 'campaign' -> 'schemas/campaigns.json'
    schema_path = get_abs_path('schemas/{}s.json'.format(stream))
    return utils.load_json(schema_path)

# decorator that retries the wrapped call on transient network errors
def exacttarget_error_handling(fnc):
    # retry up to 5 times with exponential backoff (factor 2) when the call
    # raises a socket timeout or a connection error
    retrier = backoff.on_exception(
        backoff.expo,
        (socket.timeout, ConnectionError),
        max_tries=5,
        factor=2)

    @functools.wraps(fnc)
    def call_with_retries(*args, **kwargs):
        return fnc(*args, **kwargs)

    return retrier(call_with_retries)

class DataAccessObject():

Expand All @@ -19,6 +51,8 @@ def __init__(self, config, state, auth_stub, catalog):
self.state = state.copy()
self.catalog = catalog
self.auth_stub = auth_stub
# initialize batch size
self.batch_size = int(self.config.get('batch_size', 2500))
Copy link

@dbshah1212 dbshah1212 Aug 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use self.batch_size here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, here the batch size is initialized.


@classmethod
def matches_catalog(cls, catalog):
Expand All @@ -27,17 +61,30 @@ def matches_catalog(cls, catalog):
def generate_catalog(self):
cls = self.__class__

# get the reference schemas
refs = load_schema_references()
# resolve the schema reference and make final schema
schema = singer.resolve_schema_references(load_schema(cls.TABLE), refs)
mdata = metadata.new()
metadata.write(mdata, (), 'inclusion', 'available')
for prop in cls.SCHEMA['properties']: # pylint:disable=unsubscriptable-object
metadata.write(mdata, ('properties', prop), 'inclusion', 'available')

# use 'get_standard_metadata' with primary key, replication key and replication method
mdata = metadata.get_standard_metadata(schema=schema,
key_properties=self.KEY_PROPERTIES,
valid_replication_keys=self.REPLICATION_KEYS if self.REPLICATION_KEYS else None,
replication_method=self.REPLICATION_METHOD)

mdata_map = metadata.to_map(mdata)

# make 'automatic' inclusion for replication keys
for replication_key in self.REPLICATION_KEYS:
mdata_map[('properties', replication_key)]['inclusion'] = 'automatic'

return [{
'tap_stream_id': cls.TABLE,
'stream': cls.TABLE,
'key_properties': cls.KEY_PROPERTIES,
'schema': cls.SCHEMA,
'metadata': metadata.to_list(mdata)
'schema': schema,
'metadata': metadata.to_list(mdata_map)
}]

def filter_keys_and_parse(self, obj):
Expand All @@ -52,6 +99,13 @@ def get_catalog_keys(self):
def parse_object(self, obj):
return project(obj, self.get_catalog_keys())

# a function to write records by applying transformation
@staticmethod
def write_records_with_transform(record, catalog, table):
with Transformer() as transformer:
rec = transformer.transform(record, catalog.get('schema'), metadata.to_map(catalog.get('metadata')))
singer.write_record(table, rec)

def write_schema(self):
singer.write_schema(
self.catalog.get('stream'),
Expand All @@ -75,9 +129,10 @@ def sync(self):

# OVERRIDE THESE TO IMPLEMENT A NEW DAO:

SCHEMA = None
TABLE = None
KEY_PROPERTIES = None
REPLICATION_KEYS = []
REPLICATION_METHOD = None

def sync_data(self): # pylint: disable=no-self-use
raise RuntimeError('sync_data is not implemented!')
39 changes: 11 additions & 28 deletions tap_exacttarget/endpoints/campaigns.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,32 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.schemas import with_properties
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)

LOGGER = singer.get_logger()


class CampaignDataAccessObject(DataAccessObject):

SCHEMA = with_properties({
'id': {
'type': ['null', 'string'],
},
'createdDate': {
'type': ['null', 'string'],
},
'modifiedDate': {
'type': ['null', 'string'],
},
'name': {
'type': ['null', 'string'],
},
'description': {
'type': ['null', 'string'],
},
'campaignCode': {
'type': ['null', 'string'],
},
'color': {
'type': ['null', 'string'],
}
})

TABLE = 'campaign'
KEY_PROPERTIES = ['id']
REPLICATION_METHOD = 'FULL_TABLE'

@exacttarget_error_handling
def sync_data(self):
cursor = request(
'Campaign',
FuelSDK.ET_Campaign,
self.auth_stub)
self.auth_stub,
# use $pageSize and $page in the props for
# this stream as it calls using REST API
props={"$pageSize": self.batch_size, "$page": 1, "page": 1})

catalog_copy = copy.deepcopy(self.catalog)

for campaign in cursor:
campaign = self.filter_keys_and_parse(campaign)

singer.write_records(self.__class__.TABLE, [campaign])
self.write_records_with_transform(campaign, catalog_copy, self.TABLE)
Loading