Skip to content

Commit

Permalink
Updates 2024-08-01 - Created trade barriers bronze asset
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Aug 1, 2024
1 parent ae421ec commit 8e780ab
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 59 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,5 @@ scripts/.build
df.parquet
workspace.yaml
quick_check.ipynb
tmp*
tmp*
test.py
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

> [!NOTE]
> This is currently a work in progress.
> Early stage development.
>
> This project is in early stage development.
![image](https://github.com/user-attachments/assets/5aba2d3e-6ab5-4965-baa3-68caffb44afa)
4 changes: 2 additions & 2 deletions analytics_platform_dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .assets.location_data_assets import analytics_platform_os_assets
from .assets.environment_data_assets import analytics_platform_ea_assets
from .assets.trade_data_assets import analytics_platform_dbt_assets
from .assets.trade_data_assets import analytics_platform_dbt_trade_barrier_assets
from .assets.energy_data_assets import analytics_platform_carbon_intensity_assets
from .assets.catalogue_metadata_assets import analytics_platform_datastore_assets
from .assets.infrastructure_data_assets import analytics_platform_national_charge_points_assets
Expand All @@ -23,7 +23,7 @@
analytics_platform_datastore_assets,
analytics_platform_os_assets,
analytics_platform_ea_assets,
analytics_platform_dbt_assets,
analytics_platform_dbt_trade_barrier_assets,
analytics_platform_carbon_intensity_assets,
analytics_platform_national_charge_points_assets
]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from datetime import datetime
from dagster import asset, AssetExecutionContext, AssetIn
from ...models.energy_data_models.carbon_intensity_assets_model import CarbonIntensityResponse
from ...utils.variables_helper.url_links import asset_urls

# Base URL of the regional carbon-intensity endpoint; a region id is appended
# to it for each request. The literal is kept as the fallback so a missing
# registry entry cannot turn the base URL into None.
API_ENDPOINT = asset_urls.get(
    "carbon_intensity_api",
    "https://api.carbonintensity.org.uk/regional/regionid/",
)

async def fetch_data(session: aiohttp.ClientSession, region_id: int) -> CarbonIntensityResponse:
url = f"{API_ENDPOINT}{region_id}"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

from pydantic import ValidationError
from dagster import asset, op, AssetExecutionContext

from ...utils.requests_helper.requests_helper import return_json
from ...models.trade_data_models.trade_barriers_model import TradingBarriers
from ...utils.variables_helper.url_links import asset_urls

@op
def validate_model(trade_barriers_data) -> None:
    """
    Check the raw trade-barriers payload against the TradingBarriers model.

    On failure, every validation problem is printed (field location and
    message) and the original ValidationError is re-raised unchanged.
    """
    try:
        TradingBarriers.model_validate(trade_barriers_data)
    except ValidationError as validation_error:
        print("Validation errors:")
        problems = validation_error.errors()
        for error in problems:
            print(f"Field: {error['loc']}, Error: {error['msg']}")
        # Bare raise keeps the original exception and traceback intact.
        raise

@asset(
    group_name="trade_assets",
    io_manager_key="S3Json"
)
def dbt_trade_barriers_bronze(context: AssetExecutionContext):
    """
    Fetch the trade-barriers JSON, validate it, and return it for storage.

    The URL is looked up in ``asset_urls``, the payload is downloaded with
    ``return_json`` and checked with ``validate_model``. The returned data is
    handed to the io manager registered under the "S3Json" key.

    Args:
        context: Dagster execution context, used for logging.

    Returns:
        The JSON payload exactly as returned by ``return_json``.

    Raises:
        ValueError: if no URL is registered for this asset.
        Exception: any download or validation error is logged and re-raised.
    """
    try:
        # The key spelling ("bariers") must match the entry in asset_urls.
        url = asset_urls.get("dbt_trading_bariers_asset")
        if url is None:
            raise ValueError(
                "No URL registered under 'dbt_trading_bariers_asset' in asset_urls"
            )
        data = return_json(url)
        validate_model(data)
        context.log.info("Model Validation Successful")
        return data
    except Exception as error:
        # Report through the Dagster logger, like the success path, and keep
        # the original traceback with a bare raise.
        context.log.error(f"Error in dbt_trade_barriers_bronze: {error}")
        raise

2 changes: 1 addition & 1 deletion analytics_platform_dagster/jobs/analytics_platfom_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Trade
# Job that materialises only the bronze trade-barriers asset.
trade_job_1 = define_asset_job(
    name="trade_job_1",
    selection=["dbt_trade_barriers_bronze"]
)

# Metadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List, Any, Optional
from pydantic import BaseModel

class Category(BaseModel):
    """Barrier category entry; declares no fields of its own."""
    pass

class OverseasRegion(BaseModel):
    """Overseas region referenced by a trading bloc: an id and a name."""
    id: str
    name: str

class TradingBloc(BaseModel):
    """Trading bloc with its code, names and the overseas regions it lists."""
    code: str
    name: str
    overseas_regions: List[OverseasRegion]
    short_name: str

class Country(BaseModel):
    """Country a barrier applies to, with an optional trading bloc."""
    name: str
    # May be absent or null; defaults to None.
    trading_bloc: Optional[TradingBloc] = None

class Sector(BaseModel):
    """Sector entry attached to a barrier; only the name is modelled."""
    name: str

class Barrier(BaseModel):
    """A single trade barrier record inside the TradingBarriers payload."""
    # Each entry is validated as a Category, which declares no fields.
    categories: List[Category]
    # Left untyped (Any), so whatever value arrives is accepted.
    caused_by_trading_bloc: Optional[Any] = None
    country: Country
    id: str
    is_resolved: bool
    # Date-like fields are declared as plain strings; no date parsing is
    # requested by this model.
    last_published_on: str
    location: str
    reported_on: Optional[str] = None
    sectors: List[Sector]
    status_date: str
    summary: str
    title: str
    # NOTE(review): typed as a string here, while Country.trading_bloc is a
    # TradingBloc object -- confirm against the API response.
    trading_bloc: Optional[str] = None

class TradingBarriers(BaseModel):
    """Top-level payload model: a list of Barrier records."""
    barriers: List[Barrier]

    # Payload keys that are not declared on the model are ignored instead of
    # being rejected.
    model_config = {
        "extra": "ignore"
    }
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ def load_input(self, context) -> pd.DataFrame:
reverse=True
)

# Get the latest object (first item after sorting)
# Get the latest object (first item after sorting)
# Fetch the key which will be the filename that you need to look for
latest_object = sorted_objects[0]
object_key = latest_object['Key']

Expand All @@ -220,7 +221,7 @@ def load_input(self, context) -> pd.DataFrame:
df = pd.read_parquet(parquet_buffer)

context.log.info(f"Loaded latest Parquet file: s3://{self.bucket_name}/{object_key}")
# Return DataFrame
# Return as a DataFrame
return df

except ClientError as e:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Registry of the external data-source URLs used by the assets, keyed by a
# short asset name. Callers look entries up with asset_urls.get(<key>).
asset_urls = {
    # Older key for the market-barriers dataset, kept so existing lookups
    # still resolve; same URL as "dbt_trading_bariers_asset".
    "dbt_asset": "https://data.api.trade.gov.uk/v1/datasets/market-barriers/versions/latest/data?format=json",
    # The spelling "bariers" is what callers look up; do not correct it here
    # without updating them.
    "dbt_trading_bariers_asset": "https://data.api.trade.gov.uk/v1/datasets/market-barriers/versions/latest/data?format=json",
    "ea_flood_areas": "https://environment.data.gov.uk/flood-monitoring/id/floodAreas?_limit=9999",
    # NOTE(review): identical to "ea_flood_areas" -- confirm whether a
    # different endpoint was intended for this key.
    "ea_floods": "https://environment.data.gov.uk/flood-monitoring/id/floodAreas?_limit=9999",
    "london_data_store": "https://data.london.gov.uk/api/datasets/export.json",
    "national_charge_points": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ozev-ukpn-national-chargepoint-register/exports/json?lang=en&timezone=Europe%2FLondon",
    "national_charge_points_api": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ozev-ukpn-national-chargepoint-register/records?limit=1",
    "carbon_intensity_api": "https://api.carbonintensity.org.uk/regional/regionid/",
}

0 comments on commit 8e780ab

Please sign in to comment.