Skip to content

Commit

Permalink
Merge pull request #14 from MeltanoLabs/census_child_fail
Browse files Browse the repository at this point in the history
Authentication for retries fixed, schema issues fixed
  • Loading branch information
visch authored Jan 16, 2025
2 parents dcf33d9 + 2c907b1 commit 72cc180
Show file tree
Hide file tree
Showing 8 changed files with 1,612 additions and 88 deletions.
2 changes: 0 additions & 2 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ plugins:
- name: client_secret
sensitive: true
- name: clinic_id
select:
- opportunity_timeline.*
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
41 changes: 15 additions & 26 deletions tap_sunwave/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ class SunwaveStream(RESTStream):
"""Sunwave stream class."""

url_base = "https://emr.sunwavehealth.com/SunwaveEMR"
auth_errors = 0
@property
def schema_filepath(self):
return SCHEMAS_DIR / f"{self.name}.json"

def _request(self, prepared_request: requests.PreparedRequest, context: Context | None) -> requests.Response:
"""Auth wasn't getting ran for every retried request, so we need to do it here"""
# Manually run authenticator before sending the request
reauthed_request = self.authenticator.authenticate_request(prepared_request)
# Then call the parent's method with our newly authenticated request
return super()._request(reauthed_request, context)

@cached_property
def authenticator(self) -> Auth:
Expand Down Expand Up @@ -71,17 +82,14 @@ def validate_response(self, response: requests.Response) -> None:
# Check if response is a dict and has error
if isinstance(json_response, dict) and json_response.get("error"):
error_msg = self.response_error_message(response)
# TODO: figure out how to handle this better
if self.name == "opportunity_timeline":
log_msg = f"Skipping 'opportunity_timeline' record due to {error_msg}"
self.logger.info(log_msg)
else:
raise FatalAPIError(error_msg)
raise FatalAPIError(error_msg)
except requests.exceptions.JSONDecodeError as e:
# Their API returns a 200 status code when there's an error
# We detect that by noticing the response isn't valid JSON
msg = self.response_error_message(response)
raise FatalAPIError(msg)
# Reset auth errors
self.auth_errors = 0

def _cleanup_schema(self, schema_fragment):
if isinstance(schema_fragment, dict):
Expand All @@ -100,23 +108,4 @@ def _cleanup_schema(self, schema_fragment):

elif isinstance(schema_fragment, list):
for item in schema_fragment:
self._cleanup_schema(item)

def _get_swagger_schema(self, ref: str) -> dict:
"""Get schema from packaged swagger file."""
# Use package resources to access swagger.json
with resources.files(__package__).joinpath("docs/swagger.json").open("r", encoding="utf-8") as f:
swagger_doc = json.load(f)

# Example: ref => "#/components/schemas/FormStandardResponse"
# Strip the leading "#/" and split by "/"
path_parts = ref.lstrip("#/").split("/")

# Traverse the swagger doc to get the nested object
data = swagger_doc
for part in path_parts:
data = data[part]

cleaned_schema = self._cleanup_schema(data["properties"])
assert len(data["properties"]) > 1, "No properties found in schema"
return data
self._cleanup_schema(item)
Loading

0 comments on commit 72cc180

Please sign in to comment.