Updates 2024-08-05 - Entsog silver complete
CHRISCARLON committed Aug 5, 2024
1 parent 50857a9 commit 8c3729d
Showing 5 changed files with 120 additions and 72 deletions.
=== File 1 of 5 ===

@@ -5,84 +5,101 @@

 from datetime import datetime
 from dagster import asset, AssetExecutionContext, AssetIn
-from ...models.energy_data_models.carbon_intensity_assets_model import CarbonIntensityResponse
+from ...models.energy_data_models.carbon_intensity_assets_model import (
+    CarbonIntensityResponse,
+)
 from ...utils.variables_helper.url_links import asset_urls

 API_ENDPOINT = asset_urls.get("carbon_intensity_api")

-async def fetch_data(session: aiohttp.ClientSession, region_id: int) -> CarbonIntensityResponse:
+
+async def fetch_data(
+    session: aiohttp.ClientSession, region_id: int
+) -> CarbonIntensityResponse:
     """
     Async function to fetch data and validate it against the model.
     """
     url = f"{API_ENDPOINT}{region_id}"
     async with session.get(url) as response:
         data = await response.json()
         return CarbonIntensityResponse.model_validate(data, strict=True)


 async def fetch_all_data_async():
     """
     Fetch data for region_ids 1-17.
     Put into parquet format.
     """
     async with aiohttp.ClientSession() as session:
         tasks = [fetch_data(session, region_id) for region_id in range(1, 18)]
         results = await asyncio.gather(*tasks)

         data = []
         for result in results:
             region = result.data[0]
             region_data = {
-                'regionid': region.regionid,
-                'dnoregion': region.dnoregion,
-                'shortname': region.shortname,
-                'from': region.data[0].from_,
-                'to': region.data[0].to,
-                'intensity_forecast': region.data[0].intensity.forecast,
-                'intensity_index': region.data[0].intensity.index
+                "regionid": region.regionid,
+                "dnoregion": region.dnoregion,
+                "shortname": region.shortname,
+                "from": region.data[0].from_,
+                "to": region.data[0].to,
+                "intensity_forecast": region.data[0].intensity.forecast,
+                "intensity_index": region.data[0].intensity.index,
             }
             for item in region.data[0].generationmix:
                 region_data[f"generationmix_{item.fuel}"] = item.perc
             data.append(region_data)

         df = pd.DataFrame(data)
         parquet_buffer = io.BytesIO()
-        df.to_parquet(parquet_buffer, engine='pyarrow')
+        df.to_parquet(parquet_buffer, engine="pyarrow")
         parquet_bytes = parquet_buffer.getvalue()

         return parquet_bytes

-@asset(
-    group_name="energy_assets",
-    io_manager_key="S3Parquet"
-)
+
+@asset(group_name="energy_assets", io_manager_key="S3Parquet")
 def carbon_intensity_bronze(context: AssetExecutionContext):
     """
     Store carbon intensity data in Parquet in S3...
     Dump to bronze bucket.
     """
     data = asyncio.run(fetch_all_data_async())
     return data


 @asset(
     group_name="energy_assets",
     io_manager_key="DeltaLake",
     metadata={"mode": "append"},
-    ins={"carbon_intensity_bronze": AssetIn("carbon_intensity_bronze")}
-)
+    ins={"carbon_intensity_bronze": AssetIn("carbon_intensity_bronze")},
+)
 def carbon_intensity_silver(context: AssetExecutionContext, carbon_intensity_bronze):
     """
-    Store carbon intensity data in Delta Lake.
-    Rename columns and add additonal information.
+    Store carbon intensity data in Delta Lake...
+    Rename columns and add additional information...
     """
     data = carbon_intensity_bronze
-    data = data.rename(columns={
-        'regionid': 'region_id',
-        'dnoregion': 'dno_region',
-        'shortname': 'short_name',
-        'from': 'start_period',
-        'to': 'end_period'
-    })
+
+    data = data.rename(
+        columns={
+            "regionid": "region_id",
+            "dnoregion": "dno_region",
+            "shortname": "short_name",
+            "from": "start_period",
+            "to": "end_period",
+        }
+    )

     # Add date_processed column
     current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
-    data['date_processed'] = current_time
+    data["date_processed"] = current_time

     # Add asset_group column
     asset_group = context.assets_def.group_names_by_key[context.asset_key]
     context.log.info(f"Asset group: {asset_group}")
-    data['asset_group'] = asset_group
-
-    return data
+    data["asset_group"] = asset_group
+    context.log.info(f"{data}")
+    return data
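
A quick local sanity check for the bronze payload (a minimal sketch, not part of the commit; it assumes fetch_all_data_async is importable from the asset module above):

import asyncio
import io

import pandas as pd

# Run the async fetch directly and read the Parquet bytes back into a frame.
parquet_bytes = asyncio.run(fetch_all_data_async())
df = pd.read_parquet(io.BytesIO(parquet_bytes))
print(df.shape)  # one row per region_id (1-17), plus generationmix_* columns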
=== File 2 of 5 ===

@@ -1,8 +1,9 @@
 import requests
+import pandas as pd

 from pydantic import ValidationError
 from ...models.energy_data_models.entsog_gas_data_assets_model import EntsogModel
-from dagster import AssetExecutionContext, asset, op
+from dagster import AssetExecutionContext, AssetIn, asset, op
 from datetime import datetime, timedelta

@@ -20,7 +21,7 @@ def validate_model(entsog_data):

 @asset(group_name="energy_assets", io_manager_key="S3Json")
 def entsog_gas_uk_data_bronze(context: AssetExecutionContext):
-    """Put data in bronze bucket"""
+    """Put data in Bronze bucket"""
     # Base URL
     base_url = "https://transparency.entsog.eu/api/v1/operationalData.json"
     # Parameters that don't change

@@ -49,7 +50,7 @@ def entsog_gas_uk_data_bronze(context: AssetExecutionContext):
         data = response.json()
         validated_data = data["operationalData"]
         validate_model(validated_data)
-
+        context.log.info(f"Success: {validated_data}")
         return validated_data
     except requests.RequestException as e:
         # Handle any requests-related errors

@@ -67,3 +68,27 @@ def entsog_gas_uk_data_bronze(context: AssetExecutionContext):
         # Handle any other unexpected errors
         print(f"Unexpected error: {e}")
         raise
+
+
+@asset(
+    group_name="energy_assets",
+    io_manager_key="DeltaLake",
+    metadata={"mode": "append"},
+    ins={"entsog_gas_uk_data_bronze": AssetIn("entsog_gas_uk_data_bronze")},
+)
+def entsog_gas_uk_data_silver(
+    context: AssetExecutionContext, entsog_gas_uk_data_bronze
+):
+    """
+    Dump data into Silver bucket
+    """
+    data = entsog_gas_uk_data_bronze
+
+    if data:
+        try:
+            df = pd.DataFrame(data)
+            df = df.astype(str)
+            context.log.info(f"Success: {df.head()}, {df.columns}")
+            return df
+        except Exception as e:
+            raise e
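
The validate_model op referenced in the hunk header above follows the same per-record Pydantic pattern as the trade-barriers validator further down. A hedged sketch of that pattern, using a hypothetical stand-in model (EntsogModel's real fields live in entsog_gas_data_assets_model and are not shown in this diff):

from typing import Optional

from pydantic import BaseModel, ValidationError


class EntsogRecordSketch(BaseModel):
    """Hypothetical stand-in for EntsogModel."""

    periodFrom: str
    periodTo: str
    value: Optional[float] = None


def validate_records(records: list[dict]) -> None:
    # Validate each record, surfacing field-level errors before re-raising.
    try:
        for record in records:
            EntsogRecordSketch.model_validate(record)
    except ValidationError as e:
        for error in e.errors():
            print(f"Field: {error['loc']}, Error: {error['msg']}")
        raise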
=== File 3 of 5 ===

@@ -56,7 +56,7 @@ def national_charge_point_data_bronze(context: AssetExecutionContext) -> List[Di
     Fetches json data and compares it with the total record count to ensure that the correct number of records was fetched.
     Returns:
-        A list of dictionaries
+        A json file in S3
     """

     response = fetch_national_charge_point_data()

@@ -91,7 +91,7 @@ def national_charge_point_data_silver(
     """Write charge point data out to Delta Lake once the Pydantic models have been validated
     Returns:
-        pd.DataFrame
+        Delta Lake table in S3.
     """
     try:
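
The docstring edits here swap Python types for storage destinations, which matches how Dagster assets work: the io_manager_key on the decorator, not the function body, decides where a returned value is persisted. A minimal sketch of that mechanism with a local JSON IO manager (hypothetical; the repo's actual S3Json and DeltaLake managers are not shown in this diff):

import json
import pathlib

from dagster import ConfigurableIOManager, InputContext, OutputContext


class LocalJsonIOManager(ConfigurableIOManager):
    """Hypothetical local stand-in for an S3Json-style IO manager."""

    base_dir: str = "tmp_storage"

    def _path(self, context) -> pathlib.Path:
        return pathlib.Path(self.base_dir, *context.asset_key.path).with_suffix(".json")

    def handle_output(self, context: OutputContext, obj) -> None:
        # Persist whatever the asset returned as a JSON file.
        path = self._path(context)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(obj))

    def load_input(self, context: InputContext):
        # Hand the stored output to the downstream asset.
        return json.loads(self._path(context).read_text())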
=== File 4 of 5 ===

@@ -7,6 +7,7 @@
 from ...models.trade_data_models.trade_barriers_model import TradingBarriers
 from ...utils.variables_helper.url_links import asset_urls

+
 @op
 def validate_model(trade_barriers_data) -> None:
     """Validate json against pydantic model"""

@@ -18,10 +19,8 @@ def validate_model(trade_barriers_data) -> None:
             print(f"Field: {error['loc']}, Error: {error['msg']}")
         raise

-@asset(
-    group_name="trade_assets",
-    io_manager_key="S3Json"
-)
+
+@asset(group_name="trade_assets", io_manager_key="S3Json")
 def dbt_trade_barriers_bronze(context: AssetExecutionContext):
     """Load data into bronze bucket"""
     try:

@@ -34,53 +33,60 @@ def dbt_trade_barriers_bronze(context: AssetExecutionContext):
         print(f"Error in dbt_trade_barriers: {str(error)}")
         raise error

+
 @asset(
     group_name="trade_assets",
     io_manager_key="DeltaLake",
     metadata={"mode": "overwrite"},
-    ins={"dbt_trade_barriers_bronze": AssetIn("dbt_trade_barriers_bronze")}
-)
-def dbt_trade_barriers_silver(context: AssetExecutionContext, dbt_trade_barriers_bronze ):
+    ins={"dbt_trade_barriers_bronze": AssetIn("dbt_trade_barriers_bronze")},
+)
+def dbt_trade_barriers_silver(
+    context: AssetExecutionContext, dbt_trade_barriers_bronze
+):
     """Load data into silver bucket"""
     data = dbt_trade_barriers_bronze
     trading_barriers = TradingBarriers.model_validate(data)

     flattened_data = []

     for barrier in trading_barriers.barriers:
         barrier_dict = barrier.model_dump()
-        country = barrier_dict.pop('country')
-        trading_bloc = country.pop('trading_bloc', None)
-        sectors = barrier_dict.pop('sectors')
-        barrier_dict.pop('categories', None)
+        country = barrier_dict.pop("country")
+        trading_bloc = country.pop("trading_bloc", None)
+        sectors = barrier_dict.pop("sectors")
+        barrier_dict.pop("categories", None)

         # Flatten country data
         barrier_dict.update(country)

         # Flatten trading bloc data
         if trading_bloc:
             for key, value in trading_bloc.items():
-                if key != 'overseas_regions':
-                    barrier_dict[f'trading_bloc_{key}'] = value
+                if key != "overseas_regions":
+                    barrier_dict[f"trading_bloc_{key}"] = value

             # Handle overseas regions
-            if 'overseas_regions' in trading_bloc:
-                overseas_regions = trading_bloc['overseas_regions']
-                barrier_dict['trading_bloc_overseas_regions'] = [region['name'] for region in overseas_regions]
-                barrier_dict['trading_bloc_overseas_region_ids'] = [region['id'] for region in overseas_regions]
-
+            if "overseas_regions" in trading_bloc:
+                overseas_regions = trading_bloc["overseas_regions"]
+                barrier_dict["trading_bloc_overseas_regions"] = [
+                    region["name"] for region in overseas_regions
+                ]
+                barrier_dict["trading_bloc_overseas_region_ids"] = [
+                    region["id"] for region in overseas_regions
+                ]

         # Keep sectors as a list
-        barrier_dict['sectors'] = [sector['name'] for sector in sectors]
+        barrier_dict["sectors"] = [sector["name"] for sector in sectors]

         flattened_data.append(barrier_dict)

     # Create df
     df = pd.DataFrame(flattened_data)

     # Minor data type transformations
     df = df.astype(str)
     df = df.fillna("N/A")
-    time_date_cols = ['last_published_on', 'reported_on' ]
+    time_date_cols = ["last_published_on", "reported_on"]

     for col in time_date_cols:
         if col in df.columns:

@@ -90,4 +96,3 @@ def dbt_trade_barriers_silver(context: AssetExecutionContext, dbt_trade_barriers
     context.log.info(f"Model Validation Successful {df.head(15)}")

     return df
-
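
To make the flattening concrete, here is a hypothetical barrier record (all values invented) and the row shape the loop above produces from it:

# Hypothetical input; real records come from TradingBarriers.model_validate(data)
# followed by barrier.model_dump().
barrier_dict = {
    "id": "B-001",
    "title": "Example barrier",
    "country": {
        "name": "Example Country",
        "trading_bloc": {
            "code": "TB0001",
            "name": "Example Bloc",
            "overseas_regions": [{"id": "r1", "name": "Example Region"}],
        },
    },
    "sectors": [{"name": "Food and drink"}],
    "categories": [{"name": "Goods"}],
}

# After one pass through the loop body, the row appended to flattened_data is:
# {
#     "id": "B-001",
#     "title": "Example barrier",
#     "name": "Example Country",                      # flattened from country
#     "trading_bloc_code": "TB0001",
#     "trading_bloc_name": "Example Bloc",
#     "trading_bloc_overseas_regions": ["Example Region"],
#     "trading_bloc_overseas_region_ids": ["r1"],
#     "sectors": ["Food and drink"],                  # categories are dropped
# }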
=== File 5 of 5: analytics_platform_dagster/jobs/analytics_platfom_jobs.py (1 addition, 0 deletions) ===

@@ -28,6 +28,7 @@
         "carbon_intensity_bronze",
         "carbon_intensity_silver",
         "entsog_gas_uk_data_bronze",
+        "entsog_gas_uk_data_silver",
     ],
 )
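
For reference, a selection list like this is normally passed to Dagster's define_asset_job; a hedged sketch of the surrounding definition, which is collapsed in this diff (the job name is assumed):

from dagster import define_asset_job

# Sketch only: the real name and any config live in analytics_platfom_jobs.py.
energy_data_job = define_asset_job(
    name="energy_data_job",  # assumed name
    selection=[
        "carbon_intensity_bronze",
        "carbon_intensity_silver",
        "entsog_gas_uk_data_bronze",
        "entsog_gas_uk_data_silver",
    ],
)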
