Add Vertex gen AI response attributes and gen_ai.choice events #3227

Merged · 8 commits · Feb 7, 2025
@@ -15,3 +15,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3208](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3208))
- VertexAI emit user, system, and assistant events
([#3203](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3203))
- Add Vertex gen AI response span attributes
([#3227](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3227))
@@ -20,6 +20,11 @@
schematized in YAML and the Weaver tool supports it.
"""

from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Literal

from opentelemetry._events import Event
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
from opentelemetry.util.types import AnyValue
@@ -89,3 +94,46 @@ def system_event(
},
body=body,
)


@dataclass
class ChoiceMessage:
"""The message field for a gen_ai.choice event"""

content: AnyValue = None
role: str = "assistant"


FinishReason = Literal[
"content_filter", "error", "length", "stop", "tool_calls"
]


# TODO add tool calls
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3216
def choice_event(
*,
finish_reason: FinishReason | str,
index: int,
message: ChoiceMessage,
) -> Event:
"""Creates a choice event, which describes the Gen AI response message.
https://github.com/open-telemetry/semantic-conventions/blob/v1.28.0/docs/gen-ai/gen-ai-events.md#event-gen_aichoice
"""
body: dict[str, AnyValue] = {
"finish_reason": finish_reason,
"index": index,
"message": asdict(
message,
# filter nulls
dict_factory=lambda kvs: {k: v for (k, v) in kvs if v is not None},
),
}

return Event(
name="gen_ai.choice",
attributes={
gen_ai_attributes.GEN_AI_SYSTEM: gen_ai_attributes.GenAiSystemValues.VERTEX_AI.value,
},
body=body,
)
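# Illustrative usage of the new helper (a sketch, not part of this diff;
# the message content below is made up):
from opentelemetry.instrumentation.vertexai.events import ChoiceMessage, choice_event

event = choice_event(
    finish_reason="stop",
    index=0,
    message=ChoiceMessage(content=[{"text": "Hello!"}], role="model"),
)
assert event.name == "gen_ai.choice"
assert event.body == {
    "finish_reason": "stop",
    "index": 0,
    "message": {"content": [{"text": "Hello!"}], "role": "model"},
}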
@@ -25,9 +25,11 @@
from opentelemetry.instrumentation.vertexai.utils import (
GenerateContentParams,
get_genai_request_attributes,
get_genai_response_attributes,
get_server_attributes,
get_span_name,
request_to_events,
response_to_events,
)
from opentelemetry.trace import SpanKind, Tracer

@@ -113,25 +115,28 @@ def traced_method(
name=span_name,
kind=SpanKind.CLIENT,
attributes=span_attributes,
) as _span:
) as span:
for event in request_to_events(
params=params, capture_content=capture_content
):
event_logger.emit(event)

# TODO: set error.type attribute
# https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-spans.md
result = wrapped(*args, **kwargs)
response = wrapped(*args, **kwargs)
# TODO: handle streaming
# if is_streaming(kwargs):
# return StreamWrapper(
# result, span, event_logger, capture_content
# )

# TODO: add response attributes and events
# _set_response_attributes(
# span, result, event_logger, capture_content
# )
return result
if span.is_recording():
span.set_attributes(get_genai_response_attributes(response))
for event in response_to_events(
response=response, capture_content=capture_content
):
event_logger.emit(event)

return response

return traced_method
@@ -28,7 +28,10 @@

from opentelemetry._events import Event
from opentelemetry.instrumentation.vertexai.events import (
ChoiceMessage,
FinishReason,
assistant_event,
choice_event,
system_event,
user_event,
)
@@ -39,15 +42,25 @@
from opentelemetry.util.types import AnyValue, AttributeValue

if TYPE_CHECKING:
from google.cloud.aiplatform_v1.types import content, tool
from google.cloud.aiplatform_v1.types import (
content,
prediction_service,
tool,
)
from google.cloud.aiplatform_v1beta1.types import (
content as content_v1beta1,
)
from google.cloud.aiplatform_v1beta1.types import (
prediction_service as prediction_service_v1beta1,
)
from google.cloud.aiplatform_v1beta1.types import (
tool as tool_v1beta1,
)


_MODEL = "model"


@dataclass(frozen=True)
class GenerateContentParams:
model: str
@@ -137,6 +150,24 @@ def get_genai_request_attributes(
return attributes


def get_genai_response_attributes(
response: prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse,
) -> dict[str, AttributeValue]:
finish_reasons: list[str] = [
_map_finish_reason(candidate.finish_reason)
for candidate in response.candidates
]
# TODO: add gen_ai.response.id once available in the python client
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3246
return {
GenAIAttributes.GEN_AI_RESPONSE_MODEL: response.model_version,
GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS: finish_reasons,
GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS: response.usage_metadata.prompt_token_count,
GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS: response.usage_metadata.candidates_token_count,
}
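# Illustrative sketch (not from the diff): a minimal v1 response and the
# attributes the helper derives from it. The values mirror the test fixtures
# below; the proto constructors are assumptions based on the v1 API.
from google.cloud.aiplatform_v1.types import content, prediction_service

_example_response = prediction_service.GenerateContentResponse(
    model_version="gemini-1.5-flash-002",
    candidates=[
        content.Candidate(finish_reason=content.Candidate.FinishReason.STOP)
    ],
    usage_metadata=prediction_service.GenerateContentResponse.UsageMetadata(
        prompt_token_count=5, candidates_token_count=19
    ),
)
assert get_genai_response_attributes(_example_response) == {
    "gen_ai.response.model": "gemini-1.5-flash-002",
    "gen_ai.response.finish_reasons": ["stop"],
    "gen_ai.usage.input_tokens": 5,
    "gen_ai.usage.output_tokens": 19,
}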


_MODEL_STRIP_RE = re.compile(
r"^projects/(.*)/locations/(.*)/publishers/google/models/"
)
@@ -182,7 +213,7 @@ def request_to_events(

for content in params.contents or []:
# Assistant message
if content.role == "model":
if content.role == _MODEL:
request_content = _parts_to_any_value(
capture_content=capture_content, parts=content.parts
)
@@ -196,6 +227,27 @@
yield user_event(role=content.role, content=request_content)


def response_to_events(
*,
response: prediction_service.GenerateContentResponse
| prediction_service_v1beta1.GenerateContentResponse,
capture_content: bool,
) -> Iterable[Event]:
for candidate in response.candidates:
yield choice_event(
finish_reason=_map_finish_reason(candidate.finish_reason),
index=candidate.index,
# default to "model" since Vertex uses that instead of assistant
message=ChoiceMessage(
role=candidate.content.role or _MODEL,
content=_parts_to_any_value(
capture_content=capture_content,
parts=candidate.content.parts,
),
),
)


def _parts_to_any_value(
*,
capture_content: bool,
@@ -208,3 +260,22 @@
cast("dict[str, AnyValue]", type(part).to_dict(part)) # type: ignore[reportUnknownMemberType]
for part in parts
]


def _map_finish_reason(
finish_reason: content.Candidate.FinishReason
| content_v1beta1.Candidate.FinishReason,
) -> FinishReason | str:
EnumType = type(finish_reason) # pylint: disable=invalid-name
if (
finish_reason is EnumType.FINISH_REASON_UNSPECIFIED
or finish_reason is EnumType.OTHER
):
return "error"
if finish_reason is EnumType.STOP:
return "stop"
if finish_reason is EnumType.MAX_TOKENS:
return "length"

# If there is no 1:1 mapping to an OTel preferred enum value, use the exact vertex reason
return finish_reason.name
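# Illustrative behavior (a sketch, not part of this diff), assuming the v1
# FinishReason enum; SAFETY is one Vertex value with no preferred OTel mapping.
from google.cloud.aiplatform_v1.types import content

assert _map_finish_reason(content.Candidate.FinishReason.STOP) == "stop"
assert _map_finish_reason(content.Candidate.FinishReason.MAX_TOKENS) == "length"
assert _map_finish_reason(content.Candidate.FinishReason.OTHER) == "error"
# Unmapped values pass through as the exact Vertex enum name
assert _map_finish_reason(content.Candidate.FinishReason.SAFETY) == "SAFETY"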
@@ -38,28 +38,53 @@ def test_generate_content(
assert dict(spans[0].attributes) == {
"gen_ai.operation.name": "chat",
"gen_ai.request.model": "gemini-1.5-flash-002",
"gen_ai.response.finish_reasons": ("stop",),
"gen_ai.response.model": "gemini-1.5-flash-002",
"gen_ai.system": "vertex_ai",
"gen_ai.usage.input_tokens": 5,
"gen_ai.usage.output_tokens": 19,
"server.address": "us-central1-aiplatform.googleapis.com",
"server.port": 443,
}

# Emits content event
# Emits user and choice events
logs = log_exporter.get_finished_logs()
assert len(logs) == 1
log_record = logs[0].log_record
assert len(logs) == 2
user_log, choice_log = [log_data.log_record for log_data in logs]

span_context = spans[0].get_span_context()
assert log_record.trace_id == span_context.trace_id
assert log_record.span_id == span_context.span_id
assert log_record.trace_flags == span_context.trace_flags
assert log_record.attributes == {
assert user_log.trace_id == span_context.trace_id
assert user_log.span_id == span_context.span_id
assert user_log.trace_flags == span_context.trace_flags
assert user_log.attributes == {
"gen_ai.system": "vertex_ai",
"event.name": "gen_ai.user.message",
}
assert log_record.body == {
assert user_log.body == {
"content": [{"text": "Say this is a test"}],
"role": "user",
}

assert choice_log.trace_id == span_context.trace_id
assert choice_log.span_id == span_context.span_id
assert choice_log.trace_flags == span_context.trace_flags
assert choice_log.attributes == {
"gen_ai.system": "vertex_ai",
"event.name": "gen_ai.choice",
}
assert choice_log.body == {
"finish_reason": "stop",
"index": 0,
"message": {
"content": [
{
"text": "Okay, I understand. I'm ready for your test. Please proceed.\n"
}
],
"role": "model",
},
}


@pytest.mark.vcr
def test_generate_content_without_events(
@@ -81,20 +106,34 @@ def test_generate_content_without_events(
assert dict(spans[0].attributes) == {
"gen_ai.operation.name": "chat",
"gen_ai.request.model": "gemini-1.5-flash-002",
"gen_ai.response.finish_reasons": ("stop",),
"gen_ai.response.model": "gemini-1.5-flash-002",
"gen_ai.system": "vertex_ai",
"gen_ai.usage.input_tokens": 5,
"gen_ai.usage.output_tokens": 19,
"server.address": "us-central1-aiplatform.googleapis.com",
"server.port": 443,
}

# Emits event without body.content
# Emits user and choice events without body.content
logs = log_exporter.get_finished_logs()
assert len(logs) == 1
log_record = logs[0].log_record
assert log_record.attributes == {
assert len(logs) == 2
user_log, choice_log = [log_data.log_record for log_data in logs]
assert user_log.attributes == {
"gen_ai.system": "vertex_ai",
"event.name": "gen_ai.user.message",
}
assert log_record.body == {"role": "user"}
assert user_log.body == {"role": "user"}

assert choice_log.attributes == {
"gen_ai.system": "vertex_ai",
"event.name": "gen_ai.choice",
}
assert choice_log.body == {
"finish_reason": "stop",
"index": 0,
"message": {"role": "model"},
}


@pytest.mark.vcr
@@ -255,7 +294,11 @@ def test_generate_content_extra_params(span_exporter, instrument_no_content):
"gen_ai.request.stop_sequences": ("\n\n\n",),
"gen_ai.request.temperature": 0.20000000298023224,
"gen_ai.request.top_p": 0.949999988079071,
"gen_ai.response.finish_reasons": ("length",),
"gen_ai.response.model": "gemini-1.5-flash-002",
"gen_ai.system": "vertex_ai",
"gen_ai.usage.input_tokens": 5,
"gen_ai.usage.output_tokens": 5,
"server.address": "us-central1-aiplatform.googleapis.com",
"server.port": 443,
}
@@ -274,7 +317,7 @@ def assert_span_error(span: ReadableSpan) -> None:


@pytest.mark.vcr
def test_generate_content_all_input_events(
def test_generate_content_all_events(
log_exporter: InMemoryLogExporter,
instrument_with_content: VertexAIInstrumentor,
):
@@ -299,10 +342,10 @@ def test_generate_content_all_input_events(
],
)

# Emits a system event, 2 users events, and a assistant event
# Emits a system event, 2 user events, an assistant event, and the choice (response) event
logs = log_exporter.get_finished_logs()
assert len(logs) == 4
system_log, user_log1, assistant_log, user_log2 = [
assert len(logs) == 5
system_log, user_log1, assistant_log, user_log2, choice_log = [
log_data.log_record for log_data in logs
]

@@ -342,3 +385,16 @@
"content": [{"text": "Address me by name and say this is a test"}],
"role": "user",
}

assert choice_log.attributes == {
"gen_ai.system": "vertex_ai",
"event.name": "gen_ai.choice",
}
assert choice_log.body == {
"finish_reason": "stop",
"index": 0,
"message": {
"content": [{"text": "OpenTelemetry, this is a test.\n"}],
"role": "model",
},
}
Loading