From e78c189a46ca1dc471783278db26b0cbc8f336b1 Mon Sep 17 00:00:00 2001 From: Korntewin Boonchuay Date: Tue, 23 Aug 2022 21:57:33 +0700 Subject: [PATCH 1/4] finish resources.py --- week_2/dagster_ucr/resources.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py index adeb742f..e8c4aaa9 100644 --- a/week_2/dagster_ucr/resources.py +++ b/week_2/dagster_ucr/resources.py @@ -4,6 +4,7 @@ import boto3 import redis +from regex import W import sqlalchemy from dagster import Field, Int, String, resource @@ -91,13 +92,33 @@ def mock_s3_resource(): return s3_mock -@resource -def s3_resource(): +@resource( + config_schema={ + "bucket": str, + "access_key": str, + "secret_key": str, + "endpoint_url": str, + } +) +def s3_resource(context): """This resource defines a S3 client""" - pass + return S3( + bucket=context.resource_config["bucket"], + access_key=context.resource_config["access_key"], + secret_key=context.resource_config["secret_key"], + endpoint_url=context.resource_config["endpoint_url"], + ) -@resource -def redis_resource(): +@resource( + config_schema={ + "host": str, + "port": int, + } +) +def redis_resource(context): """This resource defines a Redis client""" - pass + return Redis( + host=context.resource_config["host"], + port=context.resource_config["port"], + ) From b92f577dc7f38e19c7337ef66f126d8adf2ead65 Mon Sep 17 00:00:00 2001 From: Korntewin Boonchuay Date: Tue, 23 Aug 2022 22:41:45 +0700 Subject: [PATCH 2/4] finish week_2 graph --- week_2/dagster_ucr/project/week_2.py | 41 +++++++++++++++++++++------- week_2/tests/test_answer.py | 2 +- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index 8e32a715..97c81b72 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -1,30 +1,51 @@ from typing import List from dagster import In, Nothing, Out, ResourceDefinition, graph, op +from pkg_resources import require +from py import process from dagster_ucr.project.types import Aggregation, Stock from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource -@op -def get_s3_data(): - pass +@op( + required_resource_keys={"s3"}, + config_schema={"s3_key": str}, + tags={"kind": "s3"}, +) +def get_s3_data(context) -> List[Stock]: + s3 = context.resources.s3 + return [ + Stock.from_list(data) + for data in s3.get_data(context.op_config["s3_key"]) + ] @op -def process_data(): +def process_data(stocks: List[Stock]) -> Aggregation: # Use your op from week 1 - pass + highest = max(stocks, key=lambda stock: stock.high) + return Aggregation( + date=highest.date, + high=highest.high, + ) -@op -def put_redis_data(): - pass +@op( + required_resource_keys={"redis"}, + tags={"kind": "redis"} +) +def put_redis_data(context, agg: Aggregation): + redis = context.resources.redis + redis.put_data(agg.date, agg.high) + context.log.info(f"put key: ({agg.date}, val: {agg.high}) to redis") @graph def week_2_pipeline(): # Use your graph from week 1 - pass + stocks = get_s3_data() + agg = process_data(stocks) + put_redis_data(agg) local = { @@ -48,7 +69,7 @@ def week_2_pipeline(): } }, }, - "ops": {"get_s3_data": {"config": {"s3_key": "preifx/stock.csv"}}}, + "ops": {"get_s3_data": {"config": {"s3_key": "prefix/stock.csv"}}}, } local_week_2_pipeline = week_2_pipeline.to_job( diff --git a/week_2/tests/test_answer.py b/week_2/tests/test_answer.py index 926a4328..cdf3884c 100644 --- a/week_2/tests/test_answer.py +++ b/week_2/tests/test_answer.py @@ -50,7 +50,7 @@ def test_aggregation(aggregation): def test_get_s3_data(stock_list): s3_mock = MagicMock() s3_mock.get_data.return_value = [stock_list] * 10 - with build_op_context(op_config={"s3_key": "data/stock.csv"}, resources={"s3": s3_mock}) as context: + with build_op_context(op_config={"s3_key": "week2/data/stock.csv"}, resources={"s3": s3_mock}) as context: get_s3_data(context) assert s3_mock.get_data.called From a433788895453644d057ec4214fa215eaa1054a3 Mon Sep 17 00:00:00 2001 From: Korntewin Boonchuay Date: Tue, 23 Aug 2022 23:26:17 +0700 Subject: [PATCH 3/4] remove py module for docker compose to be working properly --- week_2/dagster_ucr/project/week_2.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index 97c81b72..e553ae12 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -1,8 +1,6 @@ from typing import List -from dagster import In, Nothing, Out, ResourceDefinition, graph, op -from pkg_resources import require -from py import process +from dagster import ResourceDefinition, graph, op from dagster_ucr.project.types import Aggregation, Stock from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource From 43ca447d5f98c8cda2117a623fdf1f0a0abfa729 Mon Sep 17 00:00:00 2001 From: Korntewin Boonchuay Date: Tue, 23 Aug 2022 23:44:57 +0700 Subject: [PATCH 4/4] adjust type for redis key --- week_2/dagster_ucr/project/week_2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index e553ae12..addc4092 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -34,8 +34,9 @@ def process_data(stocks: List[Stock]) -> Aggregation: ) def put_redis_data(context, agg: Aggregation): redis = context.resources.redis - redis.put_data(agg.date, agg.high) - context.log.info(f"put key: ({agg.date}, val: {agg.high}) to redis") + key, val = agg.date.strftime("%Y-%m-%d"), agg.high + redis.put_data(key, val) + context.log.info(f"put key: ({key}, val: {val}) to redis") @graph