Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MLOB-2340] feat(vertexai): add @google-cloud/vertexai instrumentation #5369

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,14 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: ./.github/actions/plugins/test

google-cloud-vertexai:
runs-on: ubuntu-latest
env:
PLUGINS: google-cloud-vertexai
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: ./.github/actions/plugins/test

graphql:
runs-on: ubuntu-latest
env:
Expand Down
2 changes: 2 additions & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@
/packages/dd-trace/test/llmobs/ @DataDog/ml-observability
/packages/datadog-plugin-openai/ @DataDog/ml-observability
/packages/datadog-plugin-langchain/ @DataDog/ml-observability
/packages/datadog-plugin-google-cloud-vertexai/ @DataDog/ml-observability
/packages/datadog-instrumentations/src/openai.js @DataDog/ml-observability
/packages/datadog-instrumentations/src/langchain.js @DataDog/ml-observability
/packages/datadog-instrumentations/src/google-cloud-vertexai.js @DataDog/ml-observability
/packages/datadog-plugin-aws-sdk/src/services/bedrockruntime @DataDog/ml-observability
/packages/datadog-plugin-aws-sdk/test/bedrockruntime.spec.js @DataDog/ml-observability

Expand Down
1 change: 1 addition & 0 deletions docs/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ tracer.use('fetch');
tracer.use('fetch', httpClientOptions);
tracer.use('generic-pool');
tracer.use('google-cloud-pubsub');
tracer.use('google-cloud-vertexai');
tracer.use('graphql');
tracer.use('graphql', graphqlOptions);
tracer.use('graphql', { variables: ['foo', 'bar'] });
Expand Down
7 changes: 7 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ interface Plugins {
"fetch": tracer.plugins.fetch;
"generic-pool": tracer.plugins.generic_pool;
"google-cloud-pubsub": tracer.plugins.google_cloud_pubsub;
"google-cloud-vertexai": tracer.plugins.google_cloud_vertexai;
"graphql": tracer.plugins.graphql;
"grpc": tracer.plugins.grpc;
"hapi": tracer.plugins.hapi;
Expand Down Expand Up @@ -1370,6 +1371,12 @@ declare namespace tracer {
*/
interface google_cloud_pubsub extends Integration {}

/**
* This plugin automatically instruments the
* [@google-cloud/vertexai](https://github.com/googleapis/nodejs-vertexai) module.
*/
interface google_cloud_vertexai extends Integration {}

/** @hidden */
interface ExecutionArgs {
schema: any,
Expand Down
102 changes: 102 additions & 0 deletions packages/datadog-instrumentations/src/google-cloud-vertexai.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
'use strict'

const { addHook } = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')

// Tracing channel shared with the google-cloud-vertexai plugin, which
// subscribes to its start/end/asyncEnd/error events.
const vertexaiTracingChannel = require('dc-polyfill').tracingChannel('apm:vertexai:request')

// Wraps a promise-returning generation method (generateContent / sendMessage)
// so that each invocation is traced through the vertexai tracing channel.
function wrapGenerate (generate) {
  return function (request) {
    const channel = vertexaiTracingChannel

    // Fast path: nothing subscribed, call through untouched.
    if (!channel.start.hasSubscribers) {
      return generate.apply(this, arguments)
    }

    const ctx = {
      request,
      instance: this,
      // e.g. "GenerativeModel.generateContent" or "ChatSession.sendMessage"
      resource: `${this.constructor.name}.${generate.name}`
    }

    return channel.tracePromise(generate, ctx, this, ...arguments)
  }
}

// Wraps a streaming generation method (generateContentStream / sendMessageStream).
// These methods return a promise of a stream object whose `response` property is
// itself a promise resolving to the final aggregated response; the span is only
// finished once that aggregated response (or an error) is available.
function wrapGenerateStream (generateStream) {
  return function (request) {
    if (!vertexaiTracingChannel.start.hasSubscribers) {
      return generateStream.apply(this, arguments)
    }

    const ctx = {
      request,
      instance: this,
      resource: [this.constructor.name, generateStream.name].join('.'),
      stream: true
    }

    return vertexaiTracingChannel.start.runStores(ctx, () => {
      let streamingResult
      try {
        streamingResult = generateStream.apply(this, arguments)
      } catch (e) {
        // Synchronous throw: there is no promise chain to hang asyncEnd on,
        // so publish both end and asyncEnd here before rethrowing.
        finish(ctx, null, e, true)
        throw e
      }

      vertexaiTracingChannel.end.publish(ctx)

      return streamingResult.then(stream => {
        stream.response.then(response => {
          finish(ctx, response, null)
        }).catch(e => {
          // Record the error on the span, but do NOT rethrow: this is an
          // instrumentation-internal chain nobody awaits, and rethrowing would
          // surface an unhandled promise rejection even when the caller
          // handles `stream.response` itself.
          finish(ctx, null, e)
        })

        return stream
      }).catch(e => {
        // This chain IS returned to the caller, so rethrow to preserve the
        // caller-visible rejection.
        finish(ctx, null, e)
        throw e
      })
    })
  }
}

// Publishes the trailing channel events for a streamed request: an optional
// error event, the result payload, an optional end event (used only when the
// wrapped call threw synchronously), and finally asyncEnd.
function finish (ctx, response, err, publishEndEvent = false) {
  const channel = vertexaiTracingChannel

  if (err) {
    ctx.error = err
    channel.error.publish(ctx)
  }

  ctx.result = { response }

  if (publishEndEvent) {
    channel.end.publish(ctx)
  }

  channel.asyncEnd.publish(ctx)
}

// Patch the unary and streaming generation entry points on GenerativeModel.
addHook({
  name: '@google-cloud/vertexai',
  file: 'build/src/models/generative_models.js',
  versions: ['>=1']
}, exports => {
  const proto = exports.GenerativeModel.prototype

  shimmer.wrap(proto, 'generateContent', wrapGenerate)
  shimmer.wrap(proto, 'generateContentStream', wrapGenerateStream)

  return exports
})

// Patch the unary and streaming message entry points on ChatSession.
addHook({
  name: '@google-cloud/vertexai',
  file: 'build/src/models/chat_session.js',
  versions: ['>=1']
}, exports => {
  const proto = exports.ChatSession.prototype

  shimmer.wrap(proto, 'sendMessage', wrapGenerate)
  shimmer.wrap(proto, 'sendMessageStream', wrapGenerateStream)

  return exports
})
1 change: 1 addition & 0 deletions packages/datadog-instrumentations/src/helpers/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module.exports = {
'@elastic/elasticsearch': () => require('../elasticsearch'),
'@elastic/transport': () => require('../elasticsearch'),
'@google-cloud/pubsub': () => require('../google-cloud-pubsub'),
'@google-cloud/vertexai': () => require('../google-cloud-vertexai'),
'@graphql-tools/executor': () => require('../graphql'),
'@grpc/grpc-js': () => require('../grpc'),
'@hapi/hapi': () => require('../hapi'),
Expand Down
195 changes: 195 additions & 0 deletions packages/datadog-plugin-google-cloud-vertexai/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
'use strict'

const { MEASURED } = require('../../../ext/tags')
const { storage } = require('../../datadog-core')
const TracingPlugin = require('../../dd-trace/src/plugins/tracing')
const makeUtilities = require('../../dd-trace/src/plugins/util/llm')

class GoogleCloudVertexAIPlugin extends TracingPlugin {
  static get id () { return 'google-cloud-vertexai' }
  static get prefix () {
    return 'tracing:apm:vertexai:request'
  }

  constructor () {
    super(...arguments)

    // Mix in the shared LLM plugin helpers (normalize, isPromptCompletionSampled, ...)
    // so content tagging follows the common truncation and sampling rules.
    Object.assign(this, makeUtilities('vertexai', this._tracerConfig))
  }

  /**
   * `start` channel handler: opens the `vertexai.request` span tagged with the
   * request metadata and binds it into the active store.
   */
  bindStart (ctx) {
    const { instance, request, resource, stream } = ctx

    const tags = this.tagRequest(request, instance, stream)

    const span = this.startSpan('vertexai.request', {
      service: this.config.service,
      resource,
      kind: 'client',
      meta: {
        [MEASURED]: 1,
        ...tags
      }
    }, false)

    const store = storage('legacy').getStore() || {}
    ctx.currentStore = { ...store, span }

    return ctx.currentStore
  }

  /**
   * `asyncEnd` channel handler: tags the response (when one was captured by
   * the instrumentation) and finishes the span opened in `bindStart`.
   */
  asyncEnd (ctx) {
    const span = ctx.currentStore?.span
    if (!span) return

    const { result } = ctx

    const response = result?.response
    if (response) {
      const tags = this.tagResponse(response)
      span.addTags(tags)
    }

    span.finish()
  }

  /**
   * Builds the request-side tags: model name, generation config, stream flag,
   * and — only for sampled prompts — system instructions and message contents.
   */
  tagRequest (request, instance, stream) {
    const model = extractModel(instance)
    const tags = {
      'vertexai.request.model': model
    }

    // Chat sessions carry prior turns; prepend them to the new contents.
    const history = instance.historyInternal
    let contents = typeof request === 'string' || Array.isArray(request) ? request : request.contents
    if (history) {
      contents = [...history, ...(Array.isArray(contents) ? contents : [contents])]
    }

    const generationConfig = instance.generationConfig || {}
    for (const key of Object.keys(generationConfig)) {
      // camelCase (SDK keys) -> snake_case (tag naming convention)
      const transformedKey = key.replace(/([a-z0-9])([A-Z])/g, '$1_$2').toLowerCase()
      tags[`vertexai.request.generation_config.${transformedKey}`] = JSON.stringify(generationConfig[key])
    }

    if (stream) {
      tags['vertexai.request.stream'] = true
    }

    // Prompt contents are only recorded for sampled requests.
    if (!this.isPromptCompletionSampled()) return tags

    // `?? []` guard: extractSystemInstructions can yield undefined for a
    // Content-shaped systemInstruction without a `parts` array.
    const systemInstructions = extractSystemInstructions(instance) ?? []

    for (const [idx, systemInstruction] of systemInstructions.entries()) {
      tags[`vertexai.request.system_instruction.${idx}.text`] = systemInstruction
    }

    if (typeof contents === 'string') {
      tags['vertexai.request.contents.0.text'] = contents
      return tags
    }

    // A request object without a contents array has nothing further to tag;
    // previously this crashed on contents.entries().
    if (!Array.isArray(contents)) return tags

    for (const [contentIdx, content] of contents.entries()) {
      this.tagRequestContent(tags, content, contentIdx)
    }

    return tags
  }

  /**
   * Tags a single request Part: its text plus any functionCall /
   * functionResponse payloads (serialized and normalized).
   */
  tagRequestPart (part, tags, partIdx, contentIdx) {
    tags[`vertexai.request.contents.${contentIdx}.parts.${partIdx}.text`] = this.normalize(part.text)

    const functionCall = part.functionCall
    const functionResponse = part.functionResponse

    if (functionCall) {
      tags[`vertexai.request.contents.${contentIdx}.parts.${partIdx}.function_call.name`] = functionCall.name
      tags[`vertexai.request.contents.${contentIdx}.parts.${partIdx}.function_call.args`] =
        this.normalize(JSON.stringify(functionCall.args))
    }
    if (functionResponse) {
      tags[`vertexai.request.contents.${contentIdx}.parts.${partIdx}.function_response.name`] =
        functionResponse.name
      tags[`vertexai.request.contents.${contentIdx}.parts.${partIdx}.function_response.response`] =
        this.normalize(JSON.stringify(functionResponse.response))
    }
  }

  /**
   * Tags one request content entry. A content may be a raw string, a bare Part
   * (text/functionCall/functionResponse), or a Content object ({ role, parts }).
   */
  tagRequestContent (tags, content, contentIdx) {
    if (typeof content === 'string') {
      tags[`vertexai.request.contents.${contentIdx}.text`] = this.normalize(content)
      return
    }

    if (content.text || content.functionCall || content.functionResponse) {
      this.tagRequestPart(content, tags, 0, contentIdx)
      return
    }

    const { role, parts } = content
    if (role) {
      tags[`vertexai.request.contents.${contentIdx}.role`] = role
    }

    // A Content without a parts array (e.g. role-only, or a Part whose text is
    // an empty string and thus failed the truthiness check above) previously
    // crashed on parts.entries(); skip it instead.
    if (!Array.isArray(parts)) return

    for (const [partIdx, part] of parts.entries()) {
      this.tagRequestPart(part, tags, partIdx, contentIdx)
    }
  }

  /**
   * Builds the response-side tags: per-candidate finish reason and role,
   * candidate parts (only for sampled requests), and token usage counts.
   */
  tagResponse (response) {
    const tags = {}

    // `?? []` guard: a response with no candidates (e.g. a blocked prompt)
    // previously crashed on candidates.entries().
    const candidates = response.candidates ?? []
    for (const [candidateIdx, candidate] of candidates.entries()) {
      const finishReason = candidate.finishReason
      if (finishReason) {
        tags[`vertexai.response.candidates.${candidateIdx}.finish_reason`] = finishReason
      }
      const candidateContent = candidate.content
      const role = candidateContent.role
      tags[`vertexai.response.candidates.${candidateIdx}.content.role`] = role

      // Completion contents are only recorded for sampled requests.
      if (!this.isPromptCompletionSampled()) continue

      const parts = candidateContent.parts
      for (const [partIdx, part] of parts.entries()) {
        const text = part.text
        tags[`vertexai.response.candidates.${candidateIdx}.content.parts.${partIdx}.text`] =
          this.normalize(String(text))

        const functionCall = part.functionCall
        if (!functionCall) continue

        tags[`vertexai.response.candidates.${candidateIdx}.content.parts.${partIdx}.function_call.name`] =
          functionCall.name
        tags[`vertexai.response.candidates.${candidateIdx}.content.parts.${partIdx}.function_call.args`] =
          this.normalize(JSON.stringify(functionCall.args))
      }
    }

    const tokenCounts = response.usageMetadata
    if (tokenCounts) {
      tags['vertexai.response.usage.prompt_tokens'] = tokenCounts.promptTokenCount
      tags['vertexai.response.usage.completion_tokens'] = tokenCounts.candidatesTokenCount
      tags['vertexai.response.usage.total_tokens'] = tokenCounts.totalTokenCount
    }

    return tags
  }
}

// Resolves the model identifier from a GenerativeModel/ChatSession instance,
// keeping only the final path segment (e.g. "gemini-1.0-pro").
function extractModel (instance) {
  const rawModel = instance.model || instance.resourcePath || instance.publisherModelEndpoint
  if (rawModel == null) return undefined
  return rawModel.split('/').pop()
}

// Extracts system instruction texts from an instance.
// `systemInstruction` is either a plain string or a Content object
// ({ role, parts: Part[] }); returns one entry per part's text.
function extractSystemInstructions (instance) {
  const systemInstruction = instance.systemInstruction
  if (!systemInstruction) return []
  if (typeof systemInstruction === 'string') return [systemInstruction]

  // A Content object may lack `parts`; always return an array so callers can
  // iterate unconditionally (previously this returned undefined).
  return systemInstruction.parts?.map(part => part.text) ?? []
}

module.exports = GoogleCloudVertexAIPlugin
Loading
Loading