
Commit

updated assets to polars for delta lake writing, created polars io manager
CHRISCARLON committed Nov 8, 2024
1 parent fa0b371 commit 8987692
Showing 18 changed files with 317 additions and 373 deletions.
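The new Polars IO manager lives in one of the changed files not expanded on this page. As rough orientation only, a Dagster IO manager that writes Polars DataFrames to Delta tables on S3 could look something like the sketch below; the class shape, table naming, and mode handling are assumptions for illustration, not the committed code, and Polars' write_delta requires the deltalake package.

import polars as pl
from dagster import ConfigurableIOManager, InputContext, OutputContext


class PolarsDeltaLakeIOManager(ConfigurableIOManager):
    """Illustrative sketch: write Polars DataFrames to Delta tables in an S3 bucket."""

    bucket_name: str

    def _table_uri(self, context) -> str:
        # One Delta table per asset, named after the last component of the asset key
        table_name = context.asset_key.path[-1]
        return f"s3://{self.bucket_name}/{table_name}"

    def handle_output(self, context: OutputContext, obj: pl.DataFrame) -> None:
        # Assets pass the write mode via metadata={"mode": "overwrite"}
        mode = (context.metadata or {}).get("mode", "overwrite")
        obj.write_delta(self._table_uri(context), mode=mode)

    def load_input(self, context: InputContext) -> pl.DataFrame:
        return pl.read_delta(self._table_uri(context.upstream_output))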
8 changes: 4 additions & 4 deletions analytics_platform_dagster/__init__.py
@@ -23,7 +23,8 @@
S3ParquetManager,
AwsWranglerDeltaLakeIOManager,
S3JSONManager,
PartitionedDuckDBParquetManager
PartitionedDuckDBParquetManager,
PolarsDeltaLakeIOManager
)

from .jobs.analytics_platfom_jobs import (
@@ -105,9 +106,8 @@ def get_env_var(var_name: str) -> str:
"S3Parquet": S3ParquetManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"S3Json": S3JSONManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"PartitionedDuckDBManager": PartitionedDuckDBParquetManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"DeltaLake": AwsWranglerDeltaLakeIOManager(
bucket_name=get_env_var("SILVER_DATA_BUCKET")
),
"DeltaLake": AwsWranglerDeltaLakeIOManager(bucket_name=get_env_var("SILVER_DATA_BUCKET")),
"PolarsDeltaLake": PolarsDeltaLakeIOManager(bucket_name=get_env_var("SILVER_DATA_BUCKET")),
"slack": SlackResource(token=get_env_var("SLACKBOT")),
},
)
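With the resource registered under the "PolarsDeltaLake" key, downstream assets opt in via io_manager_key and simply return a Polars DataFrame. A minimal usage sketch, with a purely illustrative asset name, group, and schema:

import polars as pl
from dagster import asset


@asset(
    group_name="example_assets",       # illustrative group name
    io_manager_key="PolarsDeltaLake",  # resource key registered above
    metadata={"mode": "overwrite"},
)
def example_silver() -> pl.DataFrame:
    # The returned DataFrame is handed to the IO manager and written to the silver bucket
    return pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})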
@@ -1,45 +1,98 @@
import requests
import json
import pandas as pd
import polars as pl
import io

from pydantic import ValidationError
from dagster import asset, AssetIn, AssetExecutionContext
from ...models.catalogue_metadata_models.london_datastore import (
LondonDatastoreCatalogue,
)
from ...utils.slack_messages.slack_message import with_slack_notification
from ...utils.variables_helper.url_links import asset_urls

@asset(group_name="metadata_catalogues", io_manager_key="S3Json")
@asset(group_name="metadata_catalogues", io_manager_key="S3Parquet")
def london_datastore_bronze(context: AssetExecutionContext):
"""
Load London Datastore Metadata bronze bucket
Load London Datastore Metadata as Parquet to bronze bucket
"""

try:
# Fetch datastore data
url = asset_urls.get("london_data_store")
if url is None:
raise ValueError("URL for london_data_store is not found in asset_urls")

# Manage response
response = requests.get(url)
response.raise_for_status()
data = json.loads(response.content)

# Validate model
validate = response.json()
LondonDatastoreCatalogue.model_validate({"items": validate})
data = response.json()

validation_errors = []
try:
validated = LondonDatastoreCatalogue.model_validate({"items": data})
except ValidationError as e:
validation_errors = e.errors()

# Create flattened rows
rows = []
for item in data:
base_data = {
"id": str(item.get("id", "")),
"title": str(item.get("title", "")),
"description": str(item.get("description", "")),
"author": str(item.get("author", "")),
"author_email": str(item.get("author_email", "")),
"maintainer": str(item.get("maintainer", "")),
"maintainer_email": str(item.get("maintainer_email", "")),
"licence": str(item.get("licence", "")),
"licence_notes": str(item.get("licence_notes", "")),
"update_frequency": str(item.get("update_frequency", "")),
"slug": str(item.get("slug", "")),
"state": str(item.get("state", "")),
"createdAt": str(item.get("createdAt", "")),
"updatedAt": str(item.get("updatedAt", "")),
"london_smallest_geography": str(item.get("london_smallest_geography", "")),
"tags": ", ".join(item.get("tags", [])) if item.get("tags") else "",
"topics": ", ".join(item.get("topics", [])) if item.get("topics") else "",
"shares": str(item.get("shares", "")),
}

context.log.info(f"Model Validatred. There are: {len(data)} catalogue items.")
return data
# Handle resources
resources = item.get("resources", {})
if resources:
for resource_id, resource in resources.items():
resource_data = {
"resource_id": str(resource_id),
"resource_title": str(resource.get("title", "")),
"resource_format": str(resource.get("format", "")),
"resource_url": str(resource.get("url", "")),
"resource_description": str(resource.get("description", "")),
"resource_check_hash": str(resource.get("check_hash", "")),
"resource_check_size": str(resource.get("check_size", "")),
"resource_check_timestamp": str(resource.get("check_timestamp", "")),
}
rows.append({**base_data, **resource_data})
else:
# If no resources, still include the base data
rows.append(base_data)

# Create DataFrame from flattened data
df = pl.DataFrame(rows)

context.log.info(f"Processed {len(df)} records with {len(validation_errors)} validation errors")

parquet_buffer = io.BytesIO()
df.write_parquet(parquet_buffer)
parquet_bytes = parquet_buffer.getvalue()

context.log.info("Successfully processed Parquet data to S3")
return parquet_bytes

except Exception as e:
raise e


@asset(
group_name="metadata_catalogues",
io_manager_key="DeltaLake",
io_manager_key="PolarsDeltaLake",
metadata={"mode": "overwrite"},
ins={"london_datastore_bronze": AssetIn("london_datastore_bronze")},
required_resource_keys={"slack"}
@@ -50,58 +103,23 @@ def london_datastore_silver(context: AssetExecutionContext, london_datastore_bro
Process London Datastore Metadata into silver bucket.
"""

# Make sure bronze bucket data is read in
input_data = london_datastore_bronze

# Load back into Pydantic model - this makes accessing nested structures easier.
validated = LondonDatastoreCatalogue.model_validate({"items": input_data})

# list to store data pre dataframe
rows = []

# Loop through model and access data fields using dot notation
for item in validated.items:
base_data = {
"id": item.id,
"title": item.title,
"description": item.description,
"author": item.author,
"author_email": item.author_email,
"maintainer": item.maintainer,
"maintainer_email": item.maintainer_email,
"licence": item.licence,
"licence_notes": item.licence_notes,
"update_frequency": item.update_frequency,
"slug": item.slug,
"state": item.state,
"createdAt": item.createdAt,
"updatedAt": item.updatedAt,
"london_smallest_geography": item.london_smallest_geography,
"tags": ", ".join(item.tags) if item.tags else "",
"topics": ", ".join(item.topics) if item.topics else "",
"shares": str(item.shares),
}

# Access nested data fields
for resource_id, resource in item.resources.items():
resource_data = {
"resource_id": resource_id,
"resource_title": resource.title,
"resource_format": resource.format,
"resource_url": resource.url,
"resource_description": resource.description,
"resource_check_hash": resource.check_hash,
"resource_check_size": resource.check_size,
"resource_check_timestamp": resource.check_timestamp,
}
rows.append({**base_data, **resource_data})
df = pl.DataFrame(london_datastore_bronze)

df = df.with_columns([
pl.when(pl.col("createdAt").str.contains(r"\."))
.then(pl.col("createdAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S.%fZ", strict=False))
.otherwise(pl.col("createdAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%SZ", strict=False))
.cast(pl.Date),

df = pd.DataFrame(rows)
pl.when(pl.col("updatedAt").str.contains(r"\."))
.then(pl.col("updatedAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S.%fZ", strict=False))
.otherwise(pl.col("updatedAt").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%SZ", strict=False))
.cast(pl.Date)
])

context.log.info(f"Overview: {df.head(25)}")
context.log.info(f"Overview 2: {df.columns}")
context.log.info(f"Overview 2: {df.dtypes}")
context.log.info(f"Overview 2: {df.shape}")

df = df.astype(str)
return df
@@ -1,6 +1,6 @@
import aiohttp
import asyncio
import pandas as pd
import polars as pl
import io

from datetime import datetime
@@ -60,9 +60,9 @@ async def fetch_all_data_async():
region_data[f"generationmix_{item.fuel}"] = item.perc
data.append(region_data)

df = pd.DataFrame(data)
df = pl.DataFrame(data)
parquet_buffer = io.BytesIO()
df.to_parquet(parquet_buffer, engine="pyarrow")
df.write_parquet(parquet_buffer)
parquet_bytes = parquet_buffer.getvalue()

return parquet_bytes
@@ -81,7 +81,7 @@ def carbon_intensity_bronze(context: AssetExecutionContext):

@asset(
group_name="energy_assets",
io_manager_key="DeltaLake",
io_manager_key="PolarsDeltaLake",
metadata={"mode": "overwrite"},
ins={"carbon_intensity_bronze": AssetIn("carbon_intensity_bronze")},
required_resource_keys={"slack"}
@@ -94,8 +94,9 @@ def carbon_intensity_silver(context: AssetExecutionContext, carbon_intensity_bro
Rename columns and add additional information.
"""
data = carbon_intensity_bronze

data = data.rename(
columns={
{
"regionid": "region_id",
"dnoregion": "dno_region",
"shortname": "short_name",
@@ -104,16 +105,7 @@ def carbon_intensity_silver(context: AssetExecutionContext, carbon_intensity_bro
}
)

# Add date_processed column
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
data["date_processed"] = current_time

# Add asset_group column
asset_group = context.assets_def.group_names_by_key[context.asset_key]
context.log.info(f"Asset group: {asset_group}")
data["asset_group"] = asset_group

# Print info
context.log.info(f"{data.head(10)}")
context.log.info(f"{data.columns}")

return data

This file was deleted.

@@ -1,11 +1,13 @@
import requests
import pandas as pd
import polars as pl
import io

from pydantic import ValidationError
from ...models.energy_data_models.entsog_gas_data_assets_model import EntsogModel
from dagster import AssetExecutionContext, AssetIn, asset, op
from datetime import datetime, timedelta
from ...utils.slack_messages.slack_message import with_slack_notification
from ...utils.variables_helper.url_links import asset_urls

@op
def validate_model(entsog_data):
@@ -20,13 +22,15 @@ def validate_model(entsog_data):
print(f"Field: {error['loc']}, Error: {error['msg']}")
raise

@asset(group_name="energy_assets", io_manager_key="S3Json")
@asset(group_name="energy_assets", io_manager_key="S3Parquet")
def entsog_gas_uk_data_bronze(context: AssetExecutionContext):
"""
Put data in Bronze bucket
"""
# Base URL
base_url = "https://transparency.entsog.eu/api/v1/operationalData.json"
base_url = asset_urls.get("entsog_gas")
if base_url is None:
raise ValueError("Could not load base url")

# Parameters that don't change - this will get you gas flows in and out of the UK
params = {
@@ -52,14 +56,21 @@ def entsog_gas_uk_data_bronze(context: AssetExecutionContext):
response = requests.get(base_url, params=params)
response.raise_for_status()

# Parse JSON and validate model
# Parse data and validate model
data = response.json()
validated_data = data["operationalData"]
validate_model(validated_data)

context.log.info(f"Success: {validated_data}")

return validated_data
df = pl.DataFrame(validated_data, infer_schema_length=None)

parquet_buffer = io.BytesIO()
df.write_parquet(parquet_buffer)

parquet_bytes = parquet_buffer.getvalue()

return parquet_bytes

except requests.RequestException as e:
# Handle any requests-related errors
print(f"Error fetching data: {e}")
@@ -79,7 +90,7 @@ def entsog_gas_uk_data_bronze(context: AssetExecutionContext):

@asset(
group_name="energy_assets",
io_manager_key="DeltaLake",
io_manager_key="PolarsDeltaLake",
metadata={"mode": "overwrite"},
ins={"entsog_gas_uk_data_bronze": AssetIn("entsog_gas_uk_data_bronze")},
required_resource_keys={"slack"}
Expand All @@ -93,16 +104,24 @@ def entsog_gas_uk_data_silver(
"""

# Access data in the Bronze delta lake
data = entsog_gas_uk_data_bronze

if data:
try:
df = pd.DataFrame(data)
df = df.astype(str)

# Print info
context.log.info(f"Success: {df.head(25)}")
context.log.info(f"Success: {df.columns}")
return df
except Exception as e:
raise e
data = pl.DataFrame(entsog_gas_uk_data_bronze)

try:
df = pl.DataFrame(data)

# Log initial schema
context.log.info(f"Initial schema: {df.schema}")

null_cols = [name for name, dtype in df.schema.items() if dtype == pl.Null]

if null_cols:
df = df.with_columns([
pl.col(col).cast(pl.Utf8).fill_null("") for col in null_cols
])

# Log final schema
context.log.info(f"Final schema: {df.schema}")
context.log.info(f"Sample data: {df.head(5)}")
return df
except Exception as e:
raise e
