
Commit

Add exception handling for recent data scraping airflow operators (#3607)

* add exception handling for recent data scraping airflow operators

* add placeholder dag import timeout variable for local development

* add exception handling for dynamic xlsx urls scraping

* unnest exception handling, break out operations to be more focused, add description
charlie-costanzo authored Feb 4, 2025
1 parent 66999d7 commit 0f4ed72
Showing 5 changed files with 400 additions and 191 deletions.
95 changes: 78 additions & 17 deletions airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py
@@ -6,7 +6,9 @@

import requests
from bs4 import BeautifulSoup
from pydantic import HttpUrl, parse_obj_as
from pydantic import HttpUrl, ValidationError, parse_obj_as

from airflow.exceptions import AirflowException

xlsx_urls = {
"ridership_url": "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release",
@@ -19,34 +21,93 @@
}


# pushes the scraped URL value to XCom
def push_url_to_xcom(key, scraped_url, context):
task_instance = context["ti"]
task_instance.xcom_push(key=key, value=scraped_url)
"""Push the scraped URL value to XCom with proper error handling."""
task_instance = context.get("ti")
if task_instance is None:
raise AirflowException("Task instance not found in context")

try:
task_instance.xcom_push(key=key, value=scraped_url)
except Exception as e:
logging.error(f"Error pushing URL to XCom for key {key}: {e}")
raise AirflowException(f"Failed to push URL to XCom: {e}")


# Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'
def href_matcher(href):
"""Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'"""
return (
href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx")
)


def scrape_ntd_xlsx_urls(**context):
for key, value in xlsx_urls.items():
url = value
req = requests.get(url)
soup = BeautifulSoup(req.text, "html.parser")
def make_http_request(url, key):
"""Make HTTP request with proper error handling."""
try:
response = requests.get(url)
response.raise_for_status()
return response
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP error occurred while fetching {url}: {e}")
raise AirflowException(f"HTTP error for {key}: {e}")
except requests.exceptions.RequestException as e:
logging.error(f"Error occurred while fetching {url}: {e}")
raise AirflowException(f"Request failed for {key}: {e}")


def parse_html_content(response_text, url, key):
"""Parse HTML content with error handling."""
try:
return BeautifulSoup(response_text, "html.parser")
except Exception as e:
logging.error(f"Error parsing HTML for {url}: {e}")
raise AirflowException(f"HTML parsing failed for {key}: {e}")


link = soup.find("a", href=href_matcher)
def find_and_validate_xlsx_link(soup, key, url):
"""Find and validate XLSX download link."""
link = soup.find("a", href=href_matcher)
if not link:
error_msg = f"No XLSX download link found for {key} at {url}"
logging.error(error_msg)
raise AirflowException(error_msg)

file_link = link.get("href")
if not file_link:
error_msg = f"Found link for {key} but href attribute is missing"
logging.error(error_msg)
raise AirflowException(error_msg)

updated_url = f"https://www.transit.dot.gov{file_link}"
try:
return parse_obj_as(HttpUrl, updated_url)
except ValidationError as e:
logging.error(f"URL validation failed for {updated_url}: {e}")
raise AirflowException(f"Invalid URL constructed for {key}: {e}")


def scrape_ntd_xlsx_urls(**context):
"""Main function to scrape XLSX URLs and push them to XCom."""
for key, url in xlsx_urls.items():
try:
# Make HTTP request
response = make_http_request(url, key)

# Extract the href if the link is found
file_link = link["href"] if link else None
# Parse HTML content
soup = parse_html_content(response.text, url, key)

updated_url = f"https://www.transit.dot.gov{file_link}"
# Find and validate XLSX link
validated_url = find_and_validate_xlsx_link(soup, key, url)

validated_url = parse_obj_as(HttpUrl, updated_url)
logging.info(f"Successfully validated URL for {key}: {validated_url}")

logging.info(f"Validated URL: {validated_url}.")
# Push to XCom
push_url_to_xcom(key=key, scraped_url=validated_url, context=context)

push_url_to_xcom(key=key, scraped_url=validated_url, context=context)
except AirflowException:
# Re-raise AirflowExceptions as they already have proper error messages
raise
except Exception as e:
# Log any unhandled exceptions and re-raise as AirflowException
logging.error(f"Unexpected error processing {key}: {e}")
raise AirflowException(f"Failed to process {key}: {e}")
2 changes: 2 additions & 0 deletions airflow/docker-compose.yaml
@@ -52,6 +52,8 @@ x-airflow-common:
AIRFLOW__CORE__BASE_LOG_FOLDER: /opt/airflow/gcs/logs
AIRFLOW__CORE__PLUGINS_FOLDER: /opt/airflow/gcs/plugins
AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE: 'true'
# AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 120


# this option prevents a DAG from trying to run all dagruns back to its
# start date. this lets you spin up docker, unpause a dag, and just
108 changes: 77 additions & 31 deletions airflow/plugins/operators/scrape_ntd_api.py
@@ -13,6 +13,7 @@
)
from pydantic import HttpUrl, parse_obj_as

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator # type: ignore

API_BUCKET = os.environ["CALITP_BUCKET__NTD_API_DATA_PRODUCTS"]
@@ -40,37 +41,52 @@ def filename(self) -> str:
class Config:
arbitrary_types_allowed = True

def fetch_from_ntd_api(self):
""" """

logging.info(f"Downloading NTD data for {self.year} / {self.product}.")
def _make_api_request(self, url: str) -> bytes:
"""Make API request with proper error handling."""
try:
response = requests.get(url)
response.raise_for_status()
return response.content
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP error occurred: {e}")
raise AirflowException(f"HTTP error in NTD API request: {e}")
except requests.exceptions.RequestException as e:
logging.error(f"Request error occurred: {e}")
raise AirflowException(f"Error in NTD API request: {e}")

def _validate_response_content(self, content: bytes) -> bytes:
"""Validate API response content."""
if content is None or len(content) == 0:
logging.info(
f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
)
return None
logging.info(
f"Downloaded {self.product} data for {self.year} with {len(content)} rows!"
)
return content

def fetch_from_ntd_api(self):
"""Fetch data from NTD API with proper error handling."""
try:
# Construct and validate URL
url = (
self.root_url + self.endpoint_id + self.file_format + "?$limit=5000000"
)

validated_url = parse_obj_as(HttpUrl, url)

response = requests.get(validated_url).content

if response is None or len(response) == 0:
logging.info(
f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
)
# Make API request
response_content = self._make_api_request(validated_url)

pass
else:
logging.info(
f"Downloaded {self.product} data for {self.year} with {len(response)} rows!"
)

return response

except requests.exceptions.RequestException as e:
logging.info(f"An error occurred: {e}")
# Validate response content
return self._validate_response_content(response_content)

except AirflowException:
# Re-raise AirflowExceptions as they already have proper error messages
raise
except Exception as e:
logging.error(f"Unexpected error occurred: {e}")
raise AirflowException(f"Unexpected error in NTD API request: {e}")


class JSONExtract(NtdDataProductAPIExtract):
@@ -110,17 +126,47 @@ def __init__(

super().__init__(**kwargs)

def execute(self, **kwargs):
api_content = self.extract.fetch_from_ntd_api()

decode_api_content = api_content.decode("utf-8")
def _process_api_content(self, api_content: bytes) -> pd.DataFrame:
"""Process API content into a DataFrame with error handling."""
try:
decode_api_content = api_content.decode("utf-8")
df = pd.read_json(decode_api_content)
return df.rename(make_name_bq_safe, axis="columns")
except ValueError as e:
logging.error(f"Error parsing JSON data: {e}")
raise AirflowException(f"Failed to parse JSON data: {e}")
except Exception as e:
logging.error(f"Error processing API content: {e}")
raise AirflowException(f"Failed to process API content: {e}")

def _save_dataframe(self, df: pd.DataFrame) -> None:
"""Save DataFrame as compressed JSONL with error handling."""
try:
gzipped_content = gzip.compress(
df.to_json(orient="records", lines=True).encode()
)
self.extract.save_content(fs=get_fs(), content=gzipped_content)
except Exception as e:
logging.error(f"Error saving processed data: {e}")
raise AirflowException(f"Failed to save processed data: {e}")

df = pd.read_json(decode_api_content)
def execute(self, **kwargs):
"""Execute the operator with proper error handling."""
try:
# Fetch API content
api_content = self.extract.fetch_from_ntd_api()
if api_content is None:
return None

df = df.rename(make_name_bq_safe, axis="columns")
# Process API content
df = self._process_api_content(api_content)

self.gzipped_content = gzip.compress(
df.to_json(orient="records", lines=True).encode()
)
# Save processed data
self._save_dataframe(df)

self.extract.save_content(fs=get_fs(), content=self.gzipped_content)
except AirflowException:
# Re-raise AirflowExceptions as they already have proper error messages
raise
except Exception as e:
logging.error(f"Error processing NTD API data: {e}")
raise AirflowException(f"Failed to process NTD API data: {e}")
(Diffs for the remaining 2 changed files are not shown.)
