diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py
index ccfe963b..8c15e69c 100644
--- a/week_1/project/week_1.py
+++ b/week_1/project/week_1.py
@@ -2,7 +2,7 @@
 from datetime import datetime
 from typing import List
 
-from dagster import In, Nothing, Out, job, op, usable_as_dagster_type
+from dagster import In, Nothing, Out, job, op, usable_as_dagster_type, get_dagster_logger
 from pydantic import BaseModel
 
 
@@ -50,16 +50,29 @@ def get_s3_data(context):
     return output
 
 
-@op
-def process_data():
-    pass
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg_max": Out(dagster_type=Aggregation)},
+    description="Determine the stock with the highest value"
+)
+def process_data(stocks: List[Stock]) -> Aggregation:
+    # Find the stock with the highest value
+    max_stock = max(stocks, key=lambda x: x.high)
+    return Aggregation(date=max_stock.date, high=max_stock.high)
 
 
-@op
-def put_redis_data():
+@op(
+    ins={"agg_max": In(dagster_type=Aggregation)},
+    out=None,
+    description="Write to Redis"
+)
+def put_redis_data(agg_max: Aggregation) -> None:
+    # Log the output
+    logger = get_dagster_logger()
+    logger.info(f"Write {agg_max} to Redis.")
     pass
 
 
 @job
 def week_1_pipeline():
-    pass
+    put_redis_data(process_data(get_s3_data()))
diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py
index 3313403d..18c0665c 100644
--- a/week_2/dagster_ucr/project/week_2.py
+++ b/week_2/dagster_ucr/project/week_2.py
@@ -1,30 +1,54 @@
 from typing import List
 
-from dagster import In, Nothing, Out, ResourceDefinition, graph, op
+from dagster import In, Nothing, Out, ResourceDefinition, graph, op, get_dagster_logger
 from dagster_ucr.project.types import Aggregation, Stock
 from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource
 
 
-@op
-def get_s3_data():
-    pass
+@op(
+    config_schema={"s3_key": str},
+    required_resource_keys={"s3"},
+    out={"stocks": Out(dagster_type=List[Stock])},
+    tags={"kind": "s3"},
+    description="Get a list of stocks from an S3 file",
+)
+def get_s3_data(context) -> List[Stock]:
+    stocks = context.resources.s3.get_data(context.op_config["s3_key"])
+    return [Stock.from_list(stock) for stock in stocks]
 
 
-@op
-def process_data():
-    # Use your op from week 1
-    pass
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg_max": Out(dagster_type=Aggregation)},
+    tags={"kind": "transformation"},
+    description="Determine the stock with the highest value"
+)
+def process_data(stocks: List[Stock]) -> Aggregation:
+    # Find the stock with the highest value
+    max_stock = max(stocks, key=lambda x: x.high)
+    # Log the output
+    logger = get_dagster_logger()
+    logger.info(f"Highest stock is: {max_stock}")
+    return Aggregation(date=max_stock.date, high=max_stock.high)
 
 
-@op
-def put_redis_data():
-    pass
+@op(
+    ins={"agg_max": In(dagster_type=Aggregation)},
+    required_resource_keys={"redis"},
+    out=Out(dagster_type=Nothing),
+    tags={"kind": "redis"},
+    description="Write to Redis"
+)
+def put_redis_data(context, agg_max: Aggregation) -> None:
+    context.resources.redis.put_data(str(agg_max.date), str(agg_max.high))
+    context.log.info(f"Wrote {agg_max} to Redis.")
 
 
 @graph
 def week_2_pipeline():
-    # Use your graph from week 1
-    pass
+    s3_data = get_s3_data()
+    highest_stock = process_data(s3_data)
+    put_redis_data(highest_stock)
 
 
 local = {
diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py
index adeb742f..e00f36e2 100644
--- a/week_2/dagster_ucr/resources.py
+++ b/week_2/dagster_ucr/resources.py
@@ -91,13 +91,35 @@ def mock_s3_resource():
     return s3_mock
 
 
-@resource
-def s3_resource():
+@resource(
+    config_schema={
+        "bucket": Field(String),
+        "access_key": Field(String),
+        "secret_key": Field(String),
+        "endpoint_url": Field(String),
+    },
+    description="A resource that can connect to S3."
+)
+def s3_resource(context) -> S3:
     """This resource defines a S3 client"""
-    pass
+    return S3(
+        bucket=context.resource_config["bucket"],
+        access_key=context.resource_config["access_key"],
+        secret_key=context.resource_config["secret_key"],
+        endpoint_url=context.resource_config["endpoint_url"],
+    )
 
 
-@resource
-def redis_resource():
+@resource(
+    config_schema={
+        "host": Field(String),
+        "port": Field(Int),
+    },
+    description="A resource that connects to Redis.",
+)
+def redis_resource(context) -> Redis:
     """This resource defines a Redis client"""
-    pass
+    return Redis(
+        host=context.resource_config["host"],
+        port=context.resource_config["port"],
+    )
diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py
index f4753753..f6b82e52 100644
--- a/week_3/project/week_3.py
+++ b/week_3/project/week_3.py
@@ -13,34 +13,58 @@
     op,
     sensor,
     static_partitioned_config,
+    get_dagster_logger,
+    schedule,
 )
 from project.resources import mock_s3_resource, redis_resource, s3_resource
 from project.sensors import get_s3_keys
 from project.types import Aggregation, Stock
 
 
-@op
-def get_s3_data():
-    # Use your ops from week 2
-    pass
-
-
-@op
-def process_data():
-    # Use your ops from week 2
-    pass
+@op(
+    config_schema={"s3_key": str},
+    required_resource_keys={"s3"},
+    out={"stocks": Out(dagster_type=List[Stock])},
+    tags={"kind": "s3"},
+    description="Get a list of stocks from an S3 file",
+)
+def get_s3_data(context) -> List[Stock]:
+    stocks = context.resources.s3.get_data(context.op_config["s3_key"])
+    return [Stock.from_list(stock) for stock in stocks]
 
 
-@op
-def put_redis_data():
-    # Use your ops from week 2
-    pass
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg_max": Out(dagster_type=Aggregation)},
+    tags={"kind": "transformation"},
+    description="Determine the stock with the highest value"
+)
+def process_data(stocks: List[Stock]) -> Aggregation:
+    # Find the stock with the highest value
+    max_stock = max(stocks, key=lambda x: x.high)
+    # Log the output
+    logger = get_dagster_logger()
+    logger.info(f"Highest stock is: {max_stock}")
+    return Aggregation(date=max_stock.date, high=max_stock.high)
+
+
+@op(
+    ins={"agg_max": In(dagster_type=Aggregation)},
+    required_resource_keys={"redis"},
+    out=Out(dagster_type=Nothing),
+    tags={"kind": "redis"},
+    description="Write to Redis"
+)
+def put_redis_data(context, agg_max: Aggregation) -> None:
+    context.resources.redis.put_data(str(agg_max.date), str(agg_max.high))
+    context.log.info(f"Wrote {agg_max} to Redis.")
 
 
 @graph
 def week_3_pipeline():
-    # Use your graph from week 2
-    pass
+    s3_data = get_s3_data()
+    highest_stock = process_data(s3_data)
+    put_redis_data(highest_stock)
 
 
 local = {
@@ -69,8 +93,16 @@ def week_3_pipeline():
 }
 
 
-def docker_config():
-    pass
+# Create a fixed number of partitions (1-10)
+partition_keys = [str(i) for i in range(1, 11)]
+
+@static_partitioned_config(partition_keys=partition_keys)
+def docker_config(partition_key: str):
+    key = f"prefix/stock_{partition_key}.csv"
+    return {
+        **docker,
+        "ops": {"get_s3_data": {"config": {"s3_key": key}}}
+    }
 
 
 local_week_3_pipeline = week_3_pipeline.to_job(
@@ -84,19 +116,57 @@ def docker_config():
 
 docker_week_3_pipeline = week_3_pipeline.to_job(
     name="docker_week_3_pipeline",
     config=docker_config,
     resource_defs={
         "s3": s3_resource,
         "redis": redis_resource,
     },
+    op_retry_policy=RetryPolicy(max_retries=10, delay=1)
 )
 
 
+# Schedule for local: Every 15 minutes
+local_week_3_schedule = ScheduleDefinition(job=local_week_3_pipeline, cron_schedule="*/15 * * * *")
-local_week_3_schedule = None  # Add your schedule
-
-docker_week_3_schedule = None  # Add your schedule
+# Schedule for docker: Start of every hour
+@schedule(
+    cron_schedule="0 * * * *",
+    job=docker_week_3_pipeline,
+    tags={"kind": "schedule"},
+    description="Run docker_week_3_pipeline at the start of every hour"
+)
+def docker_week_3_schedule():
+    for partition_key in partition_keys:
+        request = docker_week_3_pipeline.run_request_for_partition(partition_key=partition_key, run_key=partition_key)
+        yield request
 
 
-@sensor
-def docker_week_3_sensor():
-    pass
+@sensor(
+    job=docker_week_3_pipeline,
+    minimum_interval_seconds=30,
+    description="Check S3 bucket for new files every 30 seconds."
+)
+def docker_week_3_sensor(context):
+
+    # Check for new files in the S3 bucket.
+    new_files = get_s3_keys(
+        bucket="dagster",
+        prefix="prefix",
+        endpoint_url="http://host.docker.internal:4566"
+    )
+
+    if not new_files:
+        yield SkipReason("No new S3 files found in bucket.")
+        return
+
+    log = get_dagster_logger()
+    log.info(f"RunRequest for {new_files}")
+
+    # Yield a RunRequest for every new file.
+    for new_file in new_files:
+        yield RunRequest(
+            run_key=new_file,
+            run_config={
+                **docker,
+                "ops": {"get_s3_data": {"config": {"s3_key": new_file}}}
+            }
+        )