Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crest Work #11

Merged
merged 6 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
version: 2.1
orbs:
slack: circleci/[email protected]
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
name: 'Setup virtual env'
command: |
python3 -mvenv /usr/local/share/virtualenvs/tap-sailthru
source /usr/local/share/virtualenvs/tap-sailthru/bin/activate
pip install 'pip==21.1.3'
pip install 'setuptools==56.0.0'
pip install .[test]
pip install pytest-cov
- run:
name: 'JSON Validator'
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
stitch-validate-json tap_sailthru/schemas/*.json
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-sailthru/bin/activate
pylint tap_sailthru -d C,R,W
- run:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-sailthru/bin/activate
python -m pytest --junitxml=junit/test-result.xml --cov=tap_sailthru --cov-report=html tests/unittests/
- add_ssh_keys
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Integration Tests'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-sailthru tests
- slack/notify-on-failure:
only_for_branches: master
workflows:
version: 2
commit:
jobs:
- build:
context:
- circleci-user
- tap-tester-user
build_daily:
triggers:
- schedule:
cron: "0 0 * * *"
filters:
branches:
only:
- master
jobs:
- build:
context:
- circleci-user
- tap-tester-user
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ $ pip install -e .
- `user_agent` (string, required): Process and email for API logging purposes. Example: tap-sailthru <api_user_email@your_company.com>
- `api_key` (string, required): The API key
- `api_secret` (string, required): The API secret
- `request_timeout` (string/integer/float, optional): The time for which request should wait to get response and default request_timeout is 300 seconds.

And the other values mentioned in the authentication section above.

Expand All @@ -55,7 +56,8 @@ And the other values mentioned in the authentication section above.
"start_date": "2021-04-01T00:00:00Z",
"user_agent": "Stitch Tap ([email protected])",
"api_key": "<api_key>",
"api_secret": "<api_secret>"
"api_secret": "<api_secret>",
"request_timeout": 300
}
```

Expand Down
6 changes: 3 additions & 3 deletions tap_sailthru/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import singer
from singer import utils

from tap_sailthru.discover import discover
from tap_sailthru.discover import discover as _discover
from tap_sailthru.sync import sync

REQUIRED_CONFIG_KEYS = ["start_date", "api_key", "api_secret", "user_agent"]
Expand All @@ -22,14 +22,14 @@ def main():

# If discover flag was passed, run discovery mode and dump output to stdout
if args.discover:
catalog = discover()
catalog = _discover(args.config)
catalog.dump()
# Otherwise run in sync mode
else:
if args.catalog:
catalog = args.catalog
else:
catalog = discover()
catalog = _discover(args.config)
sync(args.config, args.state, catalog)


Expand Down
130 changes: 116 additions & 14 deletions tap_sailthru/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@

import backoff
from requests import Session
from requests.exceptions import Timeout
from singer import get_logger, metrics

LOGGER = get_logger()

# Backoff retries
MAX_RETRIES = 3

# timeout request after 300 seconds
REQUEST_TIMEOUT = 300

# pylint: disable=missing-class-docstring
class SailthruClientError(Exception):
pass
Expand All @@ -36,6 +40,98 @@ def __init__(self, message=None, response=None):
class SailthruServer5xxError(Exception):
pass

class SailthruBadRequestError(SailthruClientError):
pass

class SailthruUnauthorizedError(SailthruClientError):
pass

class SailthruForbiddenError(SailthruClientError):
pass

class SailthruNotFoundError(SailthruClientError):
pass

class SailthruConflictError(SailthruClientError):
pass

class SailthruMethodNotFoundError(SailthruClientError):
pass

class SailthruInternalServerError(SailthruServer5xxError):
pass

# error code to exception class and error message mapping
ERROR_CODE_EXCEPTION_MAPPING = {
400: {
"raise_exception": SailthruBadRequestError,
"message": "The request is missing or has a bad parameter."
},
401: {
"raise_exception": SailthruUnauthorizedError,
"message": "Invalid authorization credentials."
},
403: {
"raise_exception": SailthruForbiddenError,
"message": "User does not have permission to access the resource."
},
404: {
"raise_exception": SailthruNotFoundError,
"message": "The resource you have specified cannot be found."
},
405: {
"raise_exception": SailthruMethodNotFoundError,
"message": "The provided HTTP method is not supported by the URL."
},
409: {
"raise_exception": SailthruConflictError,
"message": "The request could not be completed due to a conflict with the current state of the server."
},
429: {
"raise_exception": SailthruClient429Error,
"message": "API rate limit exceeded, please retry after some time."
},
500: {
"raise_exception": SailthruInternalServerError,
"message": "An error has occurred at Sailthru's end."
}
}

# get exception class based on status code
def get_exception_for_status_code(status_code, error_code):
# return SailthruServer5xxError if status code is greater than 500
if status_code > 500:
return SailthruServer5xxError
if status_code == 400 and error_code == 99:
return SailthruClientStatsNotReadyError

return ERROR_CODE_EXCEPTION_MAPPING.get(status_code, {}).get("raise_exception", SailthruClientError)

# raise error with proper message based in error code from the response
def raise_for_error(response):
status_code = response.status_code
try:
json_response = response.json()
except Exception:
json_response = {}

# get sailthru error code, message and prepare message
error_code = json_response.get("error")
error_message = json_response.get("errormsg", ERROR_CODE_EXCEPTION_MAPPING.get(status_code, {}).get("message", "Unknown Error"))
message = "HTTP-error-code: {}, Error: {}, Message: {}".format(status_code, error_code, error_message)

# get exception class
exception = get_exception_for_status_code(status_code, error_code)

# return without raising error as for "blast_query" we have to skip syncing for that blast id
if status_code == 403 and error_code == 99:
LOGGER.warning("{}".format(json_response))
return

# add response with exception for 429 error, for 'retry_after_wait_gen'
if status_code == 429:
raise exception(message, response) from None
raise exception(message) from None

def retry_after_wait_gen():
while True:
Expand All @@ -52,12 +148,25 @@ def retry_after_wait_gen():
class SailthruClient:
base_url = 'https://api.sailthru.com'

def __init__(self, api_key, api_secret, user_agent) -> None:
def __init__(self, api_key, api_secret, user_agent, request_timeout=REQUEST_TIMEOUT) -> None:
self.__api_key = api_key
self.__api_secret = api_secret
self.session = Session()
self.headers = {'User-Agent': user_agent}

# Set request timeout to config param `request_timeout` value.
# If value is 0,"0","" or not passed then it set default to 300 seconds.
if request_timeout and float(request_timeout):
self.__request_timeout = float(request_timeout)
else:
self.__request_timeout = REQUEST_TIMEOUT

def check_platform_access(self) -> None:
"""
Check that provided credentials are valid or not by requesting sample settings.
"""
self.get('/settings', None)

def extract_params(self, params: Union[list, dict]) -> list:
"""
Extracts the values of a set of parameters, recursing into nested dictionaries.
Expand Down Expand Up @@ -213,7 +322,8 @@ def _build_request(self, endpoint, params, method):
@backoff.on_exception(backoff.expo,
(SailthruClientError,
SailthruServer5xxError,
SailthruClientStatsNotReadyError),
SailthruClientStatsNotReadyError,
Timeout),
max_tries=MAX_RETRIES,
factor=2)
def _make_request(self, url, payload, method):
Expand All @@ -225,21 +335,13 @@ def _make_request(self, url, payload, method):
url=url,
params=params,
data=data,
headers=self.headers)
headers=self.headers,
timeout=self.__request_timeout)
timer.tags[metrics.Tag.http_status_code] = response.status_code

if response.status_code == 429:
raise SailthruClient429Error("rate limit exceeded", response)
if response.status_code >= 500:
raise SailthruServer5xxError
if response.status_code == 400 and response.json().get("error") == 99:
raise SailthruClientStatsNotReadyError
if response.status_code == 403 and response.json().get("error") == 99:
# pylint: disable=logging-fstring-interpolation
LOGGER.warning(f"{response.json()}")
return response.json()
# raise error if status code is not 200
if response.status_code != 200:
raise SailthruClientError
raise_for_error(response)

return response.json()

Expand Down
8 changes: 7 additions & 1 deletion tap_sailthru/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from singer.catalog import Catalog

from tap_sailthru.streams import STREAMS
from tap_sailthru.client import SailthruClient


def _get_abs_path(path: str) -> str:
Expand Down Expand Up @@ -82,11 +83,16 @@ def get_schemas() -> Tuple[dict, dict]:
return schemas, schemas_metadata


def discover() -> Catalog:
def discover(config) -> Catalog:
"""
Constructs a singer Catalog object based on the schemas and metadata.
"""

# Initialize SailthruClient() and call check_platform_access() to verify credentials
api_key, api_secret = config.get('api_key'), config.get('api_secret')
client = SailthruClient(api_key, api_secret, config.get('user_agent'), config.get('request_timeout'))
client.check_platform_access()

schemas, schemas_metadata = get_schemas()
streams = []

Expand Down
2 changes: 1 addition & 1 deletion tap_sailthru/schemas/blast_save_list.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
],
"format": "date-time"
},
"largest_purchase item_price": {
"largest_purchase_item_price": {
"type": [
"null",
"string"
Expand Down
14 changes: 7 additions & 7 deletions tap_sailthru/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def sync(self,
with metrics.record_counter(self.tap_stream_id) as counter:
for record in self.get_records(bookmark_datetime):
self.date_records_to_datetime(record)
transform_keys_to_snake_case(record)
record_datetime = singer.utils.strptime_to_utc(record[self.replication_key])
if record_datetime >= bookmark_datetime:
transform_keys_to_snake_case(record)
transformed_record = transformer.transform(record,
stream_schema,
stream_metadata)
Expand Down Expand Up @@ -441,23 +441,23 @@ class PurchaseLog(IncrementalStream):
Docs: https://getstarted.sailthru.com/developers/api/job/#export-purchase-log
"""
tap_stream_id = 'purchase_log'
key_properties = ['extid']
replication_key = 'Date'
valid_replication_keys = ['Date']
key_properties = ['date', 'email_hash', 'extid', 'message_id', 'price', 'channel']
replication_key = 'date'
valid_replication_keys = ['date']
params = {
'job': 'export_purchase_log',
'start_date': '{purchase_log_start_date}',
'end_date': '{purchase_log_end_date}',
}
date_keys = ['Date']
date_keys = ['date']

def get_records(self, bookmark_datetime=None, is_parent=False):

start_datetime, end_datetime = get_start_and_end_date_params(bookmark_datetime)
now = singer.utils.now()

# Generate a report for each day up until the end date or today's date
while start_datetime.date() < min(end_datetime.date(), now.date()):
# Generate a report for each day up until the today's date
while start_datetime.date() <= now.date():

job_date = start_datetime.strftime('%Y%m%d')
self.params['start_date'] = job_date
Expand Down
2 changes: 1 addition & 1 deletion tap_sailthru/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def sync(config, state, catalog):
""" Sync data from tap source """

api_key, api_secret = config.get('api_key'), config.get('api_secret')
client = SailthruClient(api_key, api_secret, config.get('user_agent'))
client = SailthruClient(api_key, api_secret, config.get('user_agent'), config.get('request_timeout'))

with Transformer() as transformer:
for stream in catalog.get_selected_streams(state):
Expand Down
Loading