Skip to content

Commit

Permalink
fix instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
sabrenner committed Feb 13, 2025
1 parent 710a873 commit 3d968fa
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 91 deletions.
6 changes: 3 additions & 3 deletions packages/datadog-instrumentations/src/helpers/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ module.exports = {
'@jest/test-sequencer': () => require('../jest'),
'@jest/transform': () => require('../jest'),
'@koa/router': () => require('../koa'),
'@langchain/core': () => require('../langchain'),
'@langchain/openai': () => require('../langchain'),
'@langchain/core': { esmFirst: true, fn: () => require('../langchain') },
'@langchain/openai': { esmFirst: true, fn: () => require('../langchain') },
'@node-redis/client': () => require('../redis'),
'@opensearch-project/opensearch': () => require('../opensearch'),
'@opentelemetry/sdk-trace-node': () => require('../otel-sdk-trace'),
Expand Down Expand Up @@ -100,7 +100,7 @@ module.exports = {
'node:vm': () => require('../vm'),
nyc: () => require('../nyc'),
oracledb: () => require('../oracledb'),
openai: () => require('../openai'),
openai: { esmFirst: true, fn: () => require('../openai') },
paperplane: () => require('../paperplane'),
passport: () => require('../passport'),
'passport-http': () => require('../passport-http'),
Expand Down
180 changes: 92 additions & 88 deletions packages/datadog-instrumentations/src/openai.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,92 +8,92 @@ const ch = dc.tracingChannel('apm:openai:request')

const V4_PACKAGE_SHIMS = [
{
file: 'resources/chat/completions.js',
file: 'resources/chat/completions',
targetClass: 'Completions',
baseResource: 'chat.completions',
methods: ['create'],
streamedResponse: true
},
{
file: 'resources/completions.js',
file: 'resources/completions',
targetClass: 'Completions',
baseResource: 'completions',
methods: ['create'],
streamedResponse: true
},
{
file: 'resources/embeddings.js',
file: 'resources/embeddings',
targetClass: 'Embeddings',
baseResource: 'embeddings',
methods: ['create']
},
{
file: 'resources/files.js',
file: 'resources/files',
targetClass: 'Files',
baseResource: 'files',
methods: ['create', 'del', 'list', 'retrieve']
},
{
file: 'resources/files.js',
file: 'resources/files',
targetClass: 'Files',
baseResource: 'files',
methods: ['retrieveContent'],
versions: ['>=4.0.0 <4.17.1']
},
{
file: 'resources/files.js',
file: 'resources/files',
targetClass: 'Files',
baseResource: 'files',
methods: ['content'], // replaced `retrieveContent` in v4.17.1
versions: ['>=4.17.1']
},
{
file: 'resources/images.js',
file: 'resources/images',
targetClass: 'Images',
baseResource: 'images',
methods: ['createVariation', 'edit', 'generate']
},
{
file: 'resources/fine-tuning/jobs/jobs.js',
file: 'resources/fine-tuning/jobs/jobs',
targetClass: 'Jobs',
baseResource: 'fine_tuning.jobs',
methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
versions: ['>=4.34.0'] // file location changed in 4.34.0
},
{
file: 'resources/fine-tuning/jobs.js',
file: 'resources/fine-tuning/jobs',
targetClass: 'Jobs',
baseResource: 'fine_tuning.jobs',
methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
versions: ['>=4.1.0 <4.34.0']
},
{
file: 'resources/fine-tunes.js', // deprecated after 4.1.0
file: 'resources/fine-tunes', // deprecated after 4.1.0
targetClass: 'FineTunes',
baseResource: 'fine-tune',
methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
versions: ['>=4.0.0 <4.1.0']
},
{
file: 'resources/models.js',
file: 'resources/models',
targetClass: 'Models',
baseResource: 'models',
methods: ['del', 'list', 'retrieve']
},
{
file: 'resources/moderations.js',
file: 'resources/moderations',
targetClass: 'Moderations',
baseResource: 'moderations',
methods: ['create']
},
{
file: 'resources/audio/transcriptions.js',
file: 'resources/audio/transcriptions',
targetClass: 'Transcriptions',
baseResource: 'audio.transcriptions',
methods: ['create']
},
{
file: 'resources/audio/translations.js',
file: 'resources/audio/translations',
targetClass: 'Translations',
baseResource: 'audio.translations',
methods: ['create']
Expand Down Expand Up @@ -259,93 +259,97 @@ function wrapStreamIterator (response, options, n, ctx) {
}
}

for (const shim of V4_PACKAGE_SHIMS) {
const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
addHook({ name: 'openai', file, versions: versions || ['>=4'] }, exports => {
const targetPrototype = exports[targetClass].prototype
const extensions = ['js', 'mjs']

for (const methodName of methods) {
shimmer.wrap(targetPrototype, methodName, methodFn => function () {
if (!ch.start.hasSubscribers) {
return methodFn.apply(this, arguments)
}
for (const extension of extensions) {
for (const shim of V4_PACKAGE_SHIMS) {
const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
addHook({ name: 'openai', file: file + extension, versions: versions || ['>=4'] }, exports => {
const targetPrototype = exports[targetClass].prototype

// The OpenAI library lets you set `stream: true` on the options arg to any method
// However, we only want to handle streamed responses in specific cases
// chat.completions and completions
const stream = streamedResponse && getOption(arguments, 'stream', false)

// we need to compute how many prompts we are sending in streamed cases for completions
// not applicable for chat completiond
let n
if (stream) {
n = getOption(arguments, 'n', 1)
const prompt = getOption(arguments, 'prompt')
if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
n *= prompt.length
for (const methodName of methods) {
shimmer.wrap(targetPrototype, methodName, methodFn => function () {
if (!ch.start.hasSubscribers) {
return methodFn.apply(this, arguments)
}
}

const client = this._client || this.client
// The OpenAI library lets you set `stream: true` on the options arg to any method
// However, we only want to handle streamed responses in specific cases
// chat.completions and completions
const stream = streamedResponse && getOption(arguments, 'stream', false)

// we need to compute how many prompts we are sending in streamed cases for completions
// not applicable for chat completiond
let n
if (stream) {
n = getOption(arguments, 'n', 1)
const prompt = getOption(arguments, 'prompt')
if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
n *= prompt.length
}
}

const ctx = {
methodName: `${baseResource}.${methodName}`,
args: arguments,
basePath: client.baseURL,
apiKey: client.apiKey
}
const client = this._client || this.client

return ch.start.runStores(ctx, () => {
const apiProm = methodFn.apply(this, arguments)

// wrapping `parse` avoids problematic wrapping of `then` when trying to call
// `withResponse` in userland code after. This way, we can return the whole `APIPromise`
shimmer.wrap(apiProm, 'parse', origApiPromParse => function () {
return origApiPromParse.apply(this, arguments)
// the original response is wrapped in a promise, so we need to unwrap it
.then(body => Promise.all([this.responsePromise, body]))
.then(([{ response, options }, body]) => {
if (stream) {
if (body.iterator) {
shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options, n, ctx))
const ctx = {
methodName: `${baseResource}.${methodName}`,
args: arguments,
basePath: client.baseURL,
apiKey: client.apiKey
}

return ch.start.runStores(ctx, () => {
const apiProm = methodFn.apply(this, arguments)

// wrapping `parse` avoids problematic wrapping of `then` when trying to call
// `withResponse` in userland code after. This way, we can return the whole `APIPromise`
shimmer.wrap(apiProm, 'parse', origApiPromParse => function () {
return origApiPromParse.apply(this, arguments)
// the original response is wrapped in a promise, so we need to unwrap it
.then(body => Promise.all([this.responsePromise, body]))
.then(([{ response, options }, body]) => {
if (stream) {
if (body.iterator) {
shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options, n, ctx))
} else {
shimmer.wrap(
body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options, n, ctx)
)
}
} else {
shimmer.wrap(
body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options, n, ctx)
)
finish(ctx, {
headers: response.headers,
data: body,
request: {
path: response.url,
method: options.method
}
})
}
} else {
finish(ctx, {
headers: response.headers,
data: body,
request: {
path: response.url,
method: options.method
}
})
}

return body
})
.catch(error => {
finish(ctx, undefined, error)
return body
})
.catch(error => {
finish(ctx, undefined, error)

throw error
})
.finally(() => {
// maybe we don't want to unwrap here in case the promise is re-used?
// other hand: we want to avoid resource leakage
shimmer.unwrap(apiProm, 'parse')
})
})
throw error
})
.finally(() => {
// maybe we don't want to unwrap here in case the promise is re-used?
// other hand: we want to avoid resource leakage
shimmer.unwrap(apiProm, 'parse')
})
})

ch.end.publish(ctx)
ch.end.publish(ctx)

return apiProm
return apiProm
})
})
})
}
return exports
})
}
return exports
})
}
}

function finish (ctx, response, error) {
Expand Down

0 comments on commit 3d968fa

Please sign in to comment.