Skip to content

Commit

Permalink
updates 2024-11-10 - small bugs fixes due to storage options in s3 po…
Browse files Browse the repository at this point in the history
…lars filesystem
  • Loading branch information
CHRISCARLON committed Nov 10, 2024
1 parent 37e2887 commit 1b65c67
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def ea_flood_areas_bronze(context: AssetExecutionContext):
except ValidationError as e:
validation_errors = e.errors()

df = pl.DataFrame(result)
data = result["items"]
df = pl.DataFrame(data)

context.log.info(f"Processed {len(df)} records with {len(validation_errors)} validation errors")

Expand Down Expand Up @@ -61,8 +62,7 @@ def ea_flood_areas_silver(context: AssetExecutionContext, ea_flood_areas_bronze)

if data:
try:
items = data["items"]
df = pl.DataFrame(items)
df = pl.DataFrame(data)
context.log.info(f"Success: {df.head(25)}, {df.columns}, {df.shape}")
context.log.info(f"Success: {df.columns}, {df.shape}")
context.log.info(f"Success: {df.shape}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub
if source.ground:
output["ground_risk"] = source.ground

df = pl.DataFrame([output])
df = pl.DataFrame(output)

return df
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def national_charge_point_london_bronze(context: AssetExecutionContext):
validation_errors = e.errors()

df = pl.DataFrame(response, infer_schema_length=None)
df = df.select([pl.col("*").cast(pl.Utf8)])

context.log.info(f"Processed {len(df)} records with {len(validation_errors)} validation errors")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import requests
import pandas as pd
import polars as pl

from io import BytesIO
from pydantic import TypeAdapter, ValidationError
Expand Down Expand Up @@ -57,10 +58,9 @@ def ukpn_live_faults_bronze():
raise e

parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, engine="pyarrow")
df = df.astype(str)
polars_df = pl.from_pandas(df)
polars_df.write_parquet(parquet_buffer)
parquet_bytes = parquet_buffer.getvalue()

return parquet_bytes

@asset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def __init__(
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
"AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID"),
"AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY"),
"AWS_SESSION_TOKEN": os.getenv("AWS_SESSION_TOKEN")
# "AWS_SESSION_TOKEN": os.getenv("AWS_SESSION_TOKEN")
}

# Override defaults with provided storage options
Expand Down

0 comments on commit 1b65c67

Please sign in to comment.