Updated Workflow with relevant resources for Week 2 #7
- Your ops and jobs are looking good! heapq was new to me, so it was nice to see how it worked.
- I think (and would like to see what others say) that in the future you might want to create a branch on your fork (say onyekaugochukwu/corise-dagster/week1) and then open a PR against your fork's master branch (merge to onyekaugochukwu/corise-dagster/master). This way you keep your implementation in your own environment without conflicting with everyone else's.
- You can also pull in the latest changes from Dennis's fork and then turn on the GHA workflow (under the Actions tab). It will automatically run the tests whenever you open a PR, like a CI process would.
Commit suggestions from Rebecca Sanjabi Co-authored-by: Rebecca Sanjabi <[email protected]>
)
def process_data(stocks: List[Stock]) -> Aggregation:
    stock_high_list = [stock.high for stock in stocks]
    highest_stock = float(heapq.nlargest(1,stock_high_list)[0])
Not a big deal, but I believe this will fail if stocks is an empty list (I believe Dagster will call all portions of a pipeline, even if the output from a previous step is empty). I think you could clean this up and avoid the failure by doing the following:
def process_data(stocks: List[Stock], highest_n: int = 1) -> Aggregation:
    # nlargest returns an empty list for empty input, so nothing is indexed here
    highest_stocks = heapq.nlargest(highest_n, stocks, key=lambda x: x.high)
    return highest_stocks
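To make the empty-list concern concrete, here is a minimal sketch. It assumes simplified stand-ins for the project's Stock and Aggregation types (the dataclasses and their fields below are hypothetical, not the course's actual definitions), and the helper names are made up for illustration. It contrasts indexing [0] directly into the result of heapq.nlargest, which raises IndexError on empty input, with checking the returned list first. Returning None for empty input is just one possible choice.

```python
import heapq
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


# Hypothetical stand-ins for the project's Stock and Aggregation types.
@dataclass
class Stock:
    date: datetime
    high: float


@dataclass
class Aggregation:
    date: datetime
    high: float


def highest_stock(stocks: List[Stock]) -> Optional[Aggregation]:
    """Return the stock with the largest high, or None for empty input."""
    # nlargest returns [] when stocks is empty, so check before indexing.
    top = heapq.nlargest(1, stocks, key=lambda s: s.high)
    if not top:
        return None
    return Aggregation(date=top[0].date, high=top[0].high)


def unguarded_high(stocks: List[Stock]) -> float:
    """Mirrors the pattern under review: index [0] without a guard."""
    return float(heapq.nlargest(1, [s.high for s in stocks])[0])
```

With this shape, the caller decides what an empty result means instead of the op crashing partway through a run.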
    description="Receives list from S3 Data and selects highest stock value"
)
def process_data(stocks: List[Stock]) -> Aggregation:
    stock_high_list = [stock.high for stock in stocks]
Same comment as above.
    },
    description="A resource that can run Redis",
)
def redis_resource(context) -> Redis:
I forgot to do this, but I think you can add type hints here for context. Based on the examples in the Dagster docs, it doesn't seem standard to do so, but I find it helps me differentiate the different context arguments that are passed to ops, resources, etc.
from dagster import InitResourceContext, OpExecutionContext, op, resource

@op(...)
def blah(context: OpExecutionContext):
    ...

@resource(...)
def my_resource(context: InitResourceContext):
    ...
Looks good @onyekaugochukwu! I left a couple of comments on some small things, but otherwise it looks like you got it. As @rsanjabi said, you can create a fork of Dennis' repo and then create your branches off your personal fork. Then your PRs will go into your personal repo and not Dennis' repo.
Hi Dennis,
I updated the ops and job as requested