From 187f6367dac3c9dd1d200d5f9697534e20f674c1 Mon Sep 17 00:00:00 2001 From: andyoneal Date: Thu, 9 Jan 2025 14:31:21 -0600 Subject: [PATCH 1/3] use composite api for discover --- tap_salesforce/__init__.py | 252 ++++++++++++++------------ tap_salesforce/salesforce/__init__.py | 26 ++- 2 files changed, 158 insertions(+), 120 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 950f595..a270deb 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -141,145 +141,161 @@ def do_discover(sf: Salesforce, streams: list[str]): # noqa: C901 sf_custom_setting_objects = [] object_to_tag_references = {} - # For each SF Object describe it, loop its fields and build a schema - entries = [] + # Build batches of SF sObjects. 25 per batch. + sobject_batches = [] + batch = [] for sobject_name in objects_to_discover: # Skip blacklisted SF objects depending on the api_type in use # ChangeEvent objects are not queryable via Bulk or REST (undocumented) if sobject_name in sf.get_blacklisted_objects() or sobject_name.endswith("ChangeEvent"): continue - sobject_description = sf.describe(sobject_name) + batch.append(sobject_name) + if len(batch) == 25: + sobject_batches.append(batch) + batch = [] - # Cache customSetting and Tag objects to check for blacklisting after - # all objects have been described - if sobject_description.get("customSetting"): - sf_custom_setting_objects.append(sobject_name) - elif sobject_name.endswith("__Tag"): - relationship_field = next( - (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), - None, - ) - if relationship_field: - # Map {"Object":"Object__Tag"} - object_to_tag_references[relationship_field["referenceTo"][0]] = sobject_name - - fields = sobject_description["fields"] - replication_key = get_replication_key(sobject_name, fields) - - unsupported_fields = set() - properties = {} - mdata = metadata.new() - - found_id_field = False - - # Loop over the object's fields - 
for f in fields: - field_name = f["name"] - field_type = f["type"] # noqa: F841 - - if field_name == "Id": - found_id_field = True - - property_schema, mdata = create_property_schema(f, mdata) - - # Compound Address fields cannot be queried by the Bulk API - if f["type"] in ("address", "location") and sf.api_type in [ - tap_salesforce.salesforce.BULK_API_TYPE, - tap_salesforce.salesforce.BULK2_API_TYPE, - ]: - unsupported_fields.add((field_name, "cannot query compound address fields with bulk API")) - - # we haven't been able to observe any records with a json field, so we - # are marking it as unavailable until we have an example to work with - if f["type"] == "json": - unsupported_fields.add( - ( - field_name, - "do not currently support json fields - please contact support", - ) + if len(batch) > 0: + sobject_batches.append(batch) + + # For each SF Object describe it, loop its fields and build a schema + entries = [] + for batch in sobject_batches: + sobject_descriptions = sf.describe(batch) + + for subrequest_result in sobject_descriptions: + sobject_description = subrequest_result["result"] + sobject_name = sobject_description["name"] + + # Cache customSetting and Tag objects to check for blacklisting after + # all objects have been described + if sobject_description.get("customSetting"): + sf_custom_setting_objects.append(sobject_name) + elif sobject_name.endswith("__Tag"): + relationship_field = next( + (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), + None, ) + if relationship_field: + # Map {"Object":"Object__Tag"} + object_to_tag_references[relationship_field["referenceTo"][0]] = sobject_name + + fields = sobject_description["fields"] + replication_key = get_replication_key(sobject_name, fields) + + unsupported_fields = set() + properties = {} + mdata = metadata.new() + + found_id_field = False + + # Loop over the object's fields + for f in fields: + field_name = f["name"] + field_type = f["type"] # noqa: F841 + + if 
field_name == "Id": + found_id_field = True + + property_schema, mdata = create_property_schema(f, mdata) + + # Compound Address fields cannot be queried by the Bulk API + if f["type"] in ("address", "location") and sf.api_type in [ + tap_salesforce.salesforce.BULK_API_TYPE, + tap_salesforce.salesforce.BULK2_API_TYPE, + ]: + unsupported_fields.add((field_name, "cannot query compound address fields with bulk API")) + + # we haven't been able to observe any records with a json field, so we + # are marking it as unavailable until we have an example to work with + if f["type"] == "json": + unsupported_fields.add( + ( + field_name, + "do not currently support json fields - please contact support", + ) + ) - # Blacklisted fields are dependent on the api_type being used - field_pair = (sobject_name, field_name) - if field_pair in sf.get_blacklisted_fields(): - unsupported_fields.add((field_name, sf.get_blacklisted_fields()[field_pair])) + # Blacklisted fields are dependent on the api_type being used + field_pair = (sobject_name, field_name) + if field_pair in sf.get_blacklisted_fields(): + unsupported_fields.add((field_name, sf.get_blacklisted_fields()[field_pair])) - inclusion = metadata.get(mdata, ("properties", field_name), "inclusion") + inclusion = metadata.get(mdata, ("properties", field_name), "inclusion") - if sf.select_fields_by_default and inclusion != "unsupported": - mdata = metadata.write(mdata, ("properties", field_name), "selected-by-default", True) + if sf.select_fields_by_default and inclusion != "unsupported": + mdata = metadata.write(mdata, ("properties", field_name), "selected-by-default", True) - properties[field_name] = property_schema + properties[field_name] = property_schema - if replication_key: - mdata = metadata.write(mdata, ("properties", replication_key), "inclusion", "automatic") + if replication_key: + mdata = metadata.write(mdata, ("properties", replication_key), "inclusion", "automatic") - # There are cases where compound fields are 
referenced by the associated - # subfields but are not actually present in the field list - field_name_set = {f["name"] for f in fields} - filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set] - missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set] + # There are cases where compound fields are referenced by the associated + # subfields but are not actually present in the field list + field_name_set = {f["name"] for f in fields} + filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set] + missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set] - if missing_unsupported_field_names: - LOGGER.info( - "Ignoring the following unsupported fields for object %s as they are missing from the field list: %s", - sobject_name, - ", ".join(sorted(missing_unsupported_field_names)), - ) - - if filtered_unsupported_fields: - LOGGER.info( - "Not syncing the following unsupported fields for object %s: %s", - sobject_name, - ", ".join(sorted([k for k, _ in filtered_unsupported_fields])), - ) + if missing_unsupported_field_names: + LOGGER.info( + "Ignoring the following unsupported fields for object %s as they are missing from the field list: %s", + sobject_name, + ", ".join(sorted(missing_unsupported_field_names)), + ) - # Salesforce Objects are skipped when they do not have an Id field - if not found_id_field: - LOGGER.info("Skipping Salesforce Object %s, as it has no Id field", sobject_name) - continue + if filtered_unsupported_fields: + LOGGER.info( + "Not syncing the following unsupported fields for object %s: %s", + sobject_name, + ", ".join(sorted([k for k, _ in filtered_unsupported_fields])), + ) - # Any property added to unsupported_fields has metadata generated and - # removed - for prop, description in filtered_unsupported_fields: - if metadata.get(mdata, ("properties", prop), "selected-by-default"): - metadata.delete(mdata, 
("properties", prop), "selected-by-default") - - mdata = metadata.write(mdata, ("properties", prop), "unsupported-description", description) - mdata = metadata.write(mdata, ("properties", prop), "inclusion", "unsupported") - - if replication_key: - mdata = metadata.write(mdata, (), "valid-replication-keys", [replication_key]) - mdata = metadata.write(mdata, (), "replication-key", replication_key) - mdata = metadata.write(mdata, (), "replication-method", "INCREMENTAL") - else: - mdata = metadata.write( - mdata, - (), - "forced-replication-method", - { - "replication-method": "FULL_TABLE", - "reason": "No replication keys found from the Salesforce API", - }, - ) + # Salesforce Objects are skipped when they do not have an Id field + if not found_id_field: + LOGGER.info("Skipping Salesforce Object %s, as it has no Id field", sobject_name) + continue + + # Any property added to unsupported_fields has metadata generated and + # removed + for prop, description in filtered_unsupported_fields: + if metadata.get(mdata, ("properties", prop), "selected-by-default"): + metadata.delete(mdata, ("properties", prop), "selected-by-default") + + mdata = metadata.write(mdata, ("properties", prop), "unsupported-description", description) + mdata = metadata.write(mdata, ("properties", prop), "inclusion", "unsupported") + + if replication_key: + mdata = metadata.write(mdata, (), "valid-replication-keys", [replication_key]) + mdata = metadata.write(mdata, (), "replication-key", replication_key) + mdata = metadata.write(mdata, (), "replication-method", "INCREMENTAL") + else: + mdata = metadata.write( + mdata, + (), + "forced-replication-method", + { + "replication-method": "FULL_TABLE", + "reason": "No replication keys found from the Salesforce API", + }, + ) - mdata = metadata.write(mdata, (), "table-key-properties", key_properties) + mdata = metadata.write(mdata, (), "table-key-properties", key_properties) - schema = { - "type": "object", - "additionalProperties": False, - "properties": 
properties, - } + schema = { + "type": "object", + "additionalProperties": False, + "properties": properties, + } - entry = { - "stream": sobject_name, - "tap_stream_id": sobject_name, - "schema": schema, - "metadata": metadata.to_list(mdata), - } + entry = { + "stream": sobject_name, + "tap_stream_id": sobject_name, + "schema": schema, + "metadata": metadata.to_list(mdata), + } - entries.append(entry) + entries.append(entry) # For each custom setting field, remove its associated tag from entries # See Blacklisting.md for more information diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 60f4de9..2e64f3e 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -1,3 +1,4 @@ +import json import re from datetime import timedelta @@ -332,10 +333,28 @@ def describe(self, sobject=None): """Describes all objects or a specific object""" headers = self.auth.rest_headers instance_url = self.auth.instance_url + body = None + method = "GET" if sobject is None: endpoint = "sobjects" endpoint_tag = "sobjects" url = self.data_url.format(instance_url, endpoint) + elif isinstance(sobject, list): + batch_length = len(sobject) + if batch_length > 25: + raise TapSalesforceException(f"Composite describe limited to 25 sObjects per batch. 
Given list of {batch_length}.") + endpoint = "composite/batch" + endpoint_tag = "CompositeBatch" + url = self.data_url.format(instance_url, endpoint) + method = "POST" + headers['Content-Type'] = 'application/json' + composite_subrequests = [] + for obj in sobject: + sub_endpoint = f"sobjects/{obj}/describe" + sub_url = self.data_url.format("", sub_endpoint) + subrequest = {"method": "GET", "url": sub_url} + composite_subrequests.append(subrequest) + body = json.dumps({"batchRequests": composite_subrequests}) else: endpoint = f"sobjects/{sobject}/describe" endpoint_tag = sobject @@ -343,9 +362,12 @@ def describe(self, sobject=None): with metrics.http_request_timer("describe") as timer: timer.tags["endpoint"] = endpoint_tag - resp = self._make_request("GET", url, headers=headers) + resp = self._make_request(method, url, headers=headers, body=body) - return resp.json() + if isinstance(sobject, list): + return resp.json()['results'] + else: + return resp.json() # pylint: disable=no-self-use def _get_selected_properties(self, catalog_entry): From 824f190b8f72ba932a6bbc13689e7de09ef5e442 Mon Sep 17 00:00:00 2001 From: andyoneal Date: Thu, 9 Jan 2025 16:35:35 -0600 Subject: [PATCH 2/3] ruff fixes. exception class. 
--- tap_salesforce/__init__.py | 3 ++- tap_salesforce/salesforce/__init__.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index a270deb..10c4745 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -239,7 +239,8 @@ def do_discover(sf: Salesforce, streams: list[str]): # noqa: C901 if missing_unsupported_field_names: LOGGER.info( - "Ignoring the following unsupported fields for object %s as they are missing from the field list: %s", + "Ignoring the following unsupported fields for object %s as they are missing" + + " from the field list: %s", sobject_name, ", ".join(sorted(missing_unsupported_field_names)), ) diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 2e64f3e..0275ce9 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -342,7 +342,7 @@ def describe(self, sobject=None): elif isinstance(sobject, list): batch_length = len(sobject) if batch_length > 25: - raise TapSalesforceException(f"Composite describe limited to 25 sObjects per batch. Given list of {batch_length}.") + raise TapSalesforceExceptionError(f"Composite limited to 25 sObjects per batch. 
({batch_length}).") endpoint = "composite/batch" endpoint_tag = "CompositeBatch" url = self.data_url.format(instance_url, endpoint) From 9c1846ac498aac0ae8b821be62d88be5fd491c7e Mon Sep 17 00:00:00 2001 From: andyoneal Date: Thu, 9 Jan 2025 16:37:04 -0600 Subject: [PATCH 3/3] double quotes --- tap_salesforce/salesforce/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 0275ce9..2090393 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -347,7 +347,7 @@ def describe(self, sobject=None): endpoint_tag = "CompositeBatch" url = self.data_url.format(instance_url, endpoint) method = "POST" - headers['Content-Type'] = 'application/json' + headers["Content-Type"] = "application/json" composite_subrequests = [] for obj in sobject: sub_endpoint = f"sobjects/{obj}/describe" @@ -365,7 +365,7 @@ def describe(self, sobject=None): resp = self._make_request(method, url, headers=headers, body=body) if isinstance(sobject, list): - return resp.json()['results'] + return resp.json()["results"] else: return resp.json()