Skip to content

Commit

Permalink
Handles the instrumentation case where a future is only used to wait_…
Browse files Browse the repository at this point in the history
…for (#16709)
  • Loading branch information
chrisguidry authored and zzstoatzz committed Jan 14, 2025
1 parent 89934a0 commit bfb34a7
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/prefect/utilities/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,12 +757,19 @@ def resolve_to_final_result(expr: Any, context: dict[str, Any]) -> Any:
parameter_context = propagate.extract(
{"traceparent": state.state_details.traceparent}
)
trace.get_current_span().add_link(
context=trace.get_current_span(parameter_context).get_span_context(),
attributes={
attributes = {}

# If this future is being used as a parameter (as opposed to just a wait_for),
# add attributes to the span to indicate the parameter name and type
if "parameter_name" in context:
attributes = {
"prefect.input.name": context["parameter_name"],
"prefect.input.type": type(result).__name__,
},
}

trace.get_current_span().add_link(
context=trace.get_current_span(parameter_context).get_span_context(),
attributes=attributes,
)

return result
Expand Down
54 changes: 54 additions & 0 deletions tests/telemetry/test_run_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,60 @@ def sync_flow():
}


async def test_span_links_wait_for_only(
engine_type: Literal["async", "sync"],
instrumentation: InstrumentationTester,
):
"""Regression test for https://github.com/PrefectHQ/prefect/issues/16708, where
we are looking for the parameter name of a future that is only used for waiting"""

@task(task_run_name="produces42")
def produces42() -> int:
return 42

if engine_type == "async":

@task(task_run_name="async_task")
async def async_task():
return "hi"

@flow(flow_run_name="async-flow")
async def async_flow():
f = produces42.submit()
await async_task(wait_for=[f])

await async_flow()
else:

@task(task_run_name="sync_task")
def sync_task():
return "hi"

@flow(flow_run_name="sync-flow")
def sync_flow():
f = produces42.submit()
sync_task(wait_for=[f])

sync_flow()

spans = instrumentation.get_finished_spans()

assert len(spans) == 3 # flow, producer, task

flow_span = next(span for span in spans if span.name.endswith("-flow"))
task_span = next(span for span in spans if span.name.endswith("_task"))
producer_span = next(span for span in spans if span.name == "produces42")

assert not flow_span.links
assert not producer_span.links

assert len(task_span.links) == 1
link = task_span.links[0]
assert link.context.trace_id == producer_span.context.trace_id
assert link.context.span_id == producer_span.context.span_id
assert link.attributes == {}


async def test_span_status_on_success(
engine_type: Literal["async", "sync"],
instrumentation: InstrumentationTester,
Expand Down

0 comments on commit bfb34a7

Please sign in to comment.