
updates 2024-11-16 - added in built up areas bronze data, added in new batch processing parquet io manager
CHRISCARLON committed Nov 16, 2024
1 parent 4dda9b6 commit 12406e9
Showing 6 changed files with 278 additions and 3 deletions.
15 changes: 12 additions & 3 deletions analytics_platform_dagster/__init__.py
@@ -8,19 +8,24 @@
green_belt
)
from .assets.trade_data_assets import analytics_platform_dbt_trade_barrier_assets

from .assets.energy_data_assets import (
analytics_platform_carbon_intensity_assets,
entsog_uk_gas_assets,
)
from .assets.catalogue_metadata_assets import analytics_platform_datastore_assets

from .assets.infrastructure_data_assets import (
national_charge_points_london_assets,
uk_power_networks_live_faults,
national_charge_points_uk_assets
)

from .assets.location_data_assets import built_up_areas

from .utils.io_manager_helper.io_manager import (
S3ParquetManager,
S3ParquetManagerPartition,
AwsWranglerDeltaLakeIOManager,
S3JSONManager,
PartitionedDuckDBParquetManager,
@@ -41,7 +46,8 @@
infrastructure_job_1,
infrastructure_job_1_weekly,
infrastructure_job_2,
infrastructure_job_2_daily
infrastructure_job_2_daily,
location_job_1
)


@@ -80,7 +86,8 @@ def get_env_var(var_name: str) -> str:
entsog_uk_gas_assets,
uk_power_networks_live_faults,
national_charge_points_uk_assets,
green_belt
green_belt,
built_up_areas
]
),
jobs=[
@@ -90,7 +97,8 @@ def get_env_var(var_name: str) -> str:
metadata_job_1,
energy_job_1,
infrastructure_job_1,
infrastructure_job_2
infrastructure_job_2,
location_job_1
],
schedules=[
energy_job_1_daily,
@@ -104,6 +112,7 @@
sensors=[slack_failure_sensor],
resources={
"S3Parquet": S3ParquetManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"S3ParquetPartition": S3ParquetManagerPartition(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"S3Json": S3JSONManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"PartitionedDuckDBManager": PartitionedDuckDBParquetManager(bucket_name=get_env_var("BRONZE_DATA_BUCKET")),
"DeltaLake": AwsWranglerDeltaLakeIOManager(bucket_name=get_env_var("SILVER_DATA_BUCKET")),
138 changes: 138 additions & 0 deletions analytics_platform_dagster/assets/location_data_assets/built_up_areas.py
@@ -0,0 +1,138 @@
import os
import tempfile
import zipfile
import requests
import polars as pl
import fiona
import io

from typing import List, Dict
from shapely import wkt
from shapely.geometry import shape
from dagster import AssetExecutionContext, Output, asset
from ...utils.variables_helper.url_links import asset_urls
from ...utils.requests_helper.requests_helper import fetch_redirect_url

@asset(
group_name="location_assets",
io_manager_key="S3ParquetPartition",
)
def os_built_up_areas_bronze(context: AssetExecutionContext):
"""
Processes OS Built Up Areas data and returns a generator of parquet bytes for S3 storage
Flow:
ZIP → GPKG → Batch → DataFrame → Parquet Bytes → S3
Directory structure in S3:
bucket/
└── asset_name/
├── batch_20240116_0001.parquet
├── batch_20240116_0002.parquet
└── batch_20240116_NNNN.parquet
"""
def generate_batches():
BATCH_SIZE = 2000
batch_number = 0
errors = []
current_batch = []

# Fetch and validate URL
url = asset_urls.get("os_built_up_areas")
if url is None:
raise ValueError("No URL provided")

redirect_url = fetch_redirect_url(url)
if redirect_url is None:
raise ValueError("No redirect URL found")

# Create temporary directory for processing
with tempfile.TemporaryDirectory() as temp_dir:
# Download and extract zip
zip_path = os.path.join(temp_dir, 'temp.zip')
response = requests.get(redirect_url)
response.raise_for_status()

with open(zip_path, 'wb') as zip_file:
zip_file.write(response.content)

# Extract GPKG file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)

# Find GPKG file
gpkg_file = next(
(os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if f.endswith('gpkg')),
None
)
if not gpkg_file:
raise FileNotFoundError("No GPKG file found in zip")

# Process GPKG file
with fiona.open(gpkg_file, 'r') as src:
context.log.info(f"CRS: {src.crs}")
context.log.info(f"Schema: {src.schema}")

for i, feature in enumerate(src):
try:
# Extract properties and add geometry as WKT
properties = dict(feature['properties'])
geom = shape(feature['geometry'])
properties['geometry'] = wkt.dumps(geom)

# Ensure all required fields are present with correct types
processed_properties = {
'gsscode': str(properties.get('gsscode', '')),
'name1_text': str(properties.get('name1_text', '')),
'name1_language': str(properties.get('name1_language', '')),
'name2_text': str(properties.get('name2_text', '')),
'name2_language': str(properties.get('name2_language', '')),
'areahectares': float(properties.get('areahectares', 0.0)),
'geometry_area_m': float(properties.get('geometry_area_m', 0.0)),
'geometry': properties['geometry']
}

current_batch.append(processed_properties)

# Handle and log any errors
except Exception as e:
error_msg = f"Error processing feature {i}: {e}"
context.log.error(error_msg)
errors.append(error_msg)
continue

# When the batch size is reached, convert to parquet bytes and yield
if len(current_batch) >= BATCH_SIZE:
batch_number += 1

# Convert batch to DataFrame then to parquet bytes
df_batch = pl.DataFrame(current_batch)
context.log.info(f"Processed final batch {df_batch.head(25)}")
parquet_buffer = io.BytesIO()
df_batch.write_parquet(parquet_buffer)
parquet_bytes = parquet_buffer.getvalue()

yield parquet_bytes

# Reset for next batch
current_batch = []
errors = []
context.log.info(f"Processed batch {batch_number}")

# Process final batch if any remains
if current_batch:
batch_number += 1
df_batch = pl.DataFrame(current_batch)
context.log.info(f"Processed final batch {df_batch.head(25)}")
parquet_buffer = io.BytesIO()
df_batch.write_parquet(parquet_buffer)
parquet_bytes = parquet_buffer.getvalue()
yield parquet_bytes
context.log.info(f"Processed final batch {batch_number}")

return Output(
value=generate_batches(),
metadata={
"description": "Generator of parquet bytes for batch processing"
}
)
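Not part of the commit, but for context: a minimal sketch of materialising this asset locally with Dagster's materialize helper and the new IO manager, mirroring the resource wiring in __init__.py above. The asset's module path is assumed from the import in __init__.py; everything else comes from this diff.

# Hypothetical local materialisation of the new asset (not part of this commit).
# Assumes the asset lives at assets/location_data_assets/built_up_areas.py and
# that BRONZE_DATA_BUCKET is set, as in __init__.py above.
import os

from dagster import materialize

from analytics_platform_dagster.assets.location_data_assets.built_up_areas import (
    os_built_up_areas_bronze,
)
from analytics_platform_dagster.utils.io_manager_helper.io_manager import (
    S3ParquetManagerPartition,
)

result = materialize(
    [os_built_up_areas_bronze],
    resources={
        "S3ParquetPartition": S3ParquetManagerPartition(
            bucket_name=os.environ["BRONZE_DATA_BUCKET"]
        ),
    },
)
assert result.success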
8 changes: 8 additions & 0 deletions analytics_platform_dagster/jobs/analytics_platfom_jobs.py
@@ -114,3 +114,11 @@
execution_timezone="Europe/London",
name="infrastructure_daily_schedule",
)

# LOCATION
location_job_1 = define_asset_job(
name="location_job_1",
selection=[
"os_built_up_areas_bronze"
]
)
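Not part of this commit: if a schedule is wanted later, it could follow the same pattern as the infrastructure schedules earlier in this file. The sketch below assumes ScheduleDefinition is already imported in this module, and the cron expression and names are illustrative; it would also need adding to schedules=[...] in __init__.py.

# Hypothetical weekly schedule for the new location job (not in this commit).
location_job_1_weekly = ScheduleDefinition(
    job=location_job_1,
    cron_schedule="0 6 * * 1",  # assumed: Mondays at 06:00
    execution_timezone="Europe/London",
    name="location_weekly_schedule",
)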
99 changes: 99 additions & 0 deletions analytics_platform_dagster/utils/io_manager_helper/io_manager.py
@@ -248,6 +248,105 @@ def load_input(self, context) -> pl.DataFrame:
except Exception as e:
raise e

class S3ParquetManagerPartition(IOManager):
"""
IO manager that writes batches of parquet bytes to S3 and reads them back as a single concatenated Polars DataFrame.
"""
def __init__(self, bucket_name: str):
if not bucket_name:
raise S3BucketError()
self.bucket_name = bucket_name
self.aws_client = boto3.client("s3")

def handle_output(self, context, obj):
# Define base variables
asset_name = context.asset_key.path[-1]
batch_number = 0

# Handle a generator of parquet byte batches (bytes objects are themselves iterable, so exclude them here)
if hasattr(obj, '__iter__') and not isinstance(obj, (bytes, bytearray)):
for batch_bytes in obj:
batch_number += 1
timestamp = datetime.now().strftime("%Y%m%d")
object_key = f"{asset_name}/batch_{timestamp}_{batch_number:04d}.parquet"

# Upload bytes to S3
try:
self.aws_client.put_object(
Bucket=self.bucket_name,
Key=object_key,
Body=batch_bytes,
ContentType="application/octet-stream",
)
context.log.info(
f"Batch {batch_number} written to S3 at s3://{self.bucket_name}/{object_key}"
)
except Exception as e:
raise e
else:
# Handle single batch case
timestamp = datetime.now().strftime("%Y%m%d")
object_key = f"{asset_name}/batch_{timestamp}.parquet"

try:
self.aws_client.put_object(
Bucket=self.bucket_name,
Key=object_key,
Body=obj,
ContentType="application/octet-stream",
)
context.log.info(
f"Data written to S3 at s3://{self.bucket_name}/{object_key}"
)
except Exception as e:
raise e

def load_input(self, context) -> pl.DataFrame:
asset_name = context.asset_key.path[-1]
prefix = f"{asset_name}/data/batch_"
try:
# List objects in the bucket with the given prefix
response = self.aws_client.list_objects_v2(
Bucket=self.bucket_name, Prefix=prefix
)

if "Contents" not in response or not response["Contents"]:
raise FileNotFoundError(
f"No files found with prefix '{prefix}' in bucket '{self.bucket_name}'"
)

# Sort the objects by last modified date
sorted_objects = sorted(
response["Contents"], key=lambda x: x["LastModified"], reverse=True
)

# Load all batches and concatenate
dfs = []
for obj in sorted_objects:
object_key = obj["Key"]
response = self.aws_client.get_object(
Bucket=self.bucket_name, Key=object_key
)
parquet_bytes = response["Body"].read()
df = pl.read_parquet(io.BytesIO(parquet_bytes))
dfs.append(df)
context.log.info(f"Loaded batch from: {object_key}")

# Concatenate all dataframes
final_df = pl.concat(dfs)
context.log.info(f"Concatenated {len(dfs)} batches")
return final_df

except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError(
f"No files found with prefix '{prefix}' in bucket '{self.bucket_name}'"
)
else:
raise e
except Exception as e:
raise e

class PartitionedDuckDBParquetManager(IOManager):
def __init__(self, bucket_name: str):
self.bucket_name = bucket_name
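Not part of the commit: a quick standalone sanity check of what S3ParquetManagerPartition writes, using only the boto3 and polars calls already used above. The bucket name is a placeholder and the prefix assumes the key layout from handle_output.

# Hypothetical spot check of the batch files written by the IO manager.
import io

import boto3
import polars as pl

bucket = "my-bronze-bucket"  # placeholder for BRONZE_DATA_BUCKET
prefix = "os_built_up_areas_bronze/batch_"  # key layout from handle_output above

s3 = boto3.client("s3")
listing = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
keys = [obj["Key"] for obj in listing.get("Contents", [])]
print(f"Found {len(keys)} batch files under {prefix}")

if keys:
    body = s3.get_object(Bucket=bucket, Key=keys[0])["Body"].read()
    df = pl.read_parquet(io.BytesIO(body))
    print(df.head())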
20 changes: 20 additions & 0 deletions analytics_platform_dagster/utils/requests_helper/requests_helper.py
@@ -43,3 +43,23 @@ def stream_json(url: str, set_chunk: int):
except requests.RequestException as error:
print(f"An error occurred: {error}")
raise

import requests

def fetch_redirect_url(url) -> str:
"""
Follow the redirect and return the final download URL.
"""

if url is None:
raise ValueError("url is empty")

try:
response = requests.get(url)
response.raise_for_status()
redirect_url = response.url
print(f"The Redirect URL is: {redirect_url}")
except Exception as e:
print(f"An error occurred while retrieving the redirect URL: {e}")
raise
return redirect_url
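Not part of the commit: a tiny usage example of fetch_redirect_url against the OS downloads entry added to url_links below; the absolute module paths are assumed from the relative imports in the new asset file.

# Hypothetical usage: resolve the OS Built Up Areas download link before fetching it.
from analytics_platform_dagster.utils.requests_helper.requests_helper import fetch_redirect_url
from analytics_platform_dagster.utils.variables_helper.url_links import asset_urls

download_url = fetch_redirect_url(asset_urls["os_built_up_areas"])
print(download_url)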
1 change: 1 addition & 0 deletions analytics_platform_dagster/utils/variables_helper/url_links.py
@@ -9,4 +9,5 @@
"ukpn_live_faults": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-live-faults/exports/xlsx",
"green_belt": "https://files.planning.data.gov.uk/dataset/green-belt.json",
"entsog_gas": "https://transparency.entsog.eu/api/v1/operationalData.json",
"os_built_up_areas": "https://api.os.uk/downloads/v1/products/BuiltUpAreas/downloads?area=GB&format=GeoPackage&redirect"
}
