Skip to content

Commit

Permalink
updates 2024-11-16 - started to move to decorator design pattern for …
Browse files Browse the repository at this point in the history
…bronze assets - refactor in progress
  • Loading branch information
CHRISCARLON committed Nov 16, 2024
1 parent b38f9db commit 55e0c9c
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 128 deletions.
9 changes: 6 additions & 3 deletions analytics_platform_dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
from .assets.energy_data_assets import (
analytics_platform_carbon_intensity_assets,
entsog_uk_gas_assets,
ukpn_smart_metres
)
from .assets.catalogue_metadata_assets import analytics_platform_datastore_assets
from .assets.catalogue_metadata_assets import london_datastore, ukpn_datastore_roadmap

from .assets.infrastructure_data_assets import (
national_charge_points_london_assets,
Expand Down Expand Up @@ -77,7 +78,8 @@ def get_env_var(var_name: str) -> str:
defs = Definitions(
assets=load_assets_from_modules(
[
analytics_platform_datastore_assets,
london_datastore,
ukpn_datastore_roadmap,
analytics_platform_ea_flood_areas,
analytics_platform_ea_flood_public_forecast,
analytics_platform_dbt_trade_barrier_assets,
Expand All @@ -87,7 +89,8 @@ def get_env_var(var_name: str) -> str:
uk_power_networks_live_faults,
national_charge_points_uk_assets,
green_belt,
built_up_areas
built_up_areas,
ukpn_smart_metres
]
),
jobs=[
Expand Down
1 change: 1 addition & 0 deletions analytics_platform_dagster/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from typing import Dict, List, Optional
from dagster import asset, AssetIn, AssetExecutionContext
import polars as pl

from ...models.catalogue_metadata_models.london_datastore import LondonDatastoreCatalogue
from ...utils.slack_messages.slack_message import with_slack_notification
from ...utils.variables_helper.url_links import asset_urls
from ...utils.etl_patterns.bronze_factory import bronze_asset_factory

def transform_london_datastore(data: List[Dict]) -> List[Dict]:
"""Transform London Datastore data into flattened format"""
rows = []
for item in data:
base_data = {
"id": str(item.get("id", "")),
"title": str(item.get("title", "")),
"description": str(item.get("description", "")),
"author": str(item.get("author", "")),
"author_email": str(item.get("author_email", "")),
"maintainer": str(item.get("maintainer", "")),
"maintainer_email": str(item.get("maintainer_email", "")),
"licence": str(item.get("licence", "")),
"licence_notes": str(item.get("licence_notes", "")),
"update_frequency": str(item.get("update_frequency", "")),
"slug": str(item.get("slug", "")),
"state": str(item.get("state", "")),
"createdAt": str(item.get("createdAt", "")),
"updatedAt": str(item.get("updatedAt", "")),
"london_smallest_geography": str(item.get("london_smallest_geography", "")),
"tags": ", ".join(item.get("tags", [])) if item.get("tags") else "",
"topics": ", ".join(item.get("topics", [])) if item.get("topics") else "",
"shares": str(item.get("shares", "")),
}

resources = item.get("resources", {})
if resources:
for resource_id, resource in resources.items():
resource_data = {
"resource_id": str(resource_id),
"resource_title": str(resource.get("title", "")),
"resource_format": str(resource.get("format", "")),
"resource_url": str(resource.get("url", "")),
"resource_description": str(resource.get("description", "")),
"resource_check_hash": str(resource.get("check_hash", "")),
"resource_check_size": str(resource.get("check_size", "")),
"resource_check_timestamp": str(resource.get("check_timestamp", "")),
}
rows.append({**base_data, **resource_data})
else:
rows.append(base_data)
return rows

@asset(
name="london_datastore_bronze",
group_name="metadata_catalogues",
io_manager_key="S3Parquet"
)
@bronze_asset_factory(
url_key="london_data_store",
model=LondonDatastoreCatalogue,
asset_urls=asset_urls,
transform_func=transform_london_datastore,
wrap_items=True
)
def london_datastore_bronze(context: AssetExecutionContext) -> Optional[bytes]:
"""Load London Datastore Metadata as Parquet to bronze bucket"""
return None

@asset(
name="london_datastore_silver",
group_name="metadata_catalogues",
io_manager_key="PolarsDeltaLake",
metadata={"mode": "overwrite"},
ins={
"london_datastore_bronze": AssetIn("london_datastore_bronze")
},
required_resource_keys={"slack"}
)
@with_slack_notification("London Datastore Catalogue Data")
def london_datastore_silver(
context: AssetExecutionContext,
london_datastore_bronze: pl.DataFrame
) -> pl.DataFrame:
"""Process London Datastore Metadata into silver bucket"""
df = london_datastore_bronze

df = df.with_columns([
pl.when(pl.col("createdAt").str.contains(r"\."))
.then(pl.col("createdAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.fZ", strict=False))
.otherwise(pl.col("createdAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%SZ", strict=False))
.cast(pl.Date),

pl.when(pl.col("updatedAt").str.contains(r"\."))
.then(pl.col("updatedAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.fZ", strict=False))
.otherwise(pl.col("updatedAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%SZ", strict=False))
.cast(pl.Date)
])

context.log.info(f"Overview: {df.head(25)}")
context.log.info(f"Columns: {df.columns}")
context.log.info(f"Types: {df.dtypes}")
context.log.info(f"Shape: {df.shape}")

return df
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Optional
from dagster import asset, AssetExecutionContext

from ...utils.variables_helper.url_links import asset_urls
from ...utils.etl_patterns.bronze_factory import bronze_asset_factory

@asset(
name="ukpn_datastore_roadmap_bronze",
group_name="metadata_catalogues",
io_manager_key="S3Parquet"
)
@bronze_asset_factory(
url_key="ukpn_datastore_roadmap_bronze",
asset_urls=asset_urls,
)
def ukpn_datastore_roadmap_bronze(context: AssetExecutionContext) -> Optional[bytes]:
"""Load UKPN Datastore Roadmap Metadata as Parquet to bronze bucket"""
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Optional
from dagster import asset, AssetExecutionContext

from ...utils.variables_helper.url_links import asset_urls
from ...utils.etl_patterns.bronze_factory import bronze_asset_factory

@asset(
name="ukpn_smart_metres",
group_name="energy_assets",
io_manager_key="S3Parquet"
)
@bronze_asset_factory(
url_key="ukpn_smart_metres",
asset_urls=asset_urls,
)
def ukpn_smart_metres(context: AssetExecutionContext) -> Optional[bytes]:
"""Load UKPN Smart Metres as Parquet to bronze bucket"""
return None
Loading

0 comments on commit 55e0c9c

Please sign in to comment.