TDL-14896: Make batch_size apply to the first page and TDL-14895: Use the pagination__list_subscriber_interval_quantity config value correctly #64
base: master
Changes from all commits
6c25a28
623d248
50689ba
a71197c
3e4c22c
f069f96
1d23a24
e1acd6a
dbeeb6b
054acac
73aeb95
e353b85
1f56470
520b087
58464cc
e38f478
7a9f289
0418198
a947b16
1a7909b
ca2ca09
6bfe2c0
d6ea31b
e278f8c
4cc3fc1

.circleci/config.yml:
@@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
@@ -11,7 +11,7 @@ jobs:
python3 -mvenv /usr/local/share/virtualenvs/tap-exacttarget
source /usr/local/share/virtualenvs/tap-exacttarget/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
pip install .[test]
- run:
name: 'unittest'
command: |
@@ -26,16 +26,10 @@ jobs:
- run:
name: 'Integration Tests'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-exacttarget \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
run-test --tap=tap-exacttarget tests
workflows:
version: 2
commit:

tap_exacttarget/client.py:
@@ -6,14 +6,14 @@

LOGGER = singer.get_logger()

def _get_response_items(response):
# prints the number of records fetched from the passed endpoint
def _get_response_items(response, name):
items = response.results

if 'count' in response.results:
LOGGER.info('Got {} results.'.format(response.results.get('count')))
items = response.results.get('items')

LOGGER.info('Got %s results from %s endpoint.', len(items), name)
return items
@@ -61,7 +61,8 @@ def get_auth_stub(config):
LOGGER.info('Failed to auth using V1 endpoint')
if not config.get('tenant_subdomain'):
LOGGER.warning('No tenant_subdomain found, will not attempt to auth with V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or try adding the \'tenant_subdomain\'."
raise Exception(message) from None

# Next try V2
# Move to OAuth2: https://help.salesforce.com/articleView?id=mc_rn_january_2019_platform_ip_remove_legacy_package_create_ability.htm&type=5
@@ -77,7 +78,8 @@ def get_auth_stub(config):
transport=transport)
except Exception as e:
LOGGER.info('Failed to auth using V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or \'tenant_subdomain\'."
raise Exception(message) from None

LOGGER.info("Success.")
return auth_stub
@@ -107,6 +109,9 @@ def request(name, selector, auth_stub, search_filter=None, props=None, batch_siz
"""
cursor = selector()
cursor.auth_stub = auth_stub
# set batch size ie. the page size defined by the user as the
# FuelSDK supports setting page size in the "BatchSize" value in "options" parameter
cursor.options = {"BatchSize": batch_size}

Review comment: Please add comments regarding the changes.
Reply: Added comment.

if props is not None:
cursor.props = props
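This is the TDL-14896 half of the title: without a "BatchSize" option the first `.get()` falls back to the server-side default page size, so only the continue requests honoured `batch_size`. A rough sketch of the idea against any SOAP-based FuelSDK object (illustrative, not the tap's exact `request()` code; `ET_Subscriber` is just an example object):

```python
import FuelSDK

def fetch_first_page(auth_stub, batch_size):
    cursor = FuelSDK.ET_Subscriber()      # any SOAP-based object is set up the same way
    cursor.auth_stub = auth_stub
    # "BatchSize" caps the number of rows returned by the *first* get();
    # without it the first page uses the server-side default size
    cursor.options = {"BatchSize": batch_size}
    response = cursor.get()
    return response.results
```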
@@ -142,22 +147,26 @@ def request_from_cursor(name, cursor, batch_size):
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

while response.more_results:
LOGGER.info("Getting more results from '{}' endpoint".format(name))

# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name))
if isinstance(cursor, FuelSDK.ET_Campaign):
# use 'getMoreResults' for campaigns as it does not use
# batch_size, rather it uses $page and $pageSize and REST Call
response = cursor.getMoreResults()
else:
# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)

Review comment on lines +156 to +163: What is the purpose of the above changes?
Reply: The purpose of the above change is to support pagination in the tap to run the pagination test.

if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))

for item in _get_response_items(response):
for item in _get_response_items(response, name):
yield item

LOGGER.info("Done retrieving results from '{}' endpoint".format(name))

tap_exacttarget/dao.py:
@@ -1,5 +1,9 @@
import backoff
import socket
import functools
import singer
from singer import metadata
import os
from singer import metadata, Transformer, utils

from funcy import project
@@ -11,6 +15,34 @@
def _get_catalog_schema(catalog):
return catalog.get('schema', {}).get('properties')

def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

# function to load the fields in the 'definitions' which contains the reference fields
def load_schema_references():
shared_schema_path = get_abs_path('schemas/definitions.json')

refs = {}
# load json from the path
refs["definitions.json"] = utils.load_json(shared_schema_path)

return refs

# function to load schema from json file
def load_schema(stream):
path = get_abs_path('schemas/{}s.json'.format(stream))
# load json from the path
schema = utils.load_json(path)

return schema

# decorator for retrying on error
def exacttarget_error_handling(fnc):
@backoff.on_exception(backoff.expo, (socket.timeout, ConnectionError), max_tries=5, factor=2)
@functools.wraps(fnc)
def wrapper(*args, **kwargs):
return fnc(*args, **kwargs)
return wrapper

class DataAccessObject():
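What the new decorator buys: any wrapped call is retried up to five times with exponential backoff when it raises `socket.timeout` or `ConnectionError`. A self-contained illustration (the flaky function is made up):

```python
import socket
import functools
import backoff

def exacttarget_error_handling(fnc):
    @backoff.on_exception(backoff.expo, (socket.timeout, ConnectionError), max_tries=5, factor=2)
    @functools.wraps(fnc)
    def wrapper(*args, **kwargs):
        return fnc(*args, **kwargs)
    return wrapper

attempts = {"n": 0}

@exacttarget_error_handling
def flaky_call():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient network error")
    return "ok"

print(flaky_call())  # succeeds on the third attempt after two backed-off retries
```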
@@ -19,6 +51,8 @@ def __init__(self, config, state, auth_stub, catalog):
self.state = state.copy()
self.catalog = catalog
self.auth_stub = auth_stub
# initialize batch size
self.batch_size = int(self.config.get('batch_size', 2500))

Review comment: Can we use self.batch_size here?
Reply: No, here the batch size is initialized.

@classmethod
def matches_catalog(cls, catalog):
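For reference, this is where the page size comes from. A hypothetical config (shown as a Python dict; the keys are ones this PR and diff mention, the values are made up) covering both `batch_size` and the `pagination__list_subscriber_interval_quantity` value named in the PR title:

```python
# illustrative only - not a real account's settings
config = {
    "client_id": "...",                                   # Marketing Cloud API credentials
    "client_secret": "...",
    "tenant_subdomain": "...",                            # optional, used for the V2 auth endpoint
    "batch_size": 2500,                                   # page size; defaults to 2500 when omitted
    "pagination__list_subscriber_interval_quantity": 1,   # TDL-14895: made-up value for illustration
}
```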
@@ -27,17 +61,30 @@ def matches_catalog(cls, catalog):
def generate_catalog(self):
cls = self.__class__

# get the reference schemas
refs = load_schema_references()
# resolve the schema reference and make final schema
schema = singer.resolve_schema_references(load_schema(cls.TABLE), refs)
mdata = metadata.new()
metadata.write(mdata, (), 'inclusion', 'available')
for prop in cls.SCHEMA['properties']: # pylint:disable=unsubscriptable-object
metadata.write(mdata, ('properties', prop), 'inclusion', 'available')

# use 'get_standard_metadata' with primary key, replication key and replication method
mdata = metadata.get_standard_metadata(schema=schema,
key_properties=self.KEY_PROPERTIES,
valid_replication_keys=self.REPLICATION_KEYS if self.REPLICATION_KEYS else None,
replication_method=self.REPLICATION_METHOD)

mdata_map = metadata.to_map(mdata)

# make 'automatic' inclusion for replication keys
for replication_key in self.REPLICATION_KEYS:
mdata_map[('properties', replication_key)]['inclusion'] = 'automatic'

return [{
'tap_stream_id': cls.TABLE,
'stream': cls.TABLE,
'key_properties': cls.KEY_PROPERTIES,
'schema': cls.SCHEMA,
'metadata': metadata.to_list(mdata)
'schema': schema,
'metadata': metadata.to_list(mdata_map)
}]

def filter_keys_and_parse(self, obj):
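A compact, standalone sketch of the new catalog flow, using an invented two-field schema in place of the real schemas/*.json files: resolve the $ref into definitions.json, build standard metadata, then make sure replication keys end up with automatic inclusion, as the loop above does.

```python
import singer
from singer import metadata

# invented stand-ins for schemas/definitions.json and a stream schema file
refs = {"definitions.json": {"definitions": {"ID": {"type": ["null", "string"]}}}}
stream_schema = {
    "type": "object",
    "properties": {
        "id": {"$ref": "definitions.json#/definitions/ID"},
        "modifiedDate": {"type": ["null", "string"], "format": "date-time"},
    },
}

# inline the $ref so the emitted catalog carries a fully resolved schema
schema = singer.resolve_schema_references(stream_schema, refs)

mdata = metadata.get_standard_metadata(
    schema=schema,
    key_properties=["id"],
    valid_replication_keys=["modifiedDate"],
    replication_method="INCREMENTAL",
)
mdata_map = metadata.to_map(mdata)

# as in the diff, explicitly ensure replication keys carry 'automatic' inclusion
mdata_map[("properties", "modifiedDate")]["inclusion"] = "automatic"

catalog_entry = {"schema": schema, "metadata": metadata.to_list(mdata_map)}
```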
@@ -52,6 +99,13 @@ def get_catalog_keys(self):
def parse_object(self, obj):
return project(obj, self.get_catalog_keys())

# a function to write records by applying transformation
@staticmethod
def write_records_with_transform(record, catalog, table):
with Transformer() as transformer:
rec = transformer.transform(record, catalog.get('schema'), metadata.to_map(catalog.get('metadata')))
singer.write_record(table, rec)

def write_schema(self):
singer.write_schema(
self.catalog.get('stream'),
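For context on `write_records_with_transform`: compared with the bare `singer.write_records` call it replaces in the endpoint further down, the `Transformer` validates and coerces each record against the stream schema and applies the selection metadata before the record is written. An isolated example with a made-up schema:

```python
import singer
from singer import Transformer, metadata

schema = {"type": "object", "properties": {"id": {"type": ["null", "integer"]}}}
mdata = metadata.to_map(metadata.get_standard_metadata(schema=schema, key_properties=["id"]))

with Transformer() as transformer:
    # "42" is coerced to the integer 42 to match the schema before being emitted
    rec = transformer.transform({"id": "42"}, schema, mdata)

singer.write_record("campaign", rec)
```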
@@ -75,9 +129,10 @@ def sync(self):

# OVERRIDE THESE TO IMPLEMENT A NEW DAO:

SCHEMA = None
TABLE = None
KEY_PROPERTIES = None
REPLICATION_KEYS = []
REPLICATION_METHOD = None

def sync_data(self): # pylint: disable=no-self-use
raise RuntimeError('sync_data is not implemented!')

Campaign endpoint (CampaignDataAccessObject):
@@ -1,49 +1,32 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.schemas import with_properties
from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)

LOGGER = singer.get_logger()

class CampaignDataAccessObject(DataAccessObject):

SCHEMA = with_properties({
'id': {
'type': ['null', 'string'],
},
'createdDate': {
'type': ['null', 'string'],
},
'modifiedDate': {
'type': ['null', 'string'],
},
'name': {
'type': ['null', 'string'],
},
'description': {
'type': ['null', 'string'],
},
'campaignCode': {
'type': ['null', 'string'],
},
'color': {
'type': ['null', 'string'],
}
})

TABLE = 'campaign'
KEY_PROPERTIES = ['id']
REPLICATION_METHOD = 'FULL_TABLE'

@exacttarget_error_handling
def sync_data(self):
cursor = request(
'Campaign',
FuelSDK.ET_Campaign,
self.auth_stub)
self.auth_stub,
# use $pageSize and $page in the props for
# this stream as it calls using REST API
props={"$pageSize": self.batch_size, "$page": 1, "page": 1})

catalog_copy = copy.deepcopy(self.catalog)

for campaign in cursor:
campaign = self.filter_keys_and_parse(campaign)

singer.write_records(self.__class__.TABLE, [campaign])
self.write_records_with_transform(campaign, catalog_copy, self.TABLE)
Review comment: Please add comments to the changes.
Reply: Added comments.
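Rounding out the campaign-specific change: `ET_Campaign` is fetched over REST, so the page size travels in `props` as `$pageSize`/`$page` rather than through the SOAP "BatchSize" option. A sketch of that call in isolation, assuming (as the diff's comment does) that FuelSDK forwards these props as query parameters:

```python
import FuelSDK

def fetch_campaign_page(auth_stub, page, page_size):
    cursor = FuelSDK.ET_Campaign()
    cursor.auth_stub = auth_stub
    # REST paging: $pageSize caps the rows per page, $page selects the page;
    # the duplicate "page" key simply mirrors what the tap passes in the diff above
    cursor.props = {"$pageSize": page_size, "$page": page, "page": page}
    return cursor.get()
```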