perf: Use Composite API for Discover (redo) #72

Merged: 3 commits merged on Jan 9, 2025
Changes from all commits
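The change swaps one describe() call per sObject for Salesforce's Composite Batch API, which bundles up to 25 describe subrequests into a single HTTP round trip. A rough sketch of the round-trip savings, using a purely hypothetical object count:

import math

num_objects = 800        # hypothetical org size, for illustration only
batch_size = 25          # Composite Batch limit per request

requests_before = num_objects                          # one GET per sObject describe
requests_after = math.ceil(num_objects / batch_size)   # one POST per batch of up to 25

print(requests_before, requests_after)  # 800 -> 32 HTTP round trips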
253 changes: 135 additions & 118 deletions tap_salesforce/__init__.py
@@ -141,145 +141,162 @@ def do_discover(sf: Salesforce, streams: list[str]): # noqa: C901
sf_custom_setting_objects = []
object_to_tag_references = {}

# For each SF Object describe it, loop its fields and build a schema
entries = []
# Build batches of SF sObjects, 25 per batch.
sobject_batches = []
batch = []
for sobject_name in objects_to_discover:
# Skip blacklisted SF objects depending on the api_type in use
# ChangeEvent objects are not queryable via Bulk or REST (undocumented)
if sobject_name in sf.get_blacklisted_objects() or sobject_name.endswith("ChangeEvent"):
continue

sobject_description = sf.describe(sobject_name)
batch.append(sobject_name)
if len(batch) == 25:
sobject_batches.append(batch)
batch = []

# Cache customSetting and Tag objects to check for blacklisting after
# all objects have been described
if sobject_description.get("customSetting"):
sf_custom_setting_objects.append(sobject_name)
elif sobject_name.endswith("__Tag"):
relationship_field = next(
(f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"),
None,
)
if relationship_field:
# Map {"Object":"Object__Tag"}
object_to_tag_references[relationship_field["referenceTo"][0]] = sobject_name

fields = sobject_description["fields"]
replication_key = get_replication_key(sobject_name, fields)

unsupported_fields = set()
properties = {}
mdata = metadata.new()

found_id_field = False

# Loop over the object's fields
for f in fields:
field_name = f["name"]
field_type = f["type"] # noqa: F841

if field_name == "Id":
found_id_field = True

property_schema, mdata = create_property_schema(f, mdata)

# Compound Address fields cannot be queried by the Bulk API
if f["type"] in ("address", "location") and sf.api_type in [
tap_salesforce.salesforce.BULK_API_TYPE,
tap_salesforce.salesforce.BULK2_API_TYPE,
]:
unsupported_fields.add((field_name, "cannot query compound address fields with bulk API"))

# we haven't been able to observe any records with a json field, so we
# are marking it as unavailable until we have an example to work with
if f["type"] == "json":
unsupported_fields.add(
(
field_name,
"do not currently support json fields - please contact support",
)
if len(batch) > 0:
sobject_batches.append(batch)

# For each SF Object describe it, loop its fields and build a schema
entries = []
for batch in sobject_batches:
sobject_descriptions = sf.describe(batch)

for subrequest_result in sobject_descriptions:
sobject_description = subrequest_result["result"]
sobject_name = sobject_description["name"]

# Cache customSetting and Tag objects to check for blacklisting after
# all objects have been described
if sobject_description.get("customSetting"):
sf_custom_setting_objects.append(sobject_name)
elif sobject_name.endswith("__Tag"):
relationship_field = next(
(f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"),
None,
)
if relationship_field:
# Map {"Object":"Object__Tag"}
object_to_tag_references[relationship_field["referenceTo"][0]] = sobject_name

fields = sobject_description["fields"]
replication_key = get_replication_key(sobject_name, fields)

unsupported_fields = set()
properties = {}
mdata = metadata.new()

found_id_field = False

# Loop over the object's fields
for f in fields:
field_name = f["name"]
field_type = f["type"] # noqa: F841

if field_name == "Id":
found_id_field = True

property_schema, mdata = create_property_schema(f, mdata)

# Compound Address fields cannot be queried by the Bulk API
if f["type"] in ("address", "location") and sf.api_type in [
tap_salesforce.salesforce.BULK_API_TYPE,
tap_salesforce.salesforce.BULK2_API_TYPE,
]:
unsupported_fields.add((field_name, "cannot query compound address fields with bulk API"))

# we haven't been able to observe any records with a json field, so we
# are marking it as unavailable until we have an example to work with
if f["type"] == "json":
unsupported_fields.add(
(
field_name,
"do not currently support json fields - please contact support",
)
)

# Blacklisted fields are dependent on the api_type being used
field_pair = (sobject_name, field_name)
if field_pair in sf.get_blacklisted_fields():
unsupported_fields.add((field_name, sf.get_blacklisted_fields()[field_pair]))
# Blacklisted fields are dependent on the api_type being used
field_pair = (sobject_name, field_name)
if field_pair in sf.get_blacklisted_fields():
unsupported_fields.add((field_name, sf.get_blacklisted_fields()[field_pair]))

inclusion = metadata.get(mdata, ("properties", field_name), "inclusion")
inclusion = metadata.get(mdata, ("properties", field_name), "inclusion")

if sf.select_fields_by_default and inclusion != "unsupported":
mdata = metadata.write(mdata, ("properties", field_name), "selected-by-default", True)
if sf.select_fields_by_default and inclusion != "unsupported":
mdata = metadata.write(mdata, ("properties", field_name), "selected-by-default", True)

properties[field_name] = property_schema
properties[field_name] = property_schema

if replication_key:
mdata = metadata.write(mdata, ("properties", replication_key), "inclusion", "automatic")
if replication_key:
mdata = metadata.write(mdata, ("properties", replication_key), "inclusion", "automatic")

# There are cases where compound fields are referenced by the associated
# subfields but are not actually present in the field list
field_name_set = {f["name"] for f in fields}
filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set]
missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set]
# There are cases where compound fields are referenced by the associated
# subfields but are not actually present in the field list
field_name_set = {f["name"] for f in fields}
filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set]
missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set]

if missing_unsupported_field_names:
LOGGER.info(
"Ignoring the following unsupported fields for object %s as they are missing from the field list: %s",
sobject_name,
", ".join(sorted(missing_unsupported_field_names)),
)

if filtered_unsupported_fields:
LOGGER.info(
"Not syncing the following unsupported fields for object %s: %s",
sobject_name,
", ".join(sorted([k for k, _ in filtered_unsupported_fields])),
)
if missing_unsupported_field_names:
LOGGER.info(
"Ignoring the following unsupported fields for object %s as they are missing"
+ "from the field list: %s",
sobject_name,
", ".join(sorted(missing_unsupported_field_names)),
)

# Salesforce Objects are skipped when they do not have an Id field
if not found_id_field:
LOGGER.info("Skipping Salesforce Object %s, as it has no Id field", sobject_name)
continue
if filtered_unsupported_fields:
LOGGER.info(
"Not syncing the following unsupported fields for object %s: %s",
sobject_name,
", ".join(sorted([k for k, _ in filtered_unsupported_fields])),
)

# Any property added to unsupported_fields has metadata generated and
# removed
for prop, description in filtered_unsupported_fields:
if metadata.get(mdata, ("properties", prop), "selected-by-default"):
metadata.delete(mdata, ("properties", prop), "selected-by-default")

mdata = metadata.write(mdata, ("properties", prop), "unsupported-description", description)
mdata = metadata.write(mdata, ("properties", prop), "inclusion", "unsupported")

if replication_key:
mdata = metadata.write(mdata, (), "valid-replication-keys", [replication_key])
mdata = metadata.write(mdata, (), "replication-key", replication_key)
mdata = metadata.write(mdata, (), "replication-method", "INCREMENTAL")
else:
mdata = metadata.write(
mdata,
(),
"forced-replication-method",
{
"replication-method": "FULL_TABLE",
"reason": "No replication keys found from the Salesforce API",
},
)
# Salesforce Objects are skipped when they do not have an Id field
if not found_id_field:
LOGGER.info("Skipping Salesforce Object %s, as it has no Id field", sobject_name)
continue

# Any property added to unsupported_fields has metadata generated and
# removed
for prop, description in filtered_unsupported_fields:
if metadata.get(mdata, ("properties", prop), "selected-by-default"):
metadata.delete(mdata, ("properties", prop), "selected-by-default")

mdata = metadata.write(mdata, ("properties", prop), "unsupported-description", description)
mdata = metadata.write(mdata, ("properties", prop), "inclusion", "unsupported")

if replication_key:
mdata = metadata.write(mdata, (), "valid-replication-keys", [replication_key])
mdata = metadata.write(mdata, (), "replication-key", replication_key)
mdata = metadata.write(mdata, (), "replication-method", "INCREMENTAL")
else:
mdata = metadata.write(
mdata,
(),
"forced-replication-method",
{
"replication-method": "FULL_TABLE",
"reason": "No replication keys found from the Salesforce API",
},
)

mdata = metadata.write(mdata, (), "table-key-properties", key_properties)
mdata = metadata.write(mdata, (), "table-key-properties", key_properties)

schema = {
"type": "object",
"additionalProperties": False,
"properties": properties,
}
schema = {
"type": "object",
"additionalProperties": False,
"properties": properties,
}

entry = {
"stream": sobject_name,
"tap_stream_id": sobject_name,
"schema": schema,
"metadata": metadata.to_list(mdata),
}
entry = {
"stream": sobject_name,
"tap_stream_id": sobject_name,
"schema": schema,
"metadata": metadata.to_list(mdata),
}

entries.append(entry)
entries.append(entry)

# For each custom setting field, remove its associated tag from entries
# See Blacklisting.md for more information
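As a side note, the new do_discover loop depends on a simple chunking pattern: split the discoverable objects into groups of 25 and hand each group to describe(). A minimal sketch of that pattern (the chunk helper below is illustrative, not part of the tap):

def chunk(items, size=25):
    # Yield successive fixed-size slices; 25 matches the Composite Batch limit.
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Hypothetical usage mirroring the new flow:
# for batch in chunk(objects_to_discover):
#     for subrequest_result in sf.describe(batch):
#         sobject_description = subrequest_result["result"]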
26 changes: 24 additions & 2 deletions tap_salesforce/salesforce/__init__.py
@@ -1,3 +1,4 @@
import json
import re
from datetime import timedelta

@@ -332,20 +333,41 @@ def describe(self, sobject=None):
"""Describes all objects or a specific object"""
headers = self.auth.rest_headers
instance_url = self.auth.instance_url
body = None
method = "GET"
if sobject is None:
endpoint = "sobjects"
endpoint_tag = "sobjects"
url = self.data_url.format(instance_url, endpoint)
elif isinstance(sobject, list):
batch_length = len(sobject)
if batch_length > 25:
raise TapSalesforceExceptionError(f"Composite Batch is limited to 25 sObjects per batch (got {batch_length}).")
endpoint = "composite/batch"
endpoint_tag = "CompositeBatch"
url = self.data_url.format(instance_url, endpoint)
method = "POST"
headers["Content-Type"] = "application/json"
composite_subrequests = []
for obj in sobject:
sub_endpoint = f"sobjects/{obj}/describe"
sub_url = self.data_url.format("", sub_endpoint)
subrequest = {"method": "GET", "url": sub_url}
composite_subrequests.append(subrequest)
body = json.dumps({"batchRequests": composite_subrequests})
else:
endpoint = f"sobjects/{sobject}/describe"
endpoint_tag = sobject
url = self.data_url.format(instance_url, endpoint)

with metrics.http_request_timer("describe") as timer:
timer.tags["endpoint"] = endpoint_tag
resp = self._make_request("GET", url, headers=headers)
resp = self._make_request(method, url, headers=headers, body=body)

return resp.json()
if isinstance(sobject, list):
return resp.json()["results"]
else:
return resp.json()

# pylint: disable=no-self-use
def _get_selected_properties(self, catalog_entry):
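For reference, the Composite Batch exchange that the updated describe() builds and parses looks roughly like this. The payload shapes follow Salesforce's Composite Batch API (a "batchRequests" list in the request; "hasErrors" plus per-subrequest "results" in the response); the API version, object names, and URL prefix below are placeholders that assume the tap's data_url template expands to "{instance}/services/data/vXX.X/{endpoint}":

import json

# Request body POSTed to {instance_url}/services/data/vXX.X/composite/batch
body = json.dumps({
    "batchRequests": [
        {"method": "GET", "url": "/services/data/vXX.X/sobjects/Account/describe"},
        {"method": "GET", "url": "/services/data/vXX.X/sobjects/Contact/describe"},
    ]
})

# Response: one entry per subrequest, each carrying a statusCode and a result payload.
response = {
    "hasErrors": False,
    "results": [
        {"statusCode": 200, "result": {"name": "Account", "fields": []}},
        {"statusCode": 200, "result": {"name": "Contact", "fields": []}},
    ],
}

for subrequest_result in response["results"]:
    print(subrequest_result["result"]["name"])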