From fe2f65f7f4ec561f1189e6bc4bb788570f8e413b Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Wed, 8 Feb 2023 17:32:26 +0000 Subject: [PATCH 1/8] add tickets stream in base.py file --- tests/base.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/base.py b/tests/base.py index 0b72a00a..677f1f34 100644 --- a/tests/base.py +++ b/tests/base.py @@ -148,6 +148,12 @@ def expected_metadata(self): # DOCS_BUG https://stitchdata.atlassian.net/browse self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updatedAt"}, self.OBEYS_START_DATE: True + }, + "tickets": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updatedAt"}, + self.OBEYS_START_DATE: True } } From c811d37929f76bf0a9248c64844425ddfcaeb7a4 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Wed, 8 Feb 2023 19:58:08 +0000 Subject: [PATCH 2/8] get ticket records --- tests/client.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/client.py b/tests/client.py index fd26646a..18d47837 100644 --- a/tests/client.py +++ b/tests/client.py @@ -207,9 +207,43 @@ def read(self, stream, parent_ids=[], since=''): return self.get_email_events() elif stream == 'subscription_changes': return self.get_subscription_changes(since) + elif stream == "tickets": + return self.get_tickets() else: raise NotImplementedError + def get_tickets(self): + """ + Get all tickets. + """ + url = f"{BASE_URL}/crm/v4/objects/tickets" + replication_key = list(self.replication_keys["tickets"])[0] + records = [] + + # response = self.get(url) + + while True: + params = {"limit": 100, "associations": "contact,company,deals"} + response = self.get(url, params=params) + + # if data.get(path) is None: + # raise RuntimeError( + # "Unexpected API response: {} not in {}".format(path, data.keys())) + + # for row in data["results"]: + records.extend( + [ + record + for record in response["results"] + if record[replication_key] >= "2023-01-01T13:47:54.269Z" # self.start_date + ] + ) + + if not response.get("paging"): + break + params["after"] = response.get("paging").get("next").get("after") + return records + def get_campaigns(self): """ Get all campaigns by id, then grab the details of each campaign. From db3b083f8a6b02173338e96b27c8d165e92c892c Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Thu, 9 Feb 2023 09:54:37 +0000 Subject: [PATCH 3/8] add create ticket function for integration test case --- tests/client.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/client.py b/tests/client.py index 18d47837..37952398 100644 --- a/tests/client.py +++ b/tests/client.py @@ -216,6 +216,7 @@ def get_tickets(self): """ Get all tickets. """ + LOGGER.info(f"Within Get Tickets") url = f"{BASE_URL}/crm/v4/objects/tickets" replication_key = list(self.replication_keys["tickets"])[0] records = [] @@ -729,6 +730,8 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1): return self.create_subscription_changes() elif stream == 'subscription_changes': return self.create_subscription_changes(subscriptions, times) + elif stream == 'tickets': + return self.create_tickets() else: raise NotImplementedError(f"There is no create_{stream} method in this dipatch!") @@ -984,6 +987,30 @@ def create_deals(self): records = [response] return records + def create_tickets(self): + """ + HubSpot API https://developers.hubspot.com/docs/api/crm/tickets + """ + LOGGER.info(f"Within Create Tickets") + url = f"{BASE_URL}/crm/v4/objects/tickets" + + data = { + "properties": { + "content": "also a test", + "createdate": "2020-08-26T15:18:14.135Z", + "hs_lastmodifieddate": "2020-11-24T19:45:08.380Z", + "hs_pipeline": "0", + "hs_pipeline_stage": "1", + "hs_ticket_priority": "MEDIUM", + "subject": "ticket created through python code" + } + } + + # generate a record + response = self.post(url, data) + records = [response] + return records + def create_email_events(self): """ HubSpot API https://legacydocs.hubspot.com/docs/methods/email/email_events_overview From ef2e9da4b0d493288394ab02ff80363e65fc1708 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Fri, 10 Feb 2023 06:54:17 +0000 Subject: [PATCH 4/8] test tickets stream in interrupt sync mode --- tests/test_hubspot_interrupted_sync.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test_hubspot_interrupted_sync.py b/tests/test_hubspot_interrupted_sync.py index 9191ece2..7654e461 100644 --- a/tests/test_hubspot_interrupted_sync.py +++ b/tests/test_hubspot_interrupted_sync.py @@ -18,7 +18,7 @@ def name(): def streams_to_test(self): """expected streams minus the streams not under test""" - return {'companies', 'engagements'} + return {'companies', 'engagements', 'tickets'} def simulated_interruption(self, reference_state): @@ -38,6 +38,12 @@ def simulated_interruption(self, reference_state): new_state['bookmarks']['engagements']['lastUpdated'] = None new_state['bookmarks']['engagements']['current_sync_start'] = engagements_bookmark + tickets_bookmark = self.timedelta_formatted( + reference_state['bookmarks']['engagements']['lastUpdated'], + days=-1, str_format=self.BASIC_DATE_FORMAT + ) + new_state['bookmarks']['tickets']['updatedAt'] = tickets_bookmark + return new_state def get_properties(self): From 5b19fe9ace6289157f99739b3ebf96865a81ad7e Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Fri, 10 Feb 2023 20:04:49 +0000 Subject: [PATCH 5/8] Changes: - add page limit - add functions to get, create, and update the tickets stream --- tests/base.py | 1 + tests/client.py | 101 +++++++++++++++++++++++++++++------------------- 2 files changed, 63 insertions(+), 39 deletions(-) diff --git a/tests/base.py b/tests/base.py index 677f1f34..769eac88 100644 --- a/tests/base.py +++ b/tests/base.py @@ -153,6 +153,7 @@ def expected_metadata(self): # DOCS_BUG https://stitchdata.atlassian.net/browse self.PRIMARY_KEYS: {"id"}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {"updatedAt"}, + self.EXPECTED_PAGE_SIZE: 100, self.OBEYS_START_DATE: True } } diff --git a/tests/client.py b/tests/client.py index 37952398..62cae35d 100644 --- a/tests/client.py +++ b/tests/client.py @@ -212,39 +212,6 @@ def read(self, stream, parent_ids=[], since=''): else: raise NotImplementedError - def get_tickets(self): - """ - Get all tickets. - """ - LOGGER.info(f"Within Get Tickets") - url = f"{BASE_URL}/crm/v4/objects/tickets" - replication_key = list(self.replication_keys["tickets"])[0] - records = [] - - # response = self.get(url) - - while True: - params = {"limit": 100, "associations": "contact,company,deals"} - response = self.get(url, params=params) - - # if data.get(path) is None: - # raise RuntimeError( - # "Unexpected API response: {} not in {}".format(path, data.keys())) - - # for row in data["results"]: - records.extend( - [ - record - for record in response["results"] - if record[replication_key] >= "2023-01-01T13:47:54.269Z" # self.start_date - ] - ) - - if not response.get("paging"): - break - params["after"] = response.get("paging").get("next").get("after") - return records - def get_campaigns(self): """ Get all campaigns by id, then grab the details of each campaign. @@ -690,6 +657,42 @@ def get_workflows(self): if record[replication_key] >= self.start_date]) return records + def _get_tickets_by_pk(self, ticket_id): + """ + Get a specific ticket by pk value + HubSpot API https://developers.hubspot.com/docs/api/crm/tickets + """ + url = f"{BASE_URL}/crm/v4/objects/tickets/{ticket_id}?associations=contact,company,deals" + response = self.get(url) + return response + + def get_tickets(self): + """ + Get all tickets. + HubSpot API https://developers.hubspot.com/docs/api/crm/tickets + """ + url = f"{BASE_URL}/crm/v4/objects/tickets" + replication_key = list(self.replication_keys["tickets"])[0] + records = [] + + # response = self.get(url) + + while True: + params = {"limit": 100, "associations": "contact,company,deals"} + response = self.get(url, params=params) + + records.extend( + [ + record + for record in response["results"] + if record[replication_key] >= self.start_date + ] + ) + + if not response.get("paging"): + break + params["after"] = response.get("paging").get("next").get("after") + return records ########################################################################## ### CREATE ########################################################################## @@ -991,18 +994,15 @@ def create_tickets(self): """ HubSpot API https://developers.hubspot.com/docs/api/crm/tickets """ - LOGGER.info(f"Within Create Tickets") url = f"{BASE_URL}/crm/v4/objects/tickets" - + record_uuid = str(uuid.uuid4()).replace('-', '') data = { "properties": { - "content": "also a test", - "createdate": "2020-08-26T15:18:14.135Z", - "hs_lastmodifieddate": "2020-11-24T19:45:08.380Z", + "content": f"Created for testing purpose - {record_uuid}", "hs_pipeline": "0", "hs_pipeline_stage": "1", "hs_ticket_priority": "MEDIUM", - "subject": "ticket created through python code" + "subject": f"Sample ticket name - {record_uuid}" } } @@ -1311,6 +1311,8 @@ def update(self, stream, record_id): return self.update_forms(record_id) elif stream == 'engagements': return self.update_engagements(record_id) + elif stream == 'tickets': + return self.update_tickets(record_id) else: raise NotImplementedError(f"Test client does not have an update method for {stream}") @@ -1526,6 +1528,27 @@ def update_engagements(self, engagement_id): return record + def update_tickets(self, ticket_id): + """ + Hubspot API https://developers.hubspot.com/docs/api/crm/tickets + :params ticket_id: the pk value of the ticket record to update + :return: + """ + url = f"{BASE_URL}/crm/v4/objects/tickets/{ticket_id}" + + record_uuid = str(uuid.uuid4()).replace('-', '')[:20] + data = { + "properties": { + "subject": f"update record for testing - {record_uuid}" + } + } + + self.patch(url, data) + + record = self._get_tickets_by_pk(ticket_id) + + return record + ########################################################################## ### Deletes ########################################################################## From ac6cf6dc9506f98c4c39ee813586f3080874b041 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Fri, 10 Feb 2023 21:00:54 +0000 Subject: [PATCH 6/8] modify get logic for tickets in client.py and update sync offset test --- tests/client.py | 4 ++-- tests/test_hubspot_interrupted_sync_offset.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/client.py b/tests/client.py index 62cae35d..7144ceaa 100644 --- a/tests/client.py +++ b/tests/client.py @@ -677,15 +677,15 @@ def get_tickets(self): # response = self.get(url) + params = {"limit": 100, "associations": "contact,company,deals"} while True: - params = {"limit": 100, "associations": "contact,company,deals"} response = self.get(url, params=params) records.extend( [ record for record in response["results"] - if record[replication_key] >= self.start_date + if record[replication_key] >= self.start_date_strf.replace('.Z','.000Z') ] ) diff --git a/tests/test_hubspot_interrupted_sync_offset.py b/tests/test_hubspot_interrupted_sync_offset.py index abacba5f..bbe26248 100644 --- a/tests/test_hubspot_interrupted_sync_offset.py +++ b/tests/test_hubspot_interrupted_sync_offset.py @@ -33,6 +33,7 @@ def streams_to_test(self): 'email_events', # unable to manually find a partial state with our test data 'contacts_by_company', # interruptible does not apply, child of 'companies' 'subscription_changes', # BUG_TDL-14938 + 'tickets' # covered in TestHubspotInterruptedSync1 } return self.expected_streams() - untested From b3071bb72830ae06d1511fb50ba3536a0e921d4a Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Mon, 13 Feb 2023 11:41:36 +0530 Subject: [PATCH 7/8] address review comments and reformat the client.py as per the standards --- tests/client.py | 228 ++++++++++++++++++++++++------------------------ 1 file changed, 115 insertions(+), 113 deletions(-) diff --git a/tests/client.py b/tests/client.py index 7144ceaa..c4df9d52 100644 --- a/tests/client.py +++ b/tests/client.py @@ -1,15 +1,11 @@ import datetime -import time -import requests -import backoff -import json -# from json.decoder import JSONDecodeError -import uuid import random +import uuid -from tap_tester import menagerie, LOGGER - +import backoff +import requests from base import HubspotBaseTest +from tap_tester import LOGGER DEBUG = False BASE_URL = "https://api.hubapi.com" @@ -19,6 +15,7 @@ class TestClient(): START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" V3_DEALS_PROPERTY_PREFIXES = {'hs_date_entered', 'hs_date_exited', 'hs_time_in'} BOOKMARK_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' + ########################################################################## ### CORE METHODS ########################################################################## @@ -29,7 +26,7 @@ def giveup(exc): return False return exc.response is not None \ - and 400 <= exc.response.status_code < 500 + and 400 <= exc.response.status_code < 500 @backoff.on_exception(backoff.constant, (requests.exceptions.RequestException, @@ -60,14 +57,14 @@ def post(self, url, data=dict(), params=dict(), debug=DEBUG): headers = dict(self.HEADERS) headers['content-type'] = "application/json" response = requests.post(url, json=data, params=params, headers=headers) - LOGGER.info(f"TEST CLIENT | POST {url} data={data} params={params} STATUS: {response.status_code}") + LOGGER.info( + f"TEST CLIENT | POST {url} data={data} params={params} STATUS: {response.status_code}") if debug: LOGGER.debug(response.text) response.raise_for_status() if response.status_code == 204: - LOGGER.warn(f"TEST CLIENT Response is empty") # NB: There is a simplejson.scanner.JSONDecodeError thrown when we attempt # to do a response.json() on a 204 response. To get around this we just return an empty list @@ -90,7 +87,8 @@ def put(self, url, data, params=dict(), debug=DEBUG): headers = dict(self.HEADERS) headers['content-type'] = "application/json" response = requests.put(url, json=data, params=params, headers=headers) - LOGGER.info(f"TEST CLIENT | PUT {url} data={data} params={params} STATUS: {response.status_code}") + LOGGER.info( + f"TEST CLIENT | PUT {url} data={data} params={params} STATUS: {response.status_code}") if debug: LOGGER.debug(response.text) @@ -108,7 +106,8 @@ def patch(self, url, data, params=dict(), debug=DEBUG): headers = dict(self.HEADERS) headers['content-type'] = "application/json" response = requests.patch(url, json=data, params=params, headers=headers) - LOGGER.info(f"TEST CLIENT | PATCH {url} data={data} params={params} STATUS: {response.status_code}") + LOGGER.info( + f"TEST CLIENT | PATCH {url} data={data} params={params} STATUS: {response.status_code}") if debug: LOGGER.debug(response.text) @@ -165,11 +164,12 @@ def datatype_transformations(self, stream, records): for record in records: for column in record.keys(): if column in datetime_columns[stream]: - record[column]= self.BaseTest.datetime_from_timestamp( - record[column]/1000, self.BOOKMARK_DATE_FORMAT + record[column] = self.BaseTest.datetime_from_timestamp( + record[column] / 1000, self.BOOKMARK_DATE_FORMAT ) - LOGGER.info(f"TEST CLIENT | Transforming (datatype conversions) {len(records)} {stream} records") + LOGGER.info( + f"TEST CLIENT | Transforming (datatype conversions) {len(records)} {stream} records") return records ########################################################################## @@ -260,11 +260,11 @@ def get_companies(self, since=''): for company in response['companies']: if company['properties']['hs_lastmodifieddate']: company_timestamp = datetime.datetime.fromtimestamp( - company['properties']['hs_lastmodifieddate']['timestamp']/1000 + company['properties']['hs_lastmodifieddate']['timestamp'] / 1000 ) else: company_timestamp = datetime.datetime.fromtimestamp( - company['properties']['createdate']['timestamp']/1000 + company['properties']['createdate']['timestamp'] / 1000 ) if company_timestamp >= since: @@ -294,7 +294,6 @@ def get_contact_lists(self, since='', list_id=''): return response - if since == 'all': params = {'count': 250} else: @@ -317,7 +316,7 @@ def get_contact_lists(self, since='', list_id=''): response = self.get(url, params=params) for record in response['lists']: - if since == 'all' or int(since) <= record[replication_key]: + if since == 'all' or int(since) <= record[replication_key]: records.append(record) has_more = response['has-more'] @@ -342,7 +341,7 @@ def _get_contacts_by_pks(self, pks): params_2['vid'] = pks response_2 = self.get(url_2, params=params_2) for vid, record in response_2.items(): - ts_ms = int(record['properties']['lastmodifieddate']['value'])/1000 + ts_ms = int(record['properties']['lastmodifieddate']['value']) / 1000 converted_ts = self.BaseTest.datetime_from_timestamp( ts_ms, self.BOOKMARK_DATE_FORMAT ) @@ -390,7 +389,6 @@ def get_contacts(self): records = self.denest_properties('contacts', records) return records - def get_contacts_by_company(self, parent_ids): """ Get all contacts_by_company iterating over compnayId's and @@ -452,18 +450,18 @@ def get_deals(self): v1_params = {'includeAllProperties': True, 'allPropertiesFetchMode': 'latest_version', - 'properties' : []} + 'properties': []} replication_key = list(self.replication_keys['deals'])[0] records = [] # hit the v1 endpoint to get the record has_more = True while has_more: - response = self.get(v1_url, params=v1_params) records.extend([record for record in response['deals'] # Here replication key of the deals stream is derived from "hs_lastmodifieddate" field. - if record['properties']["hs_lastmodifieddate"]['timestamp'] >= self.start_date]) + if record['properties']["hs_lastmodifieddate"][ + 'timestamp'] >= self.start_date]) has_more = response['hasMore'] v1_params['offset'] = response['offset'] @@ -488,21 +486,24 @@ def get_deals(self): for v3_record in v3_records: for record in records: if v3_record['id'] == str(record['dealId']): - # don't inclue the v3 property if the value is None non_null_v3_properties = {v3_property_key: v3_property_value - for v3_property_key, v3_property_value in v3_record['properties'].items() + for v3_property_key, v3_property_value in + v3_record['properties'].items() if v3_property_value is not None} # only grab v3 properties with a specific prefix trimmed_v3_properties = {v3_property_key: v3_property_value - for v3_property_key, v3_property_value in non_null_v3_properties.items() + for v3_property_key, v3_property_value in + non_null_v3_properties.items() if any([v3_property_key.startswith(prefix) - for prefix in self.V3_DEALS_PROPERTY_PREFIXES])} + for prefix in + self.V3_DEALS_PROPERTY_PREFIXES])} # the v3 properties must be restructured into objects to match v1 v3_properties = {v3_property_key: {'value': v3_property_value} - for v3_property_key, v3_property_value in trimmed_v3_properties.items()} + for v3_property_key, v3_property_value in + trimmed_v3_properties.items()} # add the v3 record properties to the v1 record record['properties'].update(v3_properties) @@ -523,7 +524,6 @@ def get_email_events(self, recipient=''): has_more = True while has_more: - response = self.get(url, params=params) records.extend([record for record in response['events'] @@ -567,7 +567,6 @@ def get_engagements(self): result['lastUpdated'] = result['engagement']['lastUpdated'] records.append(result) - has_more = response['hasMore'] params['offset'] = response['offset'] @@ -685,7 +684,7 @@ def get_tickets(self): [ record for record in response["results"] - if record[replication_key] >= self.start_date_strf.replace('.Z','.000Z') + if record[replication_key] >= self.start_date_strf.replace('.Z', '.000Z') ] ) @@ -693,6 +692,7 @@ def get_tickets(self): break params["after"] = response.get("paging").get("next").get("after") return records + ########################################################################## ### CREATE ########################################################################## @@ -749,43 +749,43 @@ def create_contacts(self): data = { "properties": [ { - "property": "email", - "value": f"{record_uuid}@stitchdata.com" - }, - { - "property": "firstname", - "value": "Yusaku" - }, - { - "property": "lastname", - "value": "Kasahara" - }, - { - "property": "website", - "value": "http://app.stitchdata.com" - }, - { - "property": "phone", - "value": "555-122-2323" - }, - { - "property": "address", - "value": "25 First Street" - }, - { - "property": "city", - "value": "Cambridge" - }, - { - "property": "state", - "value": "MA" - }, - { - "property": "zip", - "value": "02139" - } - ] - } + "property": "email", + "value": f"{record_uuid}@stitchdata.com" + }, + { + "property": "firstname", + "value": "Yusaku" + }, + { + "property": "lastname", + "value": "Kasahara" + }, + { + "property": "website", + "value": "http://app.stitchdata.com" + }, + { + "property": "phone", + "value": "555-122-2323" + }, + { + "property": "address", + "value": "25 First Street" + }, + { + "property": "city", + "value": "Cambridge" + }, + { + "property": "state", + "value": "MA" + }, + { + "property": "zip", + "value": "02139" + } + ] + } # generate a contacts record response = self.post(url, data) @@ -796,7 +796,7 @@ def create_contacts(self): get_resp = self.get(get_url, params=params) converted_versionTimestamp = self.BaseTest.datetime_from_timestamp( - get_resp['versionTimestamp']/1000, self.BOOKMARK_DATE_FORMAT + get_resp['versionTimestamp'] / 1000, self.BOOKMARK_DATE_FORMAT ) get_resp['versionTimestamp'] = converted_versionTimestamp records = self.denest_properties('contacts', [get_resp]) @@ -848,7 +848,7 @@ def create_contact_lists(self): data = { "name": f"tweeters{record_uuid}", "dynamic": True, - "filters":[ + "filters": [ [{ "operator": "EQ", "value": f"@hubspot{record_uuid}", @@ -978,10 +978,10 @@ def create_deals(self): "value": "60000", "name": "amount" }, - { - "value": "newbusiness", - "name": "dealtype" - } + { + "value": "newbusiness", + "name": "dealtype" + } ] } @@ -997,19 +997,18 @@ def create_tickets(self): url = f"{BASE_URL}/crm/v4/objects/tickets" record_uuid = str(uuid.uuid4()).replace('-', '') data = { - "properties": { - "content": f"Created for testing purpose - {record_uuid}", - "hs_pipeline": "0", - "hs_pipeline_stage": "1", - "hs_ticket_priority": "MEDIUM", - "subject": f"Sample ticket name - {record_uuid}" - } + "properties": { + "content": f"Created for testing purpose - {record_uuid}", + "hs_pipeline": "0", + "hs_pipeline_stage": "1", + "hs_ticket_priority": "MEDIUM", + "subject": f"Sample ticket name - {record_uuid}" + } } # generate a record response = self.post(url, data) - records = [response] - return records + return [response] def create_email_events(self): """ @@ -1020,7 +1019,8 @@ def create_email_events(self): the preferred approach. We do not currently rely on this approach. """ - raise NotImplementedError("Use create_subscription_changes instead to create records for email_events stream") + raise NotImplementedError( + "Use create_subscription_changes instead to create records for email_events stream") def create_engagements(self): """ @@ -1034,7 +1034,7 @@ def create_engagements(self): contact_records = self.get_contacts() contact_ids = [contact['vid'] for contact in contact_records - if contact['vid'] != 2304] # contact 2304 has hit the 10,000 assoc limit + if contact['vid'] != 2304] # contact 2304 has hit the 10,000 assoc limit contact_id = random.choice(contact_ids) url = f"{BASE_URL}/engagements/v1/engagements" @@ -1048,9 +1048,9 @@ def create_engagements(self): "associations": { "contactIds": [contact_id], "companyIds": [6804176293], - "dealIds": [ ], - "ownerIds": [ ], - "ticketIds":[ ] + "dealIds": [], + "ownerIds": [], + "ticketIds": [] }, "attachments": [ { @@ -1195,9 +1195,10 @@ def create_owners(self): """ HubSpot API The Owners API is read-only. Owners can only be created in HubSpot. """ - raise NotImplementedError("Only able to create owners from web app manually. No api endpoint exists.") + raise NotImplementedError( + "Only able to create owners from web app manually. No api endpoint exists.") - def create_subscription_changes(self, subscriptions=[] , times=1): + def create_subscription_changes(self, subscriptions=[], times=1): """ HubSpot API https://legacydocs.hubspot.com/docs/methods/email/update_status @@ -1206,7 +1207,8 @@ def create_subscription_changes(self, subscriptions=[] , times=1): # by default, a new subscription change will be created from a previous subscription change from one week ago as defined in the get if subscriptions == []: subscriptions = self.get_subscription_changes() - subscription_id_list = [[change.get('subscriptionId') for change in subscription['changes']] for subscription in subscriptions] + subscription_id_list = [[change.get('subscriptionId') for change in subscription['changes']] + for subscription in subscriptions] count = 0 email_records = [] subscription_records = [] @@ -1214,14 +1216,14 @@ def create_subscription_changes(self, subscriptions=[] , times=1): for item in subscription_id_list: if count < times: - #if item[0] + # if item[0] record_uuid = str(uuid.uuid4()).replace('-', '') recipient = record_uuid + "@stitchdata.com" url = f"{BASE_URL}/email/public/v1/subscriptions/{recipient}" data = { "subscriptionStatuses": [ { - "id": item[0], #a_sub_id, + "id": item[0], # a_sub_id, "subscribed": True, "optState": "OPT_IN", "legalBasis": "PERFORMANCE_OF_CONTRACT", @@ -1236,17 +1238,17 @@ def create_subscription_changes(self, subscriptions=[] , times=1): # The intention is for this method to return both of the objects that it creates with this put email_event = self.get_email_events(recipient=recipient) - #subscriptions = self.get_subscription_changes() + # subscriptions = self.get_subscription_changes() # if len(email_event) > 1 or len(subscription_change) > 1: # raise RuntimeError( # "Expected this change to generate 1 email_event and 1 subscription_change only. " # "Generate {len(email_event)} email_events and {len(subscription_changes)} subscription_changes." # ) email_records.extend(email_event) - #subscription_records.append(subscription_change) + # subscription_records.append(subscription_change) count += 1 - return email_records # , subscription_records + return email_records # , subscription_records def create_workflows(self): """ @@ -1296,7 +1298,7 @@ def update(self, stream, record_id): # Resets the access_token if the expiry time is less than or equal to the current time if self.CONFIG["token_expires"] <= datetime.datetime.utcnow(): self.acquire_access_token_from_refresh_token() - + if stream == 'companies': return self.update_companies(record_id) elif stream == 'contacts': @@ -1499,13 +1501,15 @@ def update_owners(self): """ HubSpot API The Owners API is read-only. Owners can only be updated in HubSpot. """ - raise NotImplementedError("Only able to update owners from web app manuanlly. No API endpoint in hubspot.") + raise NotImplementedError( + "Only able to update owners from web app manuanlly. No API endpoint in hubspot.") def update_campaigns(self): """ HubSpot API The Campaigns API is read-only. Campaigns can only be updated in HubSpot. """ - raise NotImplementedError("Only able to update campaigns from web app manuanlly. No API endpoint in hubspot.") + raise NotImplementedError( + "Only able to update campaigns from web app manuanlly. No API endpoint in hubspot.") def update_engagements(self, engagement_id): """ @@ -1545,9 +1549,7 @@ def update_tickets(self, ticket_id): self.patch(url, data) - record = self._get_tickets_by_pk(ticket_id) - - return record + return self._get_tickets_by_pk(ticket_id) ########################################################################## ### Deletes @@ -1557,7 +1559,7 @@ def cleanup(self, stream, records, count=10): # Resets the access_token if the expiry time is less than or equal to the current time if self.CONFIG["token_expires"] <= datetime.datetime.utcnow(): self.acquire_access_token_from_refresh_token() - + if stream == 'deal_pipelines': self.delete_deal_pipelines(records, count) elif stream == 'contact_lists': @@ -1574,7 +1576,7 @@ def delete_contact_lists(self, records=[], count=10): record_ids_to_delete = [record['listId'] for record in records] if len(record_ids_to_delete) == 1 or \ - len(record_ids_to_delete) <= count: + len(record_ids_to_delete) <= count: raise RuntimeError( "delete count is greater or equal to the number of existing records for contact_lists, " "need to have at least one record remaining" @@ -1584,7 +1586,6 @@ def delete_contact_lists(self, records=[], count=10): self.delete(url) - def delete_deal_pipelines(self, records=[], count=10): """ Delete older records based on timestamp primary key @@ -1595,14 +1596,15 @@ def delete_deal_pipelines(self, records=[], count=10): record_ids_to_delete = [record['pipelineId'] for record in records] if len(record_ids_to_delete) == 1 or \ - len(record_ids_to_delete) <= count: + len(record_ids_to_delete) <= count: raise RuntimeError( "delete count is greater or equal to the number of existing records for deal_pipelines, " "need to have at least one record remaining" ) for record_id in record_ids_to_delete: - if record_id == 'default' or len(record_id) > 16: # not a timestamp, not made by this client - continue # skip + if record_id == 'default' or len( + record_id) > 16: # not a timestamp, not made by this client + continue # skip url = f"{BASE_URL}/crm-pipelines/v1/pipelines/deals/{record_id}" self.delete(url) @@ -1635,8 +1637,8 @@ def acquire_access_token_from_refresh_token(self): self.CONFIG['access_token'] = auth['access_token'] self.CONFIG['refresh_token'] = auth['refresh_token'] self.CONFIG['token_expires'] = ( - datetime.datetime.utcnow() + - datetime.timedelta(seconds=auth['expires_in'] - 600)) + datetime.datetime.utcnow() + + datetime.timedelta(seconds=auth['expires_in'] - 600)) self.HEADERS = {'Authorization': f"Bearer {self.CONFIG['access_token']}"} LOGGER.info(f"TEST CLIENT | Token refreshed. Expires at {self.CONFIG['token_expires']}") @@ -1648,7 +1650,7 @@ def __init__(self, start_date=''): self.start_date_strf = start_date if start_date else self.CONFIG['start_date'] self.start_date = datetime.datetime.strptime( - self.start_date_strf, self.BaseTest.START_DATE_FORMAT + self.start_date_strf, self.BaseTest.START_DATE_FORMAT ).timestamp() * 1000 self.acquire_access_token_from_refresh_token() From 072f12625535aa57fbfa1af89522a70d902c7ede Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Wed, 15 Feb 2023 15:56:41 +0530 Subject: [PATCH 8/8] Formatting changes in client.py and bookmark key change in interrupt sync test --- tests/client.py | 8 ++------ tests/test_hubspot_interrupted_sync.py | 5 ++--- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/tests/client.py b/tests/client.py index c4df9d52..75a289d0 100644 --- a/tests/client.py +++ b/tests/client.py @@ -680,13 +680,9 @@ def get_tickets(self): while True: response = self.get(url, params=params) - records.extend( - [ - record + records.extend([record for record in response["results"] - if record[replication_key] >= self.start_date_strf.replace('.Z', '.000Z') - ] - ) + if record[replication_key] >= self.start_date_strf.replace('.Z', '.000Z')]) if not response.get("paging"): break diff --git a/tests/test_hubspot_interrupted_sync.py b/tests/test_hubspot_interrupted_sync.py index 7654e461..61725abd 100644 --- a/tests/test_hubspot_interrupted_sync.py +++ b/tests/test_hubspot_interrupted_sync.py @@ -39,9 +39,8 @@ def simulated_interruption(self, reference_state): new_state['bookmarks']['engagements']['current_sync_start'] = engagements_bookmark tickets_bookmark = self.timedelta_formatted( - reference_state['bookmarks']['engagements']['lastUpdated'], - days=-1, str_format=self.BASIC_DATE_FORMAT - ) + reference_state['bookmarks']['tickets']['updatedAt'], + days=-1, str_format=self.BASIC_DATE_FORMAT) new_state['bookmarks']['tickets']['updatedAt'] = tickets_bookmark return new_state