Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL 19241 add version timestamp in contacts #191

Merged
merged 4 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys,
singer.write_state(STATE)


def _sync_contact_vids(catalog, vids, schema, bumble_bee):
def _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key):
if len(vids) == 0:
return

Expand All @@ -442,6 +442,8 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
mdata = metadata.to_map(catalog.get('metadata'))

for record in data.values():
# Explicitly add the bookmark field "versionTimestamp" and its value in the record.
record[bookmark_key] = bookmark_values.get(record.get("vid"))
record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted)

Expand All @@ -465,6 +467,8 @@ def sync_contacts(STATE, ctx):
url = get_url("contacts_all")

vids = []
# Dict to store replication key value for each contact record
bookmark_values = {}
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in gen_request(STATE, 'contacts', url, default_contact_params, 'contacts', 'has-more', ['vid-offset'], ['vidOffset']):
modified_time = None
Expand All @@ -476,15 +480,18 @@ def sync_contacts(STATE, ctx):

if not modified_time or modified_time >= start:
vids.append(row['vid'])
# Adding replication key value in `bookmark_values` dict
# Here, key is vid(primary key) and value is replication key value.
bookmark_values[row['vid']] = utils.strftime(modified_time)

if modified_time and modified_time >= max_bk_value:
max_bk_value = modified_time

if len(vids) == 100:
_sync_contact_vids(catalog, vids, schema, bumble_bee)
_sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)
vids = []

_sync_contact_vids(catalog, vids, schema, bumble_bee)
_sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)

STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value))
singer.write_state(STATE)
Expand Down Expand Up @@ -941,14 +948,14 @@ class Stream(object):
# Do these first as they are incremental
Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'),
Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'),
Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'INCREMENTAL'),

# Do these last as they are full table
Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'),
Stream('workflows', sync_workflows, ['id'], 'updatedAt', 'FULL_TABLE'),
Stream('owners', sync_owners, ["ownerId"], 'updatedAt', 'FULL_TABLE'),
Stream('campaigns', sync_campaigns, ["id"], None, 'FULL_TABLE'),
Stream('contact_lists', sync_contact_lists, ["listId"], 'updatedAt', 'FULL_TABLE'),
Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'FULL_TABLE'),
Stream('companies', sync_companies, ["companyId"], 'hs_lastmodifieddate', 'FULL_TABLE'),
Stream('deals', sync_deals, ["dealId"], 'hs_lastmodifieddate', 'FULL_TABLE'),
Stream('deal_pipelines', sync_deal_pipelines, ['pipelineId'], None, 'FULL_TABLE'),
Expand Down
4 changes: 4 additions & 0 deletions tap_hubspot/schemas/contacts.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
"vid": {
"type": ["null", "integer"]
},
"versionTimestamp": {
"type": ["null", "string"],
"format": "date-time"
},
"canonical-vid": {
"type": ["null", "integer"]
},
Expand Down
12 changes: 11 additions & 1 deletion tests/test_hubspot_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records):
can_save = True
return ret_records

FIELDS_ADDED_BY_TAP = {
    # In the 'contacts' stream, 'versionTimestamp' is not available in the response of the second call.
# In the 1st call, Tap retrieves records of all contacts and from those records, it collects vids(id of contact).
# These records contain the versionTimestamp field.
# In the 2nd call, vids collected from the 1st call will be used to retrieve the whole contact record.
# Here, the records collected for detailed contact information do not contain the versionTimestamp field.
    # So, we add the versionTimestamp field (fetched from the 1st call's records) explicitly in the record of the 2nd call.
"contacts": { "versionTimestamp" }
}

KNOWN_EXTRA_FIELDS = {
'deals': {
# BUG_TDL-14993 | https://jira.talendforge.org/browse/TDL-14993
Expand Down Expand Up @@ -226,7 +236,7 @@ def test_run(self):
continue # skip this expected record if it isn't replicated
actual_record = matching_actual_records_by_pk[0]

expected_keys = set(expected_record.keys())
expected_keys = set(expected_record.keys()).union(FIELDS_ADDED_BY_TAP.get(stream, {}))
actual_keys = set(actual_record.keys())

# NB: KNOWN_MISSING_FIELDS is a dictionary of streams to aggregated missing fields.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_hubspot_automatic_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_run(self):
expected_keys = self.expected_automatic_fields().get(stream)

# BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 Replication keys are not included as an automatic field for these streams
if stream in {'companies', 'deals', 'contacts', 'subscription_changes', 'email_events'}:
if stream in {'companies', 'deals', 'subscription_changes', 'email_events'}:
# replication keys not in the expected_keys
remove_keys = self.expected_metadata()[stream].get(self.REPLICATION_KEYS)
expected_keys = expected_keys.difference(remove_keys)
Expand Down
17 changes: 15 additions & 2 deletions tests/test_hubspot_bookmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,20 @@ def test_run(self):
expected_record_count = 1 if stream not in STREAMS_WITHOUT_UPDATES else 2
expected_records_2 = self.expected_records[stream][-expected_record_count:]

    # The given streams do not contain a proper replication-key value in the response.
if stream not in {"companies","deals","contacts_by_company","email_events"}:
# verify first sync bookmark value is max bookmark value
for record in actual_records_1:
replication_key_value = record.get(stream_replication_key)
self.assertLessEqual(replication_key_value,bookmark_1,
msg="First sync bookmark was incorrect, A record with greater replication-key value was found.")

# verify second sync bookmark value is max bookmark value
for record in actual_records_2:
replication_key_value = record.get(stream_replication_key)
self.assertLessEqual(replication_key_value,bookmark_2,
msg="Second sync bookmark was incorrect, A record with greater replication-key value was found.")

    # verify only the new and updated records are captured by checking record counts
self.assertGreater(actual_record_count_1, actual_record_count_2)

Expand Down Expand Up @@ -216,8 +230,7 @@ def test_run(self):

# verify that at least 1 record from the first sync is replicated in the 2nd sync
# to prove that the bookmarking is inclusive
if stream in {'contacts', # BUG | https://jira.talendforge.org/browse/TDL-15502
'companies', # BUG | https://jira.talendforge.org/browse/TDL-15503
if stream in {'companies', # BUG | https://jira.talendforge.org/browse/TDL-15503
'email_events'}: # BUG | https://jira.talendforge.org/browse/TDL-15706
continue # skipping failures
self.assertTrue(any([expected_pk in sync_2_pks for expected_pk in expected_sync_1_pks]))
40 changes: 23 additions & 17 deletions tests/test_hubspot_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,31 @@ def test_run(self):
#set(stream_properties[0].get('metadata', {self.PRIMARY_KEYS: None}).get(self.PRIMARY_KEYS, [])))}"

)
# actual_replication_method = stream_properties[0]['metadata'].get('forced-replication-method')
actual_replication_method = stream_properties[0]['metadata'].get('forced-replication-method')
# BUG https://jira.talendforge.org/browse/TDL-9939 all streams are set to full-table in the metadata
# # verify the actual replication matches our expected replication method
# self.assertEqual(
# self.expected_replication_method().get(stream, None),
# actual_replication_method,
# msg="The actual replication method {} doesn't match the expected {}".format(
# actual_replication_method,
# self.expected_replication_method().get(stream, None)))
# verify the actual replication matches our expected replication method
if stream == "contacts":
self.assertEqual(
self.expected_replication_method().get(stream, None),
actual_replication_method,
msg="The actual replication method {} doesn't match the expected {}".format(
actual_replication_method,
self.expected_replication_method().get(stream, None)))

# verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
actual_replication_method = stream_properties[0].get(
"metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD)
if stream_properties[0].get(
"metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []):
# BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 all streams are set to full table
pass # BUG TDL-9939 REMOVE ME WHEN BUG IS ADDRESSED
# self.assertTrue(actual_replication_method == self.INCREMENTAL,
# msg="Expected INCREMENTAL replication "
# "since there is a replication key")

if stream == "contacts":
self.assertTrue(actual_replication_method == self.INCREMENTAL,
msg="Expected INCREMENTAL replication "
"since there is a replication key")
else:
# BUG_TDL-9939 https://jira.talendforge.org/browse/TDL-9939 all streams are set to full table
pass # BUG TDL-9939 REMOVE ME WHEN BUG IS ADDRESSED

else:
self.assertTrue(actual_replication_method == self.FULL,
msg="Expected FULL replication "
Expand All @@ -109,10 +114,11 @@ def test_run(self):
actual_automatic_fields = {item.get("breadcrumb", ["properties", None])[1]
for item in metadata
if item.get("metadata").get("inclusion") == "automatic"}
# self.assertEqual(expected_automatic_fields,
# actual_automatic_fields,
# msg=f"expected {expected_automatic_fields} automatic fields but got {actual_automatic_fields}"
# )
if stream == "contacts":
self.assertEqual(expected_automatic_fields,
actual_automatic_fields,
msg=f"expected {expected_automatic_fields} automatic fields but got {actual_automatic_fields}"
)

# verify that all other fields have inclusion of available
# This assumes there are no unsupported fields for SaaS sources
Expand Down