Add currently syncing (#24)
* Sync all customers for a given stream

* Add logging to see when we retry requests

* Update currently_syncing with customerId too. Write state as soon as we
update it

* Add the customerId to the bookmark keys

* Add shuffle for customerId and tap_stream_id; add shuffle unit tests

* Bug fix for when currently_syncing is null

* Fix exception handling typeError

* Fix none cases for currently_syncing

* Fix currently_syncing to write a tuple we can read in later

* Add get_customer_ids so we can use it in the tests

* Fix manipulated_state to account for customer_ids

* Update assertion for currently_syncing

* Fix currently syncing assertion

* Move bookmark access into Full Table assertions section

Full Table doesn't need the "stream_name and customer id" key logic

* Remove duplicate assertion

* Revert 6db016e

* Update bookmark to read stream->customer->replication_key

* Update tap to write bookmarks as stream->customer->replication_key

* Update manipulated state to nest stream->customer->replication_key

* Run bookmark assertions for every customer

* Fix dict comprehension typo

* Fix conflict with main

* Remove `get_state_key` again, use env var instead of hardcoded value

* Add missing dependency

* Move currently-syncing-null-out to the end of sync to prevent gaps

* Sort selected_streams and customers to guarantee consistency across runs

* Don't let the tap write (None, None)

* Sort selected_streams and customers effectively

* Update currently_syncing test assertions

* Add sort functions for streams and customers

* Update `shuffle` to handle a missing value

* Update unit tests to use sort_function, add a test for shuffling streams

* Add end date (#28)

* Add optional end date, add unit tests

Co-authored-by: Andy Lu <[email protected]>

* Test functions can't be named run_test apparently

* Rename do_thing

* Extract `get_queries_from_sync` as a function

* Remove unused variable

* Refactor tests to be more explicit

* Mock singer.utils.now to return a specific date

Co-authored-by: Andy Lu <[email protected]>

* Add conversion_window test

* Fix conversion window unit tests; remove a bug

Co-authored-by: dylan-stitch <[email protected]>
Co-authored-by: Andy Lu <[email protected]>
Co-authored-by: kspeer <[email protected]>
4 people committed Mar 15, 2022
1 parent 7555446 commit 1cee212
Showing 7 changed files with 583 additions and 77 deletions.
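
Taken together, the changes below move the tap to per-customer bookmarks and a two-part currently_syncing value. A rough sketch of the state shape this produces (stream name, customer ID, and date are invented for illustration):

state = {
    "currently_syncing": ["ad_performance_report", "1234567890"],  # [tap_stream_id, customerId]
    "bookmarks": {
        "ad_performance_report": {
            "1234567890": {"date": "2022-03-14T00:00:00.000000Z"}  # stream -> customer -> replication key
        }
    }
}
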
38 changes: 23 additions & 15 deletions tap_google_ads/streams.py
@@ -96,6 +96,7 @@ def generate_hash(record, metadata):
def should_give_up(ex):
    if isinstance(ex, AttributeError):
        if str(ex) == "'NoneType' object has no attribute 'Call'":
            LOGGER.info('Retrying request due to AttributeError')
            return False
        return True

@@ -104,6 +105,7 @@ def should_give_up(ex):
        internal_error = str(googleads_error.error_code.internal_error)
        for err in [quota_error, internal_error]:
            if err in retryable_errors:
                LOGGER.info(f'Retrying request due to {err}')
                return False
    return True

@@ -115,8 +117,8 @@ def on_giveup_func(err):


@backoff.on_exception(backoff.expo,
                      [GoogleAdsException,
                       AttributeError],
                      (GoogleAdsException,
                       AttributeError),
                      max_tries=5,
                      jitter=None,
                      giveup=should_give_up,
@@ -289,7 +291,8 @@ def sync(self, sdk_client, customer, stream, config, state): # pylint: disable=u
        stream_name = stream["stream"]
        stream_mdata = stream["metadata"]
        selected_fields = get_selected_fields(stream_mdata)
        state = singer.set_currently_syncing(state, stream_name)
        state = singer.set_currently_syncing(state, [stream_name, customer["customerId"]])
        singer.write_state(state)

        query = create_core_stream_query(resource_name, selected_fields)
        try:
@@ -307,8 +310,6 @@ def sync(self, sdk_client, customer, stream, config, state): # pylint: disable=u

                singer.write_record(stream_name, record)

        state = singer.bookmarks.set_currently_syncing(state, None)


def get_query_date(start_date, bookmark, conversion_window_date):
    """Return a date within the conversion window and after start date
@@ -435,31 +436,40 @@ def sync(self, sdk_client, customer, stream, config, state):
        stream_mdata = stream["metadata"]
        selected_fields = get_selected_fields(stream_mdata)
        replication_key = "date"
        state = singer.set_currently_syncing(state, stream_name)
        state = singer.set_currently_syncing(state, [stream_name, customer["customerId"]])
        singer.write_state(state)

        conversion_window = timedelta(
            days=int(config.get("conversion_window") or DEFAULT_CONVERSION_WINDOW)
        )
        conversion_window_date = utils.now().replace(hour=0, minute=0, second=0, microsecond=0) - conversion_window

        bookmark_object = singer.get_bookmark(state, stream["tap_stream_id"], customer["customerId"], default={})

        bookmark_value = bookmark_object.get(replication_key)

        query_date = get_query_date(
            start_date=config["start_date"],
            bookmark=singer.get_bookmark(state, stream_name, replication_key),
            bookmark=bookmark_value,
            conversion_window_date=singer.utils.strftime(conversion_window_date)
        )
        end_date = utils.now()

        end_date = config.get("end_date")
        if end_date:
            end_date = utils.strptime_to_utc(end_date)
        else:
            end_date = utils.now()

        if stream_name in REPORTS_WITH_90_DAY_MAX:
            cutoff = end_date.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=90)
            query_date = max(query_date, cutoff)
            if query_date == cutoff:
                LOGGER.info(f"Stream: {stream_name} supports only 90 days of data. Setting query date to {utils.strftime(query_date, '%Y-%m-%d')}.")

        singer.write_state(state)

        if selected_fields == {'segments.date'}:
            raise Exception(f"Selected fields is currently limited to {', '.join(selected_fields)}. Please select at least one attribute and metric in order to replicate {stream_name}.")

        while query_date < end_date:
        while query_date <= end_date:
            query = create_report_query(resource_name, selected_fields, query_date)
            LOGGER.info(f"Requesting {stream_name} data for {utils.strftime(query_date, '%Y-%m-%d')}.")

@@ -481,15 +491,13 @@ def sync(self, sdk_client, customer, stream, config, state):

                    singer.write_record(stream_name, record)

            singer.write_bookmark(state, stream_name, replication_key, utils.strftime(query_date))
            new_bookmark_value = {replication_key: utils.strftime(query_date)}
            singer.write_bookmark(state, stream["tap_stream_id"], customer["customerId"], new_bookmark_value)

            singer.write_state(state)

            query_date += timedelta(days=1)

        state = singer.bookmarks.set_currently_syncing(state, None)
        singer.write_state(state)


def initialize_core_streams(resource_schema):
    return {
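
As a worked illustration of the REPORTS_WITH_90_DAY_MAX cap above (a minimal sketch with invented dates; singer.utils is assumed importable as in the diff): with an end date of 2022-03-15, the cutoff falls 90 days earlier, on 2021-12-15, so an older bookmark is advanced to the cutoff.

from datetime import timedelta
from singer import utils

end_date = utils.strptime_to_utc("2022-03-15T00:00:00Z")
cutoff = end_date - timedelta(days=90)  # 2021-12-15; end_date is already midnight UTC here
query_date = max(utils.strptime_to_utc("2021-12-01T00:00:00Z"), cutoff)  # bookmark advanced to the cutoff
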
85 changes: 77 additions & 8 deletions tap_google_ads/sync.py
@@ -8,32 +8,98 @@
LOGGER = singer.get_logger()


def get_currently_syncing(state):
    currently_syncing = state.get("currently_syncing")

    if not currently_syncing:
        currently_syncing = (None, None)

    resuming_stream, resuming_customer = currently_syncing
    return resuming_stream, resuming_customer


def sort_customers(customers):
    return sorted(customers, key=lambda x: x["customerId"])

def sort_selected_streams(sort_list):
    return sorted(sort_list, key=lambda x: x["tap_stream_id"])


def shuffle(shuffle_list, shuffle_key, current_value, sort_function):
    """Return `shuffle_list` with `current_value` at the front of the list
    In the scenario where `current_value` is not in `shuffle_list`:
    - Assume that we have a consistent ordering to `shuffle_list`
    - Insert the `current_value` into `shuffle_list`
    - Sort the new list
    - Do the normal logic to shuffle the list
    - Return the new shuffled list without the `current_value` we inserted"""

    fallback = False
    if current_value not in [item[shuffle_key] for item in shuffle_list]:
        fallback = True
        shuffle_list.append({shuffle_key: current_value})
        shuffle_list = sort_function(shuffle_list)

    matching_index = 0
    for i, key in enumerate(shuffle_list):
        if key[shuffle_key] == current_value:
            matching_index = i
            break
    top_half = shuffle_list[matching_index:]
    bottom_half = shuffle_list[:matching_index]

    if fallback:
        return top_half[1:] + bottom_half

    return top_half + bottom_half
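
# Illustrative usage (editor's sketch, not part of the diff; stream names are invented).
# Resuming on "campaigns" rotates an already-sorted stream list so the interrupted
# stream runs first, followed by the remaining streams in sorted order:
#
#     streams = [{"tap_stream_id": "accounts"}, {"tap_stream_id": "ads"}, {"tap_stream_id": "campaigns"}]
#     shuffle(streams, "tap_stream_id", "campaigns", sort_function=sort_selected_streams)
#     # -> [{"tap_stream_id": "campaigns"}, {"tap_stream_id": "accounts"}, {"tap_stream_id": "ads"}]
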


def do_sync(config, catalog, resource_schema, state):
    # QA ADDED WORKAROUND [START]
    try:
        customers = json.loads(config["login_customer_ids"])
    except TypeError: # falling back to raw value
        customers = config["login_customer_ids"]
    # QA ADDED WORKAROUND [END]
    customers = sort_customers(customers)

    selected_streams = [
        stream
        for stream in catalog["streams"]
        if singer.metadata.to_map(stream["metadata"])[()].get("selected")
    ]
    selected_streams = sort_selected_streams(selected_streams)

    core_streams = initialize_core_streams(resource_schema)
    report_streams = initialize_reports(resource_schema)
    resuming_stream, resuming_customer = get_currently_syncing(state)

    for customer in customers:
        LOGGER.info(f"Syncing customer Id {customer['customerId']} ...")
        sdk_client = create_sdk_client(config, customer["loginCustomerId"])
        for catalog_entry in selected_streams:
            stream_name = catalog_entry["stream"]
            mdata_map = singer.metadata.to_map(catalog_entry["metadata"])
    if resuming_stream:
        selected_streams = shuffle(
            selected_streams,
            "tap_stream_id",
            resuming_stream,
            sort_function=sort_selected_streams
        )

            primary_key = mdata_map[()].get("table-key-properties", [])
            singer.messages.write_schema(stream_name, catalog_entry["schema"], primary_key)
    if resuming_customer:
        customers = shuffle(
            customers,
            "customerId",
            resuming_customer,
            sort_function=sort_customers
        )

    for catalog_entry in selected_streams:
        stream_name = catalog_entry["stream"]
        mdata_map = singer.metadata.to_map(catalog_entry["metadata"])

        primary_key = mdata_map[()].get("table-key-properties", [])
        singer.messages.write_schema(stream_name, catalog_entry["schema"], primary_key)

        for customer in customers:
            sdk_client = create_sdk_client(config, customer["loginCustomerId"])

            LOGGER.info(f"Syncing {stream_name} for customer Id {customer['customerId']}.")

@@ -43,3 +109,6 @@ def do_sync(config, catalog, resource_schema, state):
                stream_obj = report_streams[stream_name]

            stream_obj.sync(sdk_client, customer, catalog_entry, config, state)

    state.pop("currently_syncing")
    singer.write_state(state)
9 changes: 7 additions & 2 deletions tests/base.py
@@ -41,16 +41,21 @@ def get_type():
        """the expected url route ending"""
        return "platform.google-ads"

    def get_customer_ids(self):
        return [
            os.getenv('TAP_GOOGLE_ADS_CUSTOMER_ID'),
            os.getenv('TAP_GOOGLE_ADS_LOGIN_CUSTOMER_ID'),
        ]

    def get_properties(self, original: bool = True):
        """Configurable properties, with a switch to override the 'start_date' property"""
        return_value = {
            'start_date': '2021-12-01T00:00:00Z',
            'user_id': 'not used?', # TODO ?
            'customer_ids': ",".join((os.getenv('TAP_GOOGLE_ADS_CUSTOMER_ID'),os.getenv('TAP_GOOGLE_ADS_LOGIN_CUSTOMER_ID'))),
            'customer_ids': ','.join(self.get_customer_ids()),
            # 'conversion_window_days': '30',
            'login_customer_ids': [{"customerId": os.getenv('TAP_GOOGLE_ADS_CUSTOMER_ID'),
                                    "loginCustomerId": os.getenv('TAP_GOOGLE_ADS_LOGIN_CUSTOMER_ID'),}],

        }

        # TODO_TDL-17911 Add a test around conversion_window_days
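
For reference, the test properties above resolve to a config shaped roughly like this (the IDs are placeholders; real values come from the TAP_GOOGLE_ADS_* environment variables):

config = {
    'start_date': '2021-12-01T00:00:00Z',
    'user_id': 'not used?',
    'customer_ids': '1234567890,9876543210',
    'login_customer_ids': [{'customerId': '1234567890',
                            'loginCustomerId': '9876543210'}],
}

Because login_customer_ids is already a list here, the json.loads call in do_sync raises TypeError and the tap falls back to using the raw value, which is what the QA workaround in sync.py handles.
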