
update tutorial for clarity #16354

Merged · 4 commits into main · Dec 12, 2024
Conversation

@zzstoatzz (Collaborator) commented on Dec 12, 2024

Makes a couple of updates to the Build a data pipeline tutorial:

  • The existing use of .submit()/.result() is a bit more verbose, and I would rather recommend sequential .map calls in cases where the I/O actually takes a while, because in this example the second loop blocks at each result() call for each item instead of resolving the futures concurrently like .map(list[T]).result(). I also think it's easier to read and reason about.
For example:

slower: [image]

faster: [image]

import time
from typing import Any

from prefect import flow, task


@task
def slow_api_call(item: str) -> dict[str, Any]:
    """Simulate a slow API call that takes 1 second"""
    time.sleep(1)  # Simulate network delay
    return {"item": item, "data": f"data for {item}"}


@task
def slow_processing(data: dict[str, Any]) -> str:
    """Simulate slow processing that takes 0.5 seconds"""
    time.sleep(0.5)  # Simulate processing time
    return f"Processed {data['item']}: {data['data']}"


@flow(name="inefficient-flow")
def process_inefficiently(items: list[str]) -> None:
    """
    Inefficient approach using submit/result in sequence.
    For N items, this takes roughly N * (1 + 0.5) seconds because we block
    on each result() call.
    """
    print("\nStarting inefficient flow...")
    start_time = time.time()

    # First phase: submit API calls
    api_futures = []
    for item in items:
        api_futures.append({"item": item, "future": slow_api_call.submit(item)})

    # Second phase: wait for each API call and process
    # This blocks sequentially on each result() call!
    for future_info in api_futures:
        item = future_info["item"]
        api_result = future_info["future"].result()  # Blocks here!
        processed = slow_processing(api_result)  # Waits for processing
        print(f"Completed {item}: {processed}")

    duration = time.time() - start_time
    print(f"Inefficient flow took {duration:.2f} seconds")


@flow(name="efficient-flow")
def process_efficiently(items: list[str]) -> None:
    """
    Efficient approach using map.
    For N items, this takes roughly max(1, 0.5) * N seconds because
    all futures are resolved concurrently.
    """
    print("\nStarting efficient flow...")
    start_time = time.time()

    # Map over all items for API calls
    api_results = slow_api_call.map(items)

    # Map over all results for processing
    # This resolves all futures concurrently!
    processed = slow_processing.map(api_results).result()

    # Print results
    for item, result in zip(items, processed):
        print(f"Completed {item}: {result}")

    duration = time.time() - start_time
    print(f"Efficient flow took {duration:.2f} seconds")


if __name__ == "__main__":
    test_items = ["item1", "item2", "item3", "item4"]

    # Run both flows to compare
    process_inefficiently(test_items)
    process_efficiently(test_items)
  • Moves rate_limit into the task. Happy to reconsider this; I was just thinking that I'd want my task runtimes to reflect time spent waiting for the GitHub API to free up, but I could also see situations where you might want to avoid submission until some resource is free.

  • Keeping with the theme of the tutorial, updates the full examples to compound the use of features adopted throughout the page (i.e. continuing to use retries=3 even when adding caching or rate limiting).

  • Following normal convention, defines the tasks referenced in the show_stars flow before the flow itself.

  • Uses proper type hinting.
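The first bullet's timing argument generalizes beyond Prefect. Here is a dependency-free sketch of the same two patterns using only the standard library's concurrent.futures (the api_call/process names and the delay values are illustrative, not from the tutorial): blocking on each result() and processing inline serializes the second phase, while mapping both phases keeps everything overlapped.

```python
import time
from concurrent.futures import ThreadPoolExecutor

API_DELAY = 0.1    # pretend API latency per item
PROC_DELAY = 0.05  # pretend processing time per item
ITEMS = ["a", "b", "c", "d"]


def api_call(item: str) -> str:
    time.sleep(API_DELAY)
    return f"data for {item}"


def process(data: str) -> str:
    time.sleep(PROC_DELAY)
    return f"processed {data}"


def inefficient() -> tuple[list[str], float]:
    """Submit everything, then block on result() and process inline per item."""
    start = time.perf_counter()
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(api_call, item) for item in ITEMS]
        # process() runs in the calling thread, one item at a time,
        # so the second phase takes roughly len(ITEMS) * PROC_DELAY
        results = [process(f.result()) for f in futures]
    return results, time.perf_counter() - start


def efficient() -> tuple[list[str], float]:
    """Map both phases onto the pool so the second phase also overlaps."""
    start = time.perf_counter()
    with ThreadPoolExecutor() as pool:
        data = pool.map(api_call, ITEMS)
        # second-phase tasks are submitted as first-phase results arrive,
        # so total time is roughly API_DELAY + PROC_DELAY
        results = list(pool.map(process, data))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    res_slow, t_slow = inefficient()
    res_fast, t_fast = efficient()
    print(f"inefficient: {t_slow:.2f}s, efficient: {t_fast:.2f}s")
```

Both functions return identical results; only the wall-clock time differs, which mirrors the slower/faster comparison in the Prefect example above.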

@discdiver (Contributor) left a comment:


Minor suggested edits.

Eight review threads on docs/v3/tutorials/pipelines.mdx, all resolved (seven marked outdated).
@zzstoatzz force-pushed the improve-data-pipeline-tutorial branch from 451e71e to 73498a8 on December 12, 2024 at 18:14
@zzstoatzz (Collaborator, Author):

Good suggestions @discdiver! Updated in 73498a8.

@zzstoatzz zzstoatzz enabled auto-merge (squash) December 12, 2024 18:37
@zzstoatzz zzstoatzz merged commit b2faff3 into main Dec 12, 2024
5 checks passed
@zzstoatzz zzstoatzz deleted the improve-data-pipeline-tutorial branch December 12, 2024 18:45
EmilRex pushed a commit that referenced this pull request Dec 13, 2024