[MLOB-2240] fix(openai, langchain, llmobs): OpenAI and LangChain instrumentation is ESM-compatible (#5267)

* add module register hooks

* fix instrumentation

* re-enable tests

* file extension fix for openai

* Update packages/datadog-plugin-openai/test/integration-test/client.spec.js

* trigger ci

* change openai instrumentation

* fix for new openai version

* try fix for --loader

* name arguments

* name arguments
sabrenner authored Feb 20, 2025
1 parent 23abc09 commit 96bb84a
Showing 8 changed files with 136 additions and 100 deletions.
12 changes: 8 additions & 4 deletions initialize.mjs
@@ -35,10 +35,12 @@ ${result.source}`
const [NODE_MAJOR, NODE_MINOR] = process.versions.node.split('.').map(x => +x)

const brokenLoaders = NODE_MAJOR === 18 && NODE_MINOR === 0
const iitmExclusions = [/langsmith/, /openai\/_shims/, /openai\/resources\/chat\/completions\/messages/]

export async function load (...args) {
const loadHook = brokenLoaders ? args[args.length - 1] : origLoad
return insertInit(await loadHook(...args))
export async function load (url, context, nextLoad) {
const iitmExclusionsMatch = iitmExclusions.some((exclusion) => exclusion.test(url))
const loadHook = (brokenLoaders || iitmExclusionsMatch) ? nextLoad : origLoad
return insertInit(await loadHook(url, context, nextLoad))
}

export const resolve = brokenLoaders ? undefined : origResolve
@@ -53,6 +55,8 @@ if (isMainThread) {
const require = Module.createRequire(import.meta.url)
require('./init.js')
if (Module.register) {
Module.register('./loader-hook.mjs', import.meta.url)
Module.register('./loader-hook.mjs', import.meta.url, {
data: { exclude: iitmExclusions }
})
}
}
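
The exclusion regexes serve two paths: in this file's own `load` hook they short-circuit import-in-the-middle's source rewriting for matching URLs, and they are also forwarded to the registered loader hook through `Module.register`'s `data` option. Below is a minimal sketch of how a hook file can pick that data up via Node's `initialize` customization hook; the real `loader-hook.mjs` re-exports import-in-the-middle's hooks, so its internals differ.

// hypothetical-hook.mjs — illustration only, not the actual loader-hook.mjs
let excludePatterns = []

// Node passes the `data` object from Module.register(..., { data }) here
export async function initialize (data) {
  excludePatterns = (data && data.exclude) || []
}

export async function load (url, context, nextLoad) {
  // mirror the iitmExclusions check above: leave excluded modules untouched
  if (excludePatterns.some((pattern) => pattern.test(url))) {
    return nextLoad(url, context)
  }
  const result = await nextLoad(url, context)
  // ...rewrite result.source so the tracer can register the module...
  return result
}
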
6 changes: 3 additions & 3 deletions packages/datadog-instrumentations/src/helpers/hooks.js
@@ -19,8 +19,8 @@ module.exports = {
'@jest/test-sequencer': () => require('../jest'),
'@jest/transform': () => require('../jest'),
'@koa/router': () => require('../koa'),
'@langchain/core': () => require('../langchain'),
'@langchain/openai': () => require('../langchain'),
'@langchain/core': { esmFirst: true, fn: () => require('../langchain') },
'@langchain/openai': { esmFirst: true, fn: () => require('../langchain') },
'@node-redis/client': () => require('../redis'),
'@opensearch-project/opensearch': () => require('../opensearch'),
'@opentelemetry/sdk-trace-node': () => require('../otel-sdk-trace'),
@@ -100,7 +100,7 @@ module.exports = {
'node:vm': () => require('../vm'),
nyc: () => require('../nyc'),
oracledb: () => require('../oracledb'),
openai: () => require('../openai'),
openai: { esmFirst: true, fn: () => require('../openai') },
paperplane: () => require('../paperplane'),
passport: () => require('../passport'),
'passport-http': () => require('../passport-http'),
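
Hook entries in this map now come in two shapes: a bare function, or an object with `esmFirst` and `fn` for packages (here `@langchain/core`, `@langchain/openai`, and `openai`) whose instrumentation has to be applied when the module is first loaded through the ESM loader rather than waiting for a CJS require. A hypothetical consumer that accepts both shapes — the actual lookup lives in the instrumentation helpers and may be organized differently:

// sketch of a consumer that tolerates both entry shapes (names are illustrative)
function resolveHook (entry) {
  if (typeof entry === 'function') {
    return { esmFirst: false, fn: entry }
  }
  return { esmFirst: Boolean(entry.esmFirst), fn: entry.fn }
}

const hooks = require('./hooks')
const { esmFirst, fn } = resolveHook(hooks.openai)
if (esmFirst) {
  // ensure the ESM loader shims are in place before the package is imported
}
fn() // requires ../openai, which registers its addHook() shims
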
152 changes: 78 additions & 74 deletions packages/datadog-instrumentations/src/openai.js
@@ -8,98 +8,98 @@ const ch = dc.tracingChannel('apm:openai:request')

const V4_PACKAGE_SHIMS = [
{
file: 'resources/chat/completions.js',
file: 'resources/chat/completions',
targetClass: 'Completions',
baseResource: 'chat.completions',
methods: ['create'],
streamedResponse: true
},
{
file: 'resources/completions.js',
file: 'resources/completions',
targetClass: 'Completions',
baseResource: 'completions',
methods: ['create'],
streamedResponse: true
},
{
file: 'resources/embeddings.js',
file: 'resources/embeddings',
targetClass: 'Embeddings',
baseResource: 'embeddings',
methods: ['create']
},
{
file: 'resources/files.js',
file: 'resources/files',
targetClass: 'Files',
baseResource: 'files',
methods: ['create', 'del', 'list', 'retrieve']
},
{
file: 'resources/files.js',
file: 'resources/files',
targetClass: 'Files',
baseResource: 'files',
methods: ['retrieveContent'],
versions: ['>=4.0.0 <4.17.1']
},
{
file: 'resources/files.js',
file: 'resources/files',
targetClass: 'Files',
baseResource: 'files',
methods: ['content'], // replaced `retrieveContent` in v4.17.1
versions: ['>=4.17.1']
},
{
file: 'resources/images.js',
file: 'resources/images',
targetClass: 'Images',
baseResource: 'images',
methods: ['createVariation', 'edit', 'generate']
},
{
file: 'resources/fine-tuning/jobs/jobs.js',
file: 'resources/fine-tuning/jobs/jobs',
targetClass: 'Jobs',
baseResource: 'fine_tuning.jobs',
methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
versions: ['>=4.34.0'] // file location changed in 4.34.0
},
{
file: 'resources/fine-tuning/jobs.js',
file: 'resources/fine-tuning/jobs',
targetClass: 'Jobs',
baseResource: 'fine_tuning.jobs',
methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
versions: ['>=4.1.0 <4.34.0']
},
{
file: 'resources/fine-tunes.js', // deprecated after 4.1.0
file: 'resources/fine-tunes', // deprecated after 4.1.0
targetClass: 'FineTunes',
baseResource: 'fine-tune',
methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
versions: ['>=4.0.0 <4.1.0']
},
{
file: 'resources/models.js',
file: 'resources/models',
targetClass: 'Models',
baseResource: 'models',
methods: ['del', 'list', 'retrieve']
},
{
file: 'resources/moderations.js',
file: 'resources/moderations',
targetClass: 'Moderations',
baseResource: 'moderations',
methods: ['create']
},
{
file: 'resources/audio/transcriptions.js',
file: 'resources/audio/transcriptions',
targetClass: 'Transcriptions',
baseResource: 'audio.transcriptions',
methods: ['create']
},
{
file: 'resources/audio/translations.js',
file: 'resources/audio/translations',
targetClass: 'Translations',
baseResource: 'audio.translations',
methods: ['create']
},
{
file: 'resources/chat/completions/completions.js',
file: 'resources/chat/completions/completions',
targetClass: 'Completions',
baseResource: 'chat.completions',
methods: ['create'],
@@ -267,82 +267,86 @@ function wrapStreamIterator (response, options, n, ctx) {
}
}

for (const shim of V4_PACKAGE_SHIMS) {
const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
addHook({ name: 'openai', file, versions: versions || ['>=4'] }, exports => {
const targetPrototype = exports[targetClass].prototype
const extensions = ['.js', '.mjs']

for (const methodName of methods) {
shimmer.wrap(targetPrototype, methodName, methodFn => function () {
if (!ch.start.hasSubscribers) {
return methodFn.apply(this, arguments)
}
for (const extension of extensions) {
for (const shim of V4_PACKAGE_SHIMS) {
const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
addHook({ name: 'openai', file: file + extension, versions: versions || ['>=4'] }, exports => {
const targetPrototype = exports[targetClass].prototype

// The OpenAI library lets you set `stream: true` on the options arg to any method
// However, we only want to handle streamed responses in specific cases
// chat.completions and completions
const stream = streamedResponse && getOption(arguments, 'stream', false)

// we need to compute how many prompts we are sending in streamed cases for completions
// not applicable for chat completions
let n
if (stream) {
n = getOption(arguments, 'n', 1)
const prompt = getOption(arguments, 'prompt')
if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
n *= prompt.length
for (const methodName of methods) {
shimmer.wrap(targetPrototype, methodName, methodFn => function () {
if (!ch.start.hasSubscribers) {
return methodFn.apply(this, arguments)
}
}

const client = this._client || this.client
// The OpenAI library lets you set `stream: true` on the options arg to any method
// However, we only want to handle streamed responses in specific cases
// chat.completions and completions
const stream = streamedResponse && getOption(arguments, 'stream', false)

// we need to compute how many prompts we are sending in streamed cases for completions
// not applicable for chat completions
let n
if (stream) {
n = getOption(arguments, 'n', 1)
const prompt = getOption(arguments, 'prompt')
if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
n *= prompt.length
}
}

const ctx = {
methodName: `${baseResource}.${methodName}`,
args: arguments,
basePath: client.baseURL,
apiKey: client.apiKey
}
const client = this._client || this.client

return ch.start.runStores(ctx, () => {
const apiProm = methodFn.apply(this, arguments)
const ctx = {
methodName: `${baseResource}.${methodName}`,
args: arguments,
basePath: client.baseURL,
apiKey: client.apiKey
}

return ch.start.runStores(ctx, () => {
const apiProm = methodFn.apply(this, arguments)

if (baseResource === 'chat.completions' && typeof apiProm._thenUnwrap === 'function') {
// this should only ever be invoked from a client.beta.chat.completions.parse call
shimmer.wrap(apiProm, '_thenUnwrap', origApiPromThenUnwrap => function () {
// TODO(sam.brenner): I wonder if we can patch the APIPromise prototype instead, although
// we might not have access to everything we need...
if (baseResource === 'chat.completions' && typeof apiProm._thenUnwrap === 'function') {
// this should only ever be invoked from a client.beta.chat.completions.parse call
shimmer.wrap(apiProm, '_thenUnwrap', origApiPromThenUnwrap => function () {
// TODO(sam.brenner): I wonder if we can patch the APIPromise prototype instead, although
// we might not have access to everything we need...

// this is a new apipromise instance
const unwrappedPromise = origApiPromThenUnwrap.apply(this, arguments)
// this is a new apipromise instance
const unwrappedPromise = origApiPromThenUnwrap.apply(this, arguments)

shimmer.wrap(unwrappedPromise, 'parse', origApiPromParse => function () {
const parsedPromise = origApiPromParse.apply(this, arguments)
.then(body => Promise.all([this.responsePromise, body]))
shimmer.wrap(unwrappedPromise, 'parse', origApiPromParse => function () {
const parsedPromise = origApiPromParse.apply(this, arguments)
.then(body => Promise.all([this.responsePromise, body]))

return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
})

return unwrappedPromise
})
}

return unwrappedPromise
// wrapping `parse` avoids problematic wrapping of `then` when trying to call
// `withResponse` in userland code after. This way, we can return the whole `APIPromise`
shimmer.wrap(apiProm, 'parse', origApiPromParse => function () {
const parsedPromise = origApiPromParse.apply(this, arguments)
.then(body => Promise.all([this.responsePromise, body]))

return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
})
}

// wrapping `parse` avoids problematic wrapping of `then` when trying to call
// `withResponse` in userland code after. This way, we can return the whole `APIPromise`
shimmer.wrap(apiProm, 'parse', origApiPromParse => function () {
const parsedPromise = origApiPromParse.apply(this, arguments)
.then(body => Promise.all([this.responsePromise, body]))
ch.end.publish(ctx)

return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
return apiProm
})

ch.end.publish(ctx)

return apiProm
})
})
}
return exports
})
}
return exports
})
}
}

function handleUnwrappedAPIPromise (apiProm, ctx, stream, n) {
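
Two things are worth calling out in the rewritten openai.js instrumentation above. First, the shims now register against extension-less file paths and loop over both `.js` and `.mjs`, so the same wrappers attach whether the SDK is pulled in with `require` or `import`. Second, as the inline comments note, wrapping `parse` on the returned `APIPromise` (instead of `then`) keeps the SDK's response helpers usable from calling code. A hedged userland sketch, assuming openai v4's `withResponse()` helper:

// userland usage that keeps working after instrumentation (sketch, openai >= 4 assumed)
const OpenAI = require('openai')

const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

async function main () {
  // create() returns an APIPromise; since only its parse() is wrapped,
  // withResponse() is still available to the caller
  const { data, response } = await client.chat.completions
    .create({ model: 'gpt-4o-mini', messages: [{ role: 'user', content: 'hello' }] })
    .withResponse()

  console.log(response.status, data.choices[0].message.content)
}

main()
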
@@ -8,9 +8,7 @@ const {
} = require('../../../../integration-tests/helpers')
const { assert } = require('chai')

// there is currently an issue with langchain + esm loader hooks from IITM
// https://github.com/nodejs/import-in-the-middle/issues/163
describe.skip('esm', () => {
describe('esm', () => {
let agent
let proc
let sandbox
@@ -47,7 +45,9 @@ describe.skip('esm', () => {
assert.strictEqual(checkSpansForServiceName(payload, 'langchain.request'), true)
})

proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port)
proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port, null, {
NODE_OPTIONS: '--import dd-trace/register.js'
})

await res
}).timeout(20000)
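
Re-enabling this suite relies on the child process being started with the tracer's ESM register entry point: `import 'dd-trace/init.js'` inside server.mjs starts the tracer in-process, while the `--import dd-trace/register.js` flag injected via NODE_OPTIONS installs the loader hooks from initialize.mjs above so that imported packages are instrumented. A rough equivalent outside the test harness, written as a spawn call (sketch; on Node versions without `module.register`, a `--loader` flag is needed instead, which the "try fix for --loader" commit above alludes to):

// sketch: launching an ESM app with the loader hooks enabled, mirroring the test setup
const { spawn } = require('node:child_process')

const proc = spawn('node', ['server.mjs'], {
  env: {
    ...process.env,
    // same flag the integration test injects via spawnPluginIntegrationTestProc
    NODE_OPTIONS: '--import dd-trace/register.js'
  },
  stdio: 'inherit'
})

proc.on('exit', (code) => process.exit(code ?? 0))
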
14 changes: 12 additions & 2 deletions packages/datadog-plugin-langchain/test/integration-test/server.mjs
@@ -1,11 +1,21 @@
import 'dd-trace/init.js'

import { OpenAI } from '@langchain/openai'
import { StringOutputParser } from '@langchain/core/output_parsers'
import nock from 'nock'

nock('https://api.openai.com:443')
.post('/v1/completions')
.reply(200, {})
.reply(200, {
model: 'gpt-3.5-turbo-instruct',
choices: [{
text: 'The answer is 4',
index: 0,
logprobs: null,
finish_reason: 'length'
}],
usage: { prompt_tokens: 8, completion_tokens: 12, total_tokens: 20 }
})

const llm = new OpenAI({
apiKey: '<not-a-real-key>'
@@ -15,4 +25,4 @@ const parser = new StringOutputParser()

const chain = llm.pipe(parser)

await chain.invoke('a test')
await chain.invoke('what is 2 + 2?')
@@ -15,8 +15,9 @@ describe('esm', () => {
let sandbox

// limit v4 tests while the IITM issue is resolved or a workaround is introduced
// this is only relevant for `openai` >=4.0 <=4.1
// issue link: https://github.com/DataDog/import-in-the-middle/issues/60
withVersions('openai', 'openai', '>=3 <4', version => {
withVersions('openai', 'openai', '>=3 <4.0.0 || >4.1.0', version => {
before(async function () {
this.timeout(20000)
sandbox = await createSandbox([`'openai@${version}'`, 'nock'], false, [
@@ -43,7 +44,9 @@
assert.strictEqual(checkSpansForServiceName(payload, 'openai.request'), true)
})

proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port)
proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port, null, {
NODE_OPTIONS: '--import dd-trace/register.js'
})

await res
}).timeout(20000)
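
The narrowed `withVersions` range above replaces the blanket skip: only the openai releases affected by the linked import-in-the-middle issue (roughly 4.0.0 through 4.1.0) stay excluded, and everything else runs against the ESM loader again. A small sketch of what the range admits, assuming the `semver` package:

// illustrating the version range used by withVersions above (semver package assumed)
const semver = require('semver')

const range = '>=3 <4.0.0 || >4.1.0'

console.log(semver.satisfies('3.3.0', range))  // true  — v3 clients still tested
console.log(semver.satisfies('4.0.0', range))  // false — hit by the IITM issue
console.log(semver.satisfies('4.1.0', range))  // false — hit by the IITM issue
console.log(semver.satisfies('4.52.0', range)) // true  — newer v4 releases are tested
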