Skip to content

Commit

Permalink
updates 2024-11-18 - added renewable energy planning dataset and adde…
Browse files Browse the repository at this point in the history
…d uk heatwork planning dataset. Made tweaks to bronze data factory adding in excel processing as well as json.
  • Loading branch information
CHRISCARLON committed Nov 18, 2024
1 parent 55e0c9c commit bba99b4
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 15 deletions.
10 changes: 7 additions & 3 deletions analytics_platform_dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
from .assets.energy_data_assets import (
analytics_platform_carbon_intensity_assets,
entsog_uk_gas_assets,
ukpn_smart_metres
ukpn_smart_metres,
renewable_energy_planning
)
from .assets.catalogue_metadata_assets import london_datastore, ukpn_datastore_roadmap

from .assets.infrastructure_data_assets import (
national_charge_points_london_assets,
uk_power_networks_live_faults,
national_charge_points_uk_assets
national_charge_points_uk_assets,
uk_heat_networks
)

from .assets.location_data_assets import built_up_areas
Expand Down Expand Up @@ -90,7 +92,9 @@ def get_env_var(var_name: str) -> str:
national_charge_points_uk_assets,
green_belt,
built_up_areas,
ukpn_smart_metres
ukpn_smart_metres,
renewable_energy_planning,
uk_heat_networks
]
),
jobs=[
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Optional
from dagster import asset, AssetExecutionContext

from ...utils.variables_helper.url_links import asset_urls
from ...utils.etl_patterns.bronze_factory import bronze_asset_factory

@asset(
name="renewable_energy_planning",
group_name="energy_assets",
io_manager_key="S3Parquet"
)
@bronze_asset_factory(
url_key="renewable_energy_planning",
asset_urls=asset_urls,
sheet_name="REPD"
)
def renewable_energy_planning_bronze(context: AssetExecutionContext) -> Optional[bytes]:
"""Load Renewable Energy Planing Dataset as Parquet to bronze bucket"""
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Optional
from dagster import asset, AssetExecutionContext

from ...utils.variables_helper.url_links import asset_urls
from ...utils.etl_patterns.bronze_factory import bronze_asset_factory

@asset(
name="uk_heat_networks_bronze",
group_name="infrastructure_assets",
io_manager_key="S3Parquet"
)
@bronze_asset_factory(
url_key="uk_heat_networks",
asset_urls=asset_urls,
sheet_name="Heat Networks"
)
def uk_heat_networks_bronze(context: AssetExecutionContext) -> Optional[bytes]:
"""Load UK Heat Network Dataset as Parquet to bronze bucket"""
return None
59 changes: 50 additions & 9 deletions analytics_platform_dagster/utils/etl_patterns/bronze_factory.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,67 @@
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union
import polars as pl
import requests
import io
from typing import Any, Callable, Dict, List, Optional, TypeVar
from pydantic import BaseModel, ValidationError
from dagster import AssetExecutionContext
from functools import wraps
from builtins import bytes
from pathlib import Path

T = TypeVar('T', bound=BaseModel)

class BronzeETLBase:
"""Base class for Bronze ETL operations"""
"""Base class for Bronze ETL operations supporting both API and Excel data sources using Polars"""

def __init__(self, url_key: str, asset_urls: Dict[str, str]):
self.url_key = url_key
self.asset_urls = asset_urls

def fetch_api_data(self) -> Dict:
def _is_excel_url(self, url: str) -> bool:
"""Check if URL points to an Excel file"""
return any(url.lower().endswith(ext) for ext in ['.xlsx', '.xls', '.xlsm'])

def _process_excel_response(self, response: bytes, sheet_name: Optional[str] = None) -> Dict:
"""Process Excel file from response bytes using Polars"""
try:
# Create a BytesIO object from the response content
excel_buffer = io.BytesIO(response)

# Read Excel file into Polars DataFrame
if sheet_name:
df = pl.read_excel(excel_buffer, sheet_name=sheet_name)
else:
df = pl.read_excel(excel_buffer)

# Convert to records format similar to API response
records = df.to_dicts()

# Wrap in a dict with 'items' key to match API structure if needed
return {'items': records}

except Exception as e:
raise ValueError(f"Error processing Excel file: {str(e)}")

def fetch_data(self, sheet_name: Optional[str] = None) -> Dict:
"""Fetch data from either API or Excel source
Args:
sheet_name: Optional name of Excel sheet to read. If None, reads first sheet.
Returns:
Dict containing the data, with consistent structure regardless of source
"""
url = self.asset_urls.get(self.url_key)
if url is None:
raise ValueError(f"URL for {self.url_key} not found in asset_urls")

response = requests.get(url)
response.raise_for_status()
return response.json()

if self._is_excel_url(url):
return self._process_excel_response(response.content, sheet_name)
else:
return response.json()

def validate_data(
self,
Expand All @@ -31,7 +71,6 @@ def validate_data(
) -> List[Dict[str, Any]]:
if model is None:
return []

try:
if wrap_items:
model.model_validate({"items": data})
Expand All @@ -53,23 +92,25 @@ def bronze_asset_factory(
model: Optional[type[T]] = None,
transform_func: Optional[Callable] = None,
wrap_items: bool = False,
sheet_name: Optional[str] = None,
):
"""Factory function for creating bronze assets
"""Factory function for creating bronze assets with Excel support using Polars
Args:
url_key: Key to lookup URL in asset_urls
asset_urls: Dictionary of URLs
model: Optional Pydantic model for validation
transform_func: Optional function to transform data
wrap_items: Whether to wrap items for validation
sheet_name: Optional name of Excel sheet to read
"""
def decorator(func):
@wraps(func)
def wrapper(context: AssetExecutionContext) -> bytes:
etl = BronzeETLBase(url_key, asset_urls)
try:
# Fetch data
data = etl.fetch_api_data()
# Fetch data (now supports Excel with Polars)
data = etl.fetch_data(sheet_name=sheet_name)

# Validate data only if model is provided
validation_errors = etl.validate_data(data, model, wrap_items) if model else []
Expand All @@ -86,7 +127,7 @@ def wrapper(context: AssetExecutionContext) -> bytes:
else:
transformed_data = data

# Create DataFrame
# Create DataFrame using Polars
df = pl.DataFrame(transformed_data)

# Log some info about the data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@
"entsog_gas": "https://transparency.entsog.eu/api/v1/operationalData.json",
"os_built_up_areas": "https://api.os.uk/downloads/v1/products/BuiltUpAreas/downloads?area=GB&format=GeoPackage&redirect",
"ukpn_datastore_roadmap_bronze": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-external-facing-tracker/exports/json?lang=en&timezone=Europe%2FLondon",
"ukpn_smart_metres": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-smart-meter-installation-volumes/exports/json?lang=en&timezone=Europe%2FLondon"
"ukpn_smart_metres": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-smart-meter-installation-volumes/exports/json?lang=en&timezone=Europe%2FLondon",
"renewable_energy_planning": "https://assets.publishing.service.gov.uk/media/66fbf70430536cb927482a3d/repd-q2-july-2024.xlsx",
"uk_heat_networks": "https://assets.publishing.service.gov.uk/media/66bb52a10808eaf43b50e0f0/HNPD_2024_Q2.xlsx"
}
3 changes: 2 additions & 1 deletion images/Dockerfile.dagster-daemon
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ RUN pip install --no-cache-dir \
shapely \
aiohttp \
openpyxl \
polars
polars \
fastexcel

# Expose the Dagster daemon port
EXPOSE 3000
Expand Down
3 changes: 2 additions & 1 deletion images/Dockerfile.dagster-webserver
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ RUN pip install --no-cache-dir \
shapely \
aiohttp \
openpyxl \
polars
polars \
fastexcel

# Expose the Dagster webserver port
EXPOSE 3000
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ streamlit = "^1.38.0"
ruff = "^0.6.7"
openpyxl = "^3.1.5"
polars = "^1.12.0"
fastexcel = "^0.12.0"


[build-system]
Expand Down

0 comments on commit bba99b4

Please sign in to comment.