From c66931b2c7c2902ee1fb89921eec2f5b27475ee6 Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Fri, 15 Nov 2024 11:57:31 +0000 Subject: [PATCH] updates 2024-11-15 - small flood data error --- ...ytics_platform_ea_flood_public_forecast.py | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py index 60b693f..03a3fbf 100644 --- a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py +++ b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py @@ -54,34 +54,28 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub """ EA Public Forecast flooding data silver bucket """ + # Load data into model + flood_risk_data = FloodRiskData.model_validate(ea_flood_public_forecast_bronze.to_dict()) - # Find data in bronze bucket - data = ea_flood_public_forecast_bronze - - # Load data into model - to use dot notation further down - flood_risk_data = FloodRiskData.model_validate(data) - - statement = flood_risk_data.statement - - # Retrieve values using dot notation from the model + # Create flattened dictionary structure output = { - "area_of_concern": statement.area_of_concern_url, - "headline": statement.headline, - "pdf_report": statement.pdf_url, - "issued_at": statement.issued_at, - "next_issue_due": statement.next_issue_due_at, - "flood_risk_day1": statement.flood_risk_trend.day1, - "flood_risk_day2": statement.flood_risk_trend.day2, - "flood_risk_day3": statement.flood_risk_trend.day3, - "flood_risk_day4": statement.flood_risk_trend.day4, - "flood_risk_day5": statement.flood_risk_trend.day5, - "england_forecast": statement.public_forecast.england_forecast, - "wales_forecast_english": statement.public_forecast.wales_forecast_english, - "risks_areas": statement.risk_areas + "area_of_concern": flood_risk_data.statement.area_of_concern_url, + "headline": flood_risk_data.statement.headline, + "pdf_report": flood_risk_data.statement.pdf_url, + "issued_at": flood_risk_data.statement.issued_at, + "next_issue_due": flood_risk_data.statement.next_issue_due_at, + "flood_risk_day1": flood_risk_data.statement.flood_risk_trend.day1, + "flood_risk_day2": flood_risk_data.statement.flood_risk_trend.day2, + "flood_risk_day3": flood_risk_data.statement.flood_risk_trend.day3, + "flood_risk_day4": flood_risk_data.statement.flood_risk_trend.day4, + "flood_risk_day5": flood_risk_data.statement.flood_risk_trend.day5, + "england_forecast": flood_risk_data.statement.public_forecast.england_forecast, + "wales_forecast_english": flood_risk_data.statement.public_forecast.wales_forecast_english, + "risks_areas": flood_risk_data.statement.risk_areas } - # Access nested values - for source in statement.sources: + # Add source data + for source in flood_risk_data.statement.sources: if source.coastal: output["coastal_risk"] = source.coastal if source.surface: @@ -91,6 +85,7 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub if source.ground: output["ground_risk"] = source.ground - df = pl.DataFrame(output) + # Convert to DataFrame + df = pl.DataFrame([output]) return df