Updated Workflow with relevant resources for Week 2 #7

Open: wants to merge 6 commits into base: master

onyekaugochukwu

Hi Dennis,

I updated the ops and job as requested.

rsanjabi left a comment

Your ops and jobs are looking good!

  • heapq was new to me, so it was nice to see how it works.
  • I think (and would like to hear what others say) that in the future you might want to create a branch on your fork (say onyekaugochukwu/corise-dagster/week1) and then open a PR against your fork's master branch (merging into onyekaugochukwu/corise-dagster/master). This way you keep your implementation in your own environment without conflicting with everyone else's.
  • You can also pull in the latest changes from Dennis's fork and then turn on the GHA workflow (under the Actions tab); it will automatically run the tests whenever you open a PR, as a CI process would.
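
For readers who, like the reviewer, have not used heapq before, here is a minimal sketch of what heapq.nlargest does. The sample prices are made up for illustration and are not taken from the project data.

```python
import heapq

# Hypothetical daily highs, for illustration only.
highs = [101.5, 99.0, 104.25, 100.0]

# nlargest returns the n largest items, in descending order.
top_two = heapq.nlargest(2, highs)

# For small inputs this matches sorting in reverse and slicing.
same_as_sorted = sorted(highs, reverse=True)[:2]

# On an empty input it returns an empty list rather than raising.
empty_result = heapq.nlargest(1, [])
```

Because the result is always a list, taking element [0] is only safe when the input is known to be non-empty.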

week_1/project/week_1.py (outdated)
onyekaugochukwu changed the title from "Updated Workflow with Ops and Jobs for Week 1" to "Updated Workflow with relevant resources for Week 2" on Aug 31, 2022
)
def process_data(stocks: List[Stock]) -> Aggregation:
    stock_high_list = [stock.high for stock in stocks]
    highest_stock = float(heapq.nlargest(1, stock_high_list)[0])

Not a big deal, but I believe this will fail if stocks is an empty list, because indexing [0] into the empty result of heapq.nlargest raises an IndexError (I believe Dagster will call all portions of a pipeline, even if the output from a previous step is empty). I think you could clean this up and avoid the failure by doing the following:

def process_data(stocks: List[Stock], highest_n: int = 1) -> List[Stock]:
    # nlargest returns an empty list when stocks is empty, so nothing is indexed.
    highest_stocks = heapq.nlargest(highest_n, stocks, key=lambda x: x.high)
    return highest_stocks
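
To make the empty-list concern concrete, the sketch below shows one way to guard the aggregation. The Stock and Aggregation dataclasses are simplified stand-ins for the project's own types (their real fields are not shown in this thread), and highest_stock is a hypothetical helper, not the op from the PR.

```python
import heapq
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Stock:
    # Simplified stand-in for the project's Stock type (assumed fields).
    date: datetime
    high: float


@dataclass
class Aggregation:
    # Simplified stand-in for the project's Aggregation type (assumed fields).
    date: datetime
    high: float


def highest_stock(stocks: List[Stock]) -> Optional[Aggregation]:
    """Return the record with the highest `high`, or None for empty input."""
    top = heapq.nlargest(1, stocks, key=lambda s: s.high)
    if not top:
        # Nothing to aggregate, so avoid indexing into an empty list.
        return None
    return Aggregation(date=top[0].date, high=top[0].high)


sample = [
    Stock(date=datetime(2022, 8, 30), high=10.0),
    Stock(date=datetime(2022, 8, 31), high=12.5),
]
result = highest_stock(sample)
```

Whether returning None is acceptable depends on how the downstream op is typed; raising a descriptive error on empty input is another reasonable choice.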

description="Receives list from S3 Data and selects highest stock value"
)
def process_data(stocks: List[Stock]) -> Aggregation:
    stock_high_list = [stock.high for stock in stocks]

Same comment as above.

},
description="A resource that can run Redis",
)
def redis_resource(context) -> Redis:

I forgot to do this myself, but I think you can add type hints for context here. Based on the examples in the Dagster docs, it doesn't seem to be standard practice, but I find it helps me tell apart the different context arguments that are passed to ops, resources, etc.

from dagster import InitResourceContext, OpExecutionContext, op, resource

# Ops receive an OpExecutionContext.
@op(...)
def blah(context: OpExecutionContext):
    ...

# Resources receive an InitResourceContext.
@resource(...)
def my_resource(context: InitResourceContext):
    ...

rmwoods commented Aug 31, 2022

Looks good, @onyekaugochukwu! I left a couple of comments on some small things, but otherwise it looks like you got it.

As @rsanjabi said, you can create a fork of Dennis's repo and then create your branches off your personal fork. Then your PRs will go into your personal repo and not Dennis's repo.
