Week 3 Project Submission #53
base: master
@@ -13,34 +13,57 @@
     op,
     sensor,
     static_partitioned_config,
+    get_dagster_logger,
 )
 from project.resources import mock_s3_resource, redis_resource, s3_resource
 from project.sensors import get_s3_keys
 from project.types import Aggregation, Stock


-@op
-def get_s3_data():
-    # Use your ops from week 2
-    pass
+@op(
+    config_schema={"s3_key": str},
+    required_resource_keys={"s3"},
+    out={"stocks": Out(dagster_type=List[Stock])},
+    tags={"kind": "s3"},
+    description="Get a list of stocks from an S3 file",
+)
+def get_s3_data(context) -> List[Stock]:
+    stocks = context.resources.s3.get_data(context.op_config["s3_key"])
+    return [Stock.from_list(stock) for stock in stocks]


-@op
-def process_data():
-    # Use your ops from week 2
-    pass
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg_max": Out(dagster_type=Aggregation)},
+    tags={"kind": "transformation"},
+    description="Determine the stock with the highest value"
+)
+def process_data(stocks: List[Stock]) -> Aggregation:
+    # Find the stock with the highest value
+    max_stock = max(stocks, key=lambda x: x.high)
+    # Log the output
+    logger = get_dagster_logger()
logger.info(f"Higest stock is: {max_stock}") | ||
+    return Aggregation(date=max_stock.date, high=max_stock.high)


-@op
-def put_redis_data():
-    # Use your ops from week 2
-    pass
+@op(
+    ins={"agg_max": In(dagster_type=Aggregation)},
+    required_resource_keys={"redis"},
+    out=Out(dagster_type=Nothing),
+    tags={"kind": "redis"},
+    description="Write to Redis"
+)
+def put_redis_data(context, agg_max: Aggregation) -> None:
+    data = context.resources.redis.put_data(str(agg_max.date), str(agg_max.high))
+    context.log.info(f"Write {data} to Redis.")


 @graph
 def week_3_pipeline():
-    # Use your graph from week 2
-    pass
+    s3_data = get_s3_data()
+    highest_stock = process_data(s3_data)
+    put_redis_data(highest_stock)


 local = {
@@ -69,8 +92,16 @@ def week_3_pipeline():
 }


-def docker_config():
-    pass
+# Create a fixed number of partitions (1-10)
+partition_keys = [str(i) for i in range(1, 11)]
+
+@static_partitioned_config(partition_keys=partition_keys)
+def docker_config(partition_key: str):
+    key = f'prefix/stock_{partition_key}.csv'
+    return {
+        "resources": {**docker["resources"]},
+        "ops": {"get_s3_data": {"config": {"s3_key": key}}}
+    }


 local_week_3_pipeline = week_3_pipeline.to_job(
@@ -89,14 +120,43 @@ def docker_config():
         "s3": s3_resource,
         "redis": redis_resource,
     },
+    op_retry_policy=RetryPolicy(max_retries=10, delay=1)
 )

-local_week_3_schedule = None  # Add your schedule
+# Schedule for local: Every 15 minutes
+local_week_3_schedule = ScheduleDefinition(job=local_week_3_pipeline, cron_schedule="*/15 * * * *")

-docker_week_3_schedule = None  # Add your schedule
+# Schedule for docker: Start of every hour
+docker_week_3_schedule = ScheduleDefinition(job=docker_week_3_pipeline, cron_schedule="0 * * * *")


-@sensor
-def docker_week_3_sensor():
-    pass
+@sensor(
+    job=docker_week_3_pipeline,
+    minimum_interval_seconds=30,
+    description="Check S3 bucket for new files every 30 seconds."
+)
+def docker_week_3_sensor(context):
+
+    # Check for new files in the S3 bucket.
+    new_files = get_s3_keys(
+        bucket="dagster",
+        prefix="prefix",
+        endpoint_url="http://host.docker.internal:4566"
+    )
+
+    if not new_files:
+        yield SkipReason("No new s3 files found in bucket.")
+        return
+
+    log = get_dagster_logger()
+    log.info(f"RunRequest for {new_files}")
+
+    # RunRequest for every new file.
+    for new_file in new_files:
+        yield RunRequest(
+            run_key=new_file,
+            run_config={
+                "resources": {**docker["resources"]},
+                "ops": {"get_s3_data": {"config": {"s3_key": new_file}}},
+            }
+        )

Review thread on the sensor's run_config:
  "@ianyoung I did exactly the same as you :) However, the code can be simplified as"
  Reply: "Good point @yjw868. Now updated. Thanks!"
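The suggested simplification is truncated in this view. One possibility, purely an assumption about what was meant, is to spread the entire docker preset into the run config and override only the op config instead of rebuilding the resources key by hand. This is a drop-in sketch for the loop inside docker_week_3_sensor, relying on the RunRequest import, the docker dict, and new_files already defined in this file:

    # Hypothetical sketch, not necessarily the reviewer's suggestion:
    # reuse the whole `docker` dict and override only the op config.
    for new_file in new_files:
        yield RunRequest(
            run_key=new_file,
            run_config={
                **docker,
                "ops": {"get_s3_data": {"config": {"s3_key": new_file}}},
            },
        )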
Review comment on the docker schedule definition:
  "The code passes the tests, but I believe this definition is incorrect. Try to turn on this schedule in Dagit, and it will complain about incorrect configuration. See this discussion: https://data-orchestration.slack.com/archives/C03SB6TAXQV/p1661960430498179"
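One way to address this, sketched here as an assumption rather than the fix agreed on in the linked Slack thread: because docker_week_3_pipeline is built from the partitioned docker_config, the schedule can be written with the @schedule decorator in place of the plain ScheduleDefinition, emitting one RunRequest per partition so every scheduled run carries a valid partition config instead of an empty one.

    from dagster import schedule

    # Sketch only: assumes a Dagster release where JobDefinition.run_request_for_partition
    # is available, and reuses partition_keys / docker_week_3_pipeline from this module.
    @schedule(job=docker_week_3_pipeline, cron_schedule="0 * * * *")
    def docker_week_3_schedule(context):
        for partition_key in partition_keys:
            # Builds the run config for this partition via docker_config.
            yield docker_week_3_pipeline.run_request_for_partition(
                partition_key=partition_key, run_key=partition_key
            )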