Skip to content

Commit

Permalink
Updates 2024-07-29 - Added in National Charge Point asset bronze data
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Jul 28, 2024
1 parent ccc4b0c commit 173f637
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 48 deletions.
17 changes: 13 additions & 4 deletions analytics_platform_dagster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from dagster import Definitions, load_assets_from_modules

from .assets.location_data_assets import analytics_platform_os_assets
from .assets.environment_data_assets import analytics_platform_ea_assets
from .assets.trade_data_assets import analytics_platform_dbt_assets
from .assets.energy_data_assets import analytics_platform_carbon_intensity_assets
from .assets.catalogue_metadata_assets import analytics_platform_datastore_assets
from .utils.io_manager import S3ParquetManager, AwsWranglerDeltaLakeIOManager
from .assets.infrastructure_data_assets import analytics_platform_national_charge_points_assets

from .utils.io_manager import S3ParquetManager, AwsWranglerDeltaLakeIOManager, S3JSONManager

from .jobs.analytics_platfom_jobs import (
environment_job_1,
trade_job_1,
metadata_job_1,
energy_job_1,
energy_job_1_daily
energy_job_1_daily,
infrastructure_job_1
)

defs = Definitions(
Expand All @@ -20,18 +24,23 @@
analytics_platform_os_assets,
analytics_platform_ea_assets,
analytics_platform_dbt_assets,
analytics_platform_carbon_intensity_assets
analytics_platform_carbon_intensity_assets,
analytics_platform_national_charge_points_assets
]),

jobs=[
environment_job_1,
trade_job_1,
metadata_job_1,
energy_job_1,
infrastructure_job_1,
],
schedules=[energy_job_1_daily],
schedules=[
energy_job_1_daily
],
resources={
"S3Parquet": S3ParquetManager(bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"),
"S3Json": S3JSONManager(bucket_name="datastackprod-bronzedatabucket85c612b2-tjqgl6ahaks5"),
"DeltaLake": AwsWranglerDeltaLakeIOManager(bucket_name="datastackprod-silverdatabucket04c06b24-mrfdumn6njwe")
},
)
Original file line number Diff line number Diff line change
@@ -1,41 +1,25 @@
# from ...utils.requests_helper import return_api_data_json
# from ...utils.io_manager import S3JSONManager
# from ...models.infrastructure_data_models.national_charge_point_model import ChargeDeviceResponse
# from datetime import datetime
# from dagster import asset, AssetExecutionContext, AssetIn

# API_ENDPOINT = "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ozev-ukpn-national-chargepoint-register/records?limit=50"

# def fetch_national_charge_point_data() -> ChargeDeviceResponse:
# url = API_ENDPOINT
# data = return_api_data_json(url)
# return ChargeDeviceResponse.model_validate(data, strict=True)
from typing import Any
from ...utils.requests_helper import stream_json
from ...utils.url_links import asset_urls
# from ...models.infrastructure_data_models.national_charge_point_model import ChargeDeviceResponse
from dagster import AssetExecutionContext, asset

# @asset(
# group_name="energy_assets",
# io_manager_key="S3Parquet"
# )
# def national_charge_point_data_bronze():
# response = fetch_national_charge_point_data()
# return response
DONWLOADLINK = asset_urls.get("national_charge_points")

def fetch_national_charge_point_data() -> Any:
try:
url = DONWLOADLINK
return stream_json(url, set_chunk=5000)
except Exception as error:
raise error

# def national_charge_point_data_silver():
# # Fetch the data
# response = fetch_national_charge_point_data()

# # Extract the results
# charge_devices = response.results

# # Convert to a list of dictionaries
# data = [device.model_dump() for device in charge_devices]

# # Create a DataFrame
# df = pd.DataFrame(data)

# # Handle the nested geo_point structure
# df['lon'] = df['geo_point'].apply(lambda x: x['lon'])
# df['lat'] = df['geo_point'].apply(lambda x: x['lat'])
# df = df.drop('geo_point', axis=1)

# return df
@asset(
group_name="infrastructure_assets",
io_manager_key="S3Json"
)
def national_charge_point_data_bronze(context: AssetExecutionContext) -> Any:
context.log.info("Started Processing Data")
response = fetch_national_charge_point_data()
context.log.info("Finished Processing Data")
return response
14 changes: 10 additions & 4 deletions analytics_platform_dagster/jobs/analytics_platfom_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@

# Evironment
environment_job_1 = define_asset_job(
name="analytics_platform_ea_assets",
name="environment_job_1",
selection=["ea_floods", "ea_flood_areas"]
)

# Trade
trade_job_1 = define_asset_job(
name="analytics_platform_dbt_assets",
name="trade_job_1",
selection=["dbt_trade_barriers", "dbt_trade_barriers_transform", "dbt_trade_barriers_delta_lake"]
)

# Metadata
metadata_job_1 = define_asset_job(
name="analytics_platform_datastore_assets",
name="metadata_job_1",
selection=["london_datastore"]
)

# Energy
energy_job_1 = define_asset_job(
name="analytics_platform_carbon_intensity_assets",
name="energy_job_1",
selection=["carbon_intensity_bronze", "carbon_intensity_silver"]
)

Expand All @@ -30,3 +30,9 @@
execution_timezone="Europe/London",
name="energy_daily_schedule"
)

# Infrastructure
infrastructure_job_1 = define_asset_job(
name="infrastructure_job_1",
selection=["national_charge_point_data_bronze"]
)
17 changes: 15 additions & 2 deletions analytics_platform_dagster/utils/requests_helper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
import requests
import json

def return_api_data_json(url_link: str):
def return_json(url_link: str) -> json:
try:
response = requests.get(url_link)
response.raise_for_status()
data = response.json()
return data
except requests.RequestException as error:
print(f"An error occurred: {error}")
raise
raise


def stream_json(url: str, set_chunk: int) -> json:
try:
response = requests.get(url, stream=True)
buffer = ''
for chunk in response.iter_content(set_chunk):
buffer += chunk.decode('utf-8')
return json.loads(buffer)
except requests.RequestException as error:
print(f"An error occurred: {error}")
raise
3 changes: 2 additions & 1 deletion analytics_platform_dagster/utils/url_links.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"dbt_asset":"https://data.api.trade.gov.uk/v1/datasets/market-barriers/versions/latest/data?format=json",
"ea_flood_areas":"https://environment.data.gov.uk/flood-monitoring/id/floodAreas?_limit=9999",
"ea_floods":"https://environment.data.gov.uk/flood-monitoring/id/floodAreas?_limit=9999",
"london_data_store":"https://data.london.gov.uk/api/datasets/export.json"
"london_data_store":"https://data.london.gov.uk/api/datasets/export.json",
"national_charge_points": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ozev-ukpn-national-chargepoint-register/exports/json?lang=en&timezone=Europe%2FLondon"
}

0 comments on commit 173f637

Please sign in to comment.