diff --git a/src/prefect/utilities/engine.py b/src/prefect/utilities/engine.py index 65c2dfbda7288..c8072b4f9721a 100644 --- a/src/prefect/utilities/engine.py +++ b/src/prefect/utilities/engine.py @@ -757,12 +757,19 @@ def resolve_to_final_result(expr: Any, context: dict[str, Any]) -> Any: parameter_context = propagate.extract( {"traceparent": state.state_details.traceparent} ) - trace.get_current_span().add_link( - context=trace.get_current_span(parameter_context).get_span_context(), - attributes={ + attributes = {} + + # If this future is being used as a parameter (as opposed to just a wait_for), + # add attributes to the span to indicate the parameter name and type + if "parameter_name" in context: + attributes = { "prefect.input.name": context["parameter_name"], "prefect.input.type": type(result).__name__, - }, + } + + trace.get_current_span().add_link( + context=trace.get_current_span(parameter_context).get_span_context(), + attributes=attributes, ) return result diff --git a/tests/telemetry/test_run_telemetry.py b/tests/telemetry/test_run_telemetry.py index 5c41bc075e5b3..97cd3a21208d3 100644 --- a/tests/telemetry/test_run_telemetry.py +++ b/tests/telemetry/test_run_telemetry.py @@ -333,6 +333,60 @@ def sync_flow(): } +async def test_span_links_wait_for_only( + engine_type: Literal["async", "sync"], + instrumentation: InstrumentationTester, +): + """Regression test for https://github.com/PrefectHQ/prefect/issues/16708, where + we are looking for the parameter name of a future that is only used for waiting""" + + @task(task_run_name="produces42") + def produces42() -> int: + return 42 + + if engine_type == "async": + + @task(task_run_name="async_task") + async def async_task(): + return "hi" + + @flow(flow_run_name="async-flow") + async def async_flow(): + f = produces42.submit() + await async_task(wait_for=[f]) + + await async_flow() + else: + + @task(task_run_name="sync_task") + def sync_task(): + return "hi" + + @flow(flow_run_name="sync-flow") + def sync_flow(): + f = produces42.submit() + sync_task(wait_for=[f]) + + sync_flow() + + spans = instrumentation.get_finished_spans() + + assert len(spans) == 3 # flow, producer, task + + flow_span = next(span for span in spans if span.name.endswith("-flow")) + task_span = next(span for span in spans if span.name.endswith("_task")) + producer_span = next(span for span in spans if span.name == "produces42") + + assert not flow_span.links + assert not producer_span.links + + assert len(task_span.links) == 1 + link = task_span.links[0] + assert link.context.trace_id == producer_span.context.trace_id + assert link.context.span_id == producer_span.context.span_id + assert link.attributes == {} + + async def test_span_status_on_success( engine_type: Literal["async", "sync"], instrumentation: InstrumentationTester,