use sync start time to bookmark contacts stream #226

Merged · 3 commits · Apr 26, 2023
Changes from 2 commits
49 changes: 42 additions & 7 deletions tap_hubspot/__init__.py
@@ -517,6 +517,9 @@ def sync_contacts(STATE, ctx):
     # Dict to store replication key value for each contact record
     bookmark_values = {}
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        # To handle records updated between the start of the table sync and the end,
+        # record the current sync start time and do not move the bookmark past this value.
+        sync_start_time = utils.now()
         for row in gen_request(STATE, 'contacts', url, default_contact_params, 'contacts', 'has-more', ['vid-offset'], ['vidOffset']):
             modified_time = None
             if bookmark_key in row:
@@ -540,7 +543,9 @@
 
         _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)
 
-    STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value))
+    # Don't bookmark past the start of this sync to account for updated records during the sync.
+    new_bookmark = min(max_bk_value, sync_start_time)
+    STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(new_bookmark))
     singer.write_state(STATE)
     return STATE
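
The same clamping pattern repeats for every stream touched below: instead of writing the maximum replication-key value seen, the tap writes the smaller of that value and the wall-clock time at which the stream sync began, so records updated mid-sync are re-fetched on the next run rather than skipped. A minimal standalone sketch of the idea (the clamp_bookmark helper is illustrative, not part of the tap):

    from datetime import datetime, timezone

    def clamp_bookmark(max_bk_value: datetime, sync_start_time: datetime) -> datetime:
        # Never advance the bookmark past the moment the sync began: updates that
        # landed while pages were still being fetched may not have been picked up.
        return min(max_bk_value, sync_start_time)

    # A record modified five minutes into the sync does not push the bookmark forward.
    sync_start = datetime(2023, 4, 26, 12, 0, tzinfo=timezone.utc)
    max_seen = datetime(2023, 4, 26, 12, 5, tzinfo=timezone.utc)
    assert clamp_bookmark(max_seen, sync_start) == sync_start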

@@ -705,6 +710,9 @@ def sync_deals(STATE, ctx):
     url = get_url('deals_all')
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        # To handle records updated between the start of the table sync and the end,
+        # record the current sync start time and do not move the bookmark past this value.
+        sync_start_time = utils.now()
         for row in gen_request(STATE, 'deals', url, params, 'deals', "hasMore", ["offset"], ["offset"], v3_fields=v3_fields):
             row_properties = row['properties']
             modified_time = None
@@ -723,7 +731,9 @@
             record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
             singer.write_record("deals", record, catalog.get('stream_alias'), time_extracted=utils.now())
 
-    STATE = singer.write_bookmark(STATE, 'deals', bookmark_key, utils.strftime(max_bk_value))
+    # Don't bookmark past the start of this sync to account for updated records during the sync.
+    new_bookmark = min(max_bk_value, sync_start_time)
+    STATE = singer.write_bookmark(STATE, 'deals', bookmark_key, utils.strftime(new_bookmark))
     singer.write_state(STATE)
     return STATE

@@ -775,6 +785,9 @@ def sync_tickets(STATE, ctx):
     url = get_url(stream_id)
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as transformer:
+        # To handle records updated between the start of the table sync and the end,
+        # record the current sync start time and do not move the bookmark past this value.
+        sync_start_time = utils.now()
         for row in gen_request_tickets(stream_id, url, params, 'results', "paging"):
             # parsing the string formatted date to datetime object
             modified_time = utils.strptime_to_utc(row[bookmark_key])
@@ -789,7 +802,9 @@
             if modified_time and modified_time >= max_bk_value:
                 max_bk_value = modified_time
 
-    STATE = singer.write_bookmark(STATE, stream_id, bookmark_key, utils.strftime(max_bk_value))
+    # Don't bookmark past the start of this sync to account for updated records during the sync.
+    new_bookmark = min(max_bk_value, sync_start_time)
+    STATE = singer.write_bookmark(STATE, stream_id, bookmark_key, utils.strftime(new_bookmark))
     singer.write_state(STATE)
     return STATE

@@ -903,6 +918,9 @@ def sync_contact_lists(STATE, ctx):
     url = get_url("contact_lists")
     params = {'count': 250}
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        # To handle records updated between the start of the table sync and the end,
+        # record the current sync start time and do not move the bookmark past this value.
+        sync_start_time = utils.now()
         for row in gen_request(STATE, 'contact_lists', url, params, "lists", "has-more", ["offset"], ["offset"]):
             record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
 
@@ -911,7 +929,9 @@
             if record[bookmark_key] >= max_bk_value:
                 max_bk_value = record[bookmark_key]
 
-    STATE = singer.write_bookmark(STATE, 'contact_lists', bookmark_key, max_bk_value)
+    # Don't bookmark past the start of this sync to account for updated records during the sync.
+    new_bookmark = min(utils.strptime_to_utc(max_bk_value), sync_start_time)
+    STATE = singer.write_bookmark(STATE, 'contact_lists', bookmark_key, utils.strftime(new_bookmark))
     singer.write_state(STATE)
 
     return STATE
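
For the streams whose replication-key values stay strings through the loop (contact_lists above, plus forms, workflows, and owners below), max_bk_value is first parsed with utils.strptime_to_utc so both operands of min() are timezone-aware datetimes, then re-serialized with utils.strftime before being written to state. A small sketch of that round trip using the singer-python helpers the tap already imports (the literal timestamp is illustrative):

    from singer import utils

    max_bk_value = "2023-04-26T12:05:00.000000Z"  # replication key kept as a string
    sync_start_time = utils.now()

    # Parse, clamp against the sync start, then serialize back for the state file.
    new_bookmark = min(utils.strptime_to_utc(max_bk_value), sync_start_time)
    print(utils.strftime(new_bookmark))
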
@@ -932,6 +952,9 @@ def sync_forms(STATE, ctx):
     time_extracted = utils.now()
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        # To handle records updated between the start of the table sync and the end,
+        # record the current sync start time and do not move the bookmark past this value.
+        sync_start_time = utils.now()
         for row in data:
             record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
 
@@ -940,7 +963,9 @@
             if record[bookmark_key] >= max_bk_value:
                 max_bk_value = record[bookmark_key]
 
-    STATE = singer.write_bookmark(STATE, 'forms', bookmark_key, max_bk_value)
+    # Don't bookmark past the start of this sync to account for updated records during the sync.
+    new_bookmark = min(utils.strptime_to_utc(max_bk_value), sync_start_time)
+    STATE = singer.write_bookmark(STATE, 'forms', bookmark_key, utils.strftime(new_bookmark))
     singer.write_state(STATE)
 
     return STATE
@@ -963,14 +988,19 @@ def sync_workflows(STATE, ctx):
     time_extracted = utils.now()
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        # To handle records updated between the start of the table sync and the end,
+        # record the current sync start time and do not move the bookmark past this value.
+        sync_start_time = utils.now()
         for row in data['workflows']:
             record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
             if record[bookmark_key] >= start:
                 singer.write_record("workflows", record, catalog.get('stream_alias'), time_extracted=time_extracted)
             if record[bookmark_key] >= max_bk_value:
                 max_bk_value = record[bookmark_key]
 
-    STATE = singer.write_bookmark(STATE, 'workflows', bookmark_key, max_bk_value)
+    # Don't bookmark past the start of this sync to account for updated records during the sync.
+    new_bookmark = min(utils.strptime_to_utc(max_bk_value), sync_start_time)
+    STATE = singer.write_bookmark(STATE, 'workflows', bookmark_key, utils.strftime(new_bookmark))
     singer.write_state(STATE)
     return STATE

@@ -994,6 +1024,9 @@ def sync_owners(STATE, ctx):
     time_extracted = utils.now()
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        # To handle records updated between the start of the table sync and the end,
+        # record the current sync start time and do not move the bookmark past this value.
+        sync_start_time = utils.now()
         for row in data:
             record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
             if record[bookmark_key] >= max_bk_value:
@@ -1002,7 +1035,9 @@
             if record[bookmark_key] >= start:
                 singer.write_record("owners", record, catalog.get('stream_alias'), time_extracted=time_extracted)
 
-    STATE = singer.write_bookmark(STATE, 'owners', bookmark_key, max_bk_value)
+    # Don't bookmark past the start of this sync to account for updated records during the sync.
+    new_bookmark = min(utils.strptime_to_utc(max_bk_value), sync_start_time)
+    STATE = singer.write_bookmark(STATE, 'owners', bookmark_key, utils.strftime(new_bookmark))
     singer.write_state(STATE)
     return STATE

6 changes: 4 additions & 2 deletions tap_hubspot/tests/test_deals.py
@@ -2,13 +2,14 @@
 from unittest.mock import patch, ANY
 
 
+@patch('builtins.min')
 @patch('tap_hubspot.Context.get_catalog_from_id', return_value={"metadata": ""})
 @patch('singer.metadata.to_map', return_value={})
 @patch('singer.utils.strptime_with_tz')
 @patch('singer.utils.strftime')
 @patch('tap_hubspot.load_schema')
 @patch('tap_hubspot.gen_request', return_value=[])
-def test_associations_are_not_validated(mocked_gen_request, mocked_catalog_from_id, mocked_metadata_map, mocked_utils_strptime, mocked_utils_strftime, mocked_load_schema):
+def test_associations_are_not_validated(mocked_gen_request, mocked_catalog_from_id, mocked_metadata_map, mocked_utils_strptime, mocked_utils_strftime, mocked_load_schema, mocked_min):
     # pylint: disable=unused-argument
     sync_deals({}, mocked_catalog_from_id)
 
@@ -17,13 +18,14 @@ def test_associations_are_not_validated(mocked_gen_request, mocked_catalog_from_
     mocked_gen_request.assert_called_once_with(ANY, ANY, ANY, expected_param, ANY, ANY, ANY, ANY, v3_fields=None)
 
 
+@patch('builtins.min')
 @patch('tap_hubspot.Context.get_catalog_from_id', return_value={"metadata": ""})
 @patch('singer.metadata.to_map', return_value={"associations": {"selected": True}})
 @patch('singer.utils.strptime_with_tz')
 @patch('singer.utils.strftime')
 @patch('tap_hubspot.load_schema')
 @patch('tap_hubspot.gen_request', return_value=[])
-def test_associations_are_validated(mocked_gen_request, mocked_catalog_from_id, mocked_metadata_map, mocked_utils_strptime, mocked_utils_strftime, mocked_load_schema):
+def test_associations_are_validated(mocked_gen_request, mocked_catalog_from_id, mocked_metadata_map, mocked_utils_strptime, mocked_utils_strftime, mocked_load_schema, mocked_min):
     # pylint: disable=unused-argument
     sync_deals({}, mocked_catalog_from_id)
 
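The new @patch('builtins.min') decorators are needed because these tests mock singer.utils.strptime_with_tz, so max_bk_value inside sync_deals becomes a MagicMock while sync_start_time is a real datetime; ordering comparisons on a MagicMock return NotImplemented, and the new min(max_bk_value, sync_start_time) call would raise a TypeError. A small sketch of the failure mode the patch avoids (assuming stock unittest.mock behavior):

    from datetime import datetime, timezone
    from unittest.mock import MagicMock

    try:
        min(MagicMock(), datetime.now(timezone.utc))
    except TypeError as err:
        # e.g. '<' not supported between instances of 'datetime.datetime' and 'MagicMock'
        print(err)
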
20 changes: 16 additions & 4 deletions tests/test_hubspot_bookmarks.py
@@ -189,16 +189,28 @@ def test_run(self):
             # Given streams does not contain proper replication-key value in the response.
             if stream not in {"companies","deals","contacts_by_company","email_events"}:
                 # verify first sync bookmark value is max bookmark value
+                max_bk_value = actual_records_1[0].get(stream_replication_key)
                 for record in actual_records_1:
                     replication_key_value = record.get(stream_replication_key)
                     self.assertLessEqual(replication_key_value,bookmark_1,
                                          msg="First sync bookmark was incorrect, A record with greater replication-key value was found.")
+                    if max_bk_value < replication_key_value:
+                        max_bk_value = replication_key_value
 
+                # For a few streams, test records updated before the sync may have a replication-key value
+                # greater than the bookmark value, probably because HubSpot picks up record updates with a delay.
+                self.assertLessEqual(bookmark_1, max_bk_value,
+                                     msg="First sync bookmark value cannot be greater than max replication-key value")
 
                 # verify second sync bookmark value is max bookmark value
+                max_bk_value = actual_records_2[0].get(stream_replication_key)
                 for record in actual_records_2:
                     replication_key_value = record.get(stream_replication_key)
                     self.assertLessEqual(replication_key_value,bookmark_2,
                                          msg="Second sync bookmark was incorrect, A record with greater replication-key value was found.")
+                    if max_bk_value < replication_key_value:
+                        max_bk_value = replication_key_value
 
+                # For a few streams, test records updated before the sync may have a replication-key value
+                # greater than the bookmark value, probably because HubSpot picks up record updates with a delay.
+                self.assertLessEqual(bookmark_2, max_bk_value,
+                                     msg="Second sync bookmark value cannot be greater than max replication-key value")
 
             # verify only the new and updated records are captured checking record count
             self.assertGreater(actual_record_count_1, actual_record_count_2)
11 changes: 8 additions & 3 deletions tests/test_hubspot_interrupted_sync_offset.py
@@ -94,9 +94,14 @@ def test_run(self):
         synced_records_2 = runner.get_records_from_target_output()
         state_2 = menagerie.get_state(conn_id)
 
-        # verify the uninterrupted sync and the simulated resuming sync end with the same bookmark values
-        with self.subTest(stream=stream):
-            self.assertEqual(state_1, state_2)
+        # Verify the post-interrupted sync bookmark is greater than or equal to the interrupted sync bookmark,
+        # since newly created test records may get updated while the stream is syncing.
+        replication_keys = self.expected_replication_keys()
+        for stream in state_1.get('bookmarks'):
+            replication_key = list(replication_keys[stream])[0]
+            self.assertLessEqual(state_1["bookmarks"][stream].get(replication_key),
+                                 state_2["bookmarks"][stream].get(replication_key),
+                                 msg="First sync bookmark should not be greater than the second bookmark.")
 
 
 class TestHubspotInterruptedSyncOffsetContacts(TestHubspotInterruptedSyncOffsetContactLists):