Skip to content

Commit

Permalink
added in uk charge point data asset and refactored some pydantic models
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Nov 5, 2024
1 parent 9884b4e commit fed4bea
Show file tree
Hide file tree
Showing 14 changed files with 673 additions and 155 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,4 @@ test.py
creds.py
.streamlit
poetry.lock
data_testing/
12 changes: 8 additions & 4 deletions analytics_platform_dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
)
from .assets.catalogue_metadata_assets import analytics_platform_datastore_assets
from .assets.infrastructure_data_assets import (
analytics_platform_national_charge_points_assets,
uk_power_networks_live_faults
national_charge_points_london_assets,
uk_power_networks_live_faults,
national_charge_points_uk_assets
)

from .utils.io_manager_helper.io_manager import (
S3ParquetManager,
AwsWranglerDeltaLakeIOManager,
S3JSONManager,
PartitionedDuckDBParquetManager
)

from .jobs.analytics_platfom_jobs import (
Expand Down Expand Up @@ -72,9 +74,10 @@ def get_env_var(var_name: str) -> str:
analytics_platform_ea_flood_public_forecast,
analytics_platform_dbt_trade_barrier_assets,
analytics_platform_carbon_intensity_assets,
analytics_platform_national_charge_points_assets,
national_charge_points_london_assets,
entsog_uk_gas_assets,
uk_power_networks_live_faults
uk_power_networks_live_faults,
national_charge_points_uk_assets

]
),
Expand All @@ -98,6 +101,7 @@ def get_env_var(var_name: str) -> str:
resources={
"S3Parquet": S3ParquetManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"S3Json": S3JSONManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"PartitionedDuckDBManager": PartitionedDuckDBParquetManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"DeltaLake": AwsWranglerDeltaLakeIOManager(
bucket_name=get_env_var("SILVER_DATA_BUCKET")
),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
This currently only provides data for London only
"""
import pandas as pd
import io

from pydantic import ValidationError
from ...utils.requests_helper.requests_helper import return_json
from ...utils.variables_helper.url_links import asset_urls
from ...models.infrastructure_data_models.national_charge_point_london_model import (
ChargeDevice,
)
from dagster import AssetExecutionContext, AssetIn, asset
from ...utils.slack_messages.slack_message import with_slack_notification

@asset(group_name="infrastructure_assets", io_manager_key="S3Parquet")
def national_charge_point_london_bronze(context: AssetExecutionContext):
"""
Fetches json data and uploads charge point data to S3 using IO Manager
Returns:
A parquet file in S3
"""

# Fetch url
url = asset_urls.get("national_charge_point_london")
if url is None:
raise ValueError("API_ENDPOINT can't be None")

response = return_json(url)
validation_errors = []


try:
ChargeDevice.model_validate(response)
except ValidationError as e:
validation_errors = e.errors()

df = pd.DataFrame(response)
df = df.astype(str)

context.log.info(f"Processed {len(df)} records with {len(validation_errors)} validation errors")

parquet_buffer = io.BytesIO()
df.to_parquet(parquet_buffer, engine="pyarrow")
parquet_bytes = parquet_buffer.getvalue()

context.log.info("Successfully processed batch into Parquet format")
return parquet_bytes

@asset(
group_name="infrastructure_assets",
io_manager_key="DeltaLake",
metadata={"mode": "overwrite"},
ins={
"national_charge_point_london_bronze": AssetIn(
"national_charge_point_london_bronze"
)
},
required_resource_keys={"slack"}
)
@with_slack_notification("National EV Charge Point Data London")
def national_charge_point_london_silver(
context: AssetExecutionContext, national_charge_point_london_bronze
) -> pd.DataFrame:
"""
Write charge point data out to Delta Lake
Returns:
Delta Lake table in S3.
"""
try:
df = pd.DataFrame(national_charge_point_london_bronze)
return df

except Exception as e:
context.log.error(f"Error processing data: {e}")
raise
Loading

0 comments on commit fed4bea

Please sign in to comment.