
[MLOB-2240] fix(openai, langchain, llmobs): OpenAI and LangChain instrumentation is ESM-compatible #5267

Merged · 11 commits · Feb 20, 2025
12 changes: 8 additions & 4 deletions initialize.mjs
@@ -35,10 +35,12 @@ ${result.source}`
 const [NODE_MAJOR, NODE_MINOR] = process.versions.node.split('.').map(x => +x)

 const brokenLoaders = NODE_MAJOR === 18 && NODE_MINOR === 0
+const iitmExclusions = [/langsmith/, /openai\/_shims/, /openai\/resources\/chat\/completions\/messages/]

-export async function load (...args) {
-  const loadHook = brokenLoaders ? args[args.length - 1] : origLoad
-  return insertInit(await loadHook(...args))
+export async function load (url, context, nextLoad) {
+  const iitmExclusionsMatch = iitmExclusions.some((exclusion) => exclusion.test(url))
+  const loadHook = (brokenLoaders || iitmExclusionsMatch) ? nextLoad : origLoad
+  return insertInit(await loadHook(url, context, nextLoad))
 }

 export const resolve = brokenLoaders ? undefined : origResolve

@@ -53,6 +55,8 @@ if (isMainThread) {
   const require = Module.createRequire(import.meta.url)
   require('./init.js')
   if (Module.register) {
-    Module.register('./loader-hook.mjs', import.meta.url)
+    Module.register('./loader-hook.mjs', import.meta.url, {
+      data: { exclude: iitmExclusions }
+    })
   }
 }
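
For context, the exclusion check above follows the standard Node ESM loader-hook shape. A minimal, hypothetical sketch of the pattern (the file name and module list are illustrative, not part of the PR):

// loader-sketch.mjs — hypothetical exclusion-aware `load` hook.
// On Node >=20.6 it could be registered with:
//   Module.register('./loader-sketch.mjs', import.meta.url)
const exclusions = [/langsmith/, /openai\/_shims/]

export async function load (url, context, nextLoad) {
  if (exclusions.some((re) => re.test(url))) {
    // Excluded module: defer straight to the next loader so the
    // instrumenting transform never rewrites this module's exports.
    return nextLoad(url, context)
  }
  const result = await nextLoad(url, context)
  // A real instrumenting hook (e.g. import-in-the-middle) would rewrite
  // result.source here; this sketch returns it unchanged.
  return result
}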
6 changes: 3 additions & 3 deletions packages/datadog-instrumentations/src/helpers/hooks.js
@@ -19,8 +19,8 @@ module.exports = {
   '@jest/test-sequencer': () => require('../jest'),
   '@jest/transform': () => require('../jest'),
   '@koa/router': () => require('../koa'),
-  '@langchain/core': () => require('../langchain'),
-  '@langchain/openai': () => require('../langchain'),
+  '@langchain/core': { esmFirst: true, fn: () => require('../langchain') },
+  '@langchain/openai': { esmFirst: true, fn: () => require('../langchain') },
   '@node-redis/client': () => require('../redis'),
   '@opensearch-project/opensearch': () => require('../opensearch'),
   '@opentelemetry/sdk-trace-node': () => require('../otel-sdk-trace'),
@@ -100,7 +100,7 @@ module.exports = {
   'node:vm': () => require('../vm'),
   nyc: () => require('../nyc'),
   oracledb: () => require('../oracledb'),
-  openai: () => require('../openai'),
+  openai: { esmFirst: true, fn: () => require('../openai') },
   paperplane: () => require('../paperplane'),
   passport: () => require('../passport'),
   'passport-http': () => require('../passport-http'),
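
Registry entries are now either a plain hook function or an `{ esmFirst, fn }` descriptor, so consumers have to normalize both shapes. A hedged sketch of that normalization (the consumer below is hypothetical; the real logic lives in dd-trace's instrumentation loader):

// Hypothetical consumer of the hooks registry above.
const hooks = {
  oracledb: () => { /* require('../oracledb') */ },
  openai: { esmFirst: true, fn: () => { /* require('../openai') */ } }
}

function normalizeHook (entry) {
  return typeof entry === 'function'
    ? { esmFirst: false, fn: entry }
    : { esmFirst: Boolean(entry.esmFirst), fn: entry.fn }
}

const { esmFirst, fn } = normalizeHook(hooks.openai)
if (esmFirst) {
  // For ESM, run the hook against the module's own namespace before any
  // CommonJS interop wrapping is applied.
}
fn()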
152 changes: 78 additions & 74 deletions packages/datadog-instrumentations/src/openai.js
@@ -8,98 +8,98 @@ const ch = dc.tracingChannel('apm:openai:request')

 const V4_PACKAGE_SHIMS = [
   {
-    file: 'resources/chat/completions.js',
+    file: 'resources/chat/completions',
     targetClass: 'Completions',
     baseResource: 'chat.completions',
     methods: ['create'],
     streamedResponse: true
   },
   {
-    file: 'resources/completions.js',
+    file: 'resources/completions',
     targetClass: 'Completions',
     baseResource: 'completions',
     methods: ['create'],
     streamedResponse: true
   },
   {
-    file: 'resources/embeddings.js',
+    file: 'resources/embeddings',
     targetClass: 'Embeddings',
     baseResource: 'embeddings',
     methods: ['create']
   },
   {
-    file: 'resources/files.js',
+    file: 'resources/files',
     targetClass: 'Files',
     baseResource: 'files',
     methods: ['create', 'del', 'list', 'retrieve']
   },
   {
-    file: 'resources/files.js',
+    file: 'resources/files',
     targetClass: 'Files',
     baseResource: 'files',
     methods: ['retrieveContent'],
     versions: ['>=4.0.0 <4.17.1']
   },
   {
-    file: 'resources/files.js',
+    file: 'resources/files',
     targetClass: 'Files',
     baseResource: 'files',
     methods: ['content'], // replaced `retrieveContent` in v4.17.1
     versions: ['>=4.17.1']
   },
   {
-    file: 'resources/images.js',
+    file: 'resources/images',
     targetClass: 'Images',
     baseResource: 'images',
     methods: ['createVariation', 'edit', 'generate']
   },
   {
-    file: 'resources/fine-tuning/jobs/jobs.js',
+    file: 'resources/fine-tuning/jobs/jobs',
     targetClass: 'Jobs',
     baseResource: 'fine_tuning.jobs',
     methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
     versions: ['>=4.34.0'] // file location changed in 4.34.0
   },
   {
-    file: 'resources/fine-tuning/jobs.js',
+    file: 'resources/fine-tuning/jobs',
     targetClass: 'Jobs',
     baseResource: 'fine_tuning.jobs',
     methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
     versions: ['>=4.1.0 <4.34.0']
   },
   {
-    file: 'resources/fine-tunes.js', // deprecated after 4.1.0
+    file: 'resources/fine-tunes', // deprecated after 4.1.0
     targetClass: 'FineTunes',
     baseResource: 'fine-tune',
     methods: ['cancel', 'create', 'list', 'listEvents', 'retrieve'],
     versions: ['>=4.0.0 <4.1.0']
   },
   {
-    file: 'resources/models.js',
+    file: 'resources/models',
     targetClass: 'Models',
     baseResource: 'models',
     methods: ['del', 'list', 'retrieve']
   },
   {
-    file: 'resources/moderations.js',
+    file: 'resources/moderations',
     targetClass: 'Moderations',
     baseResource: 'moderations',
     methods: ['create']
   },
   {
-    file: 'resources/audio/transcriptions.js',
+    file: 'resources/audio/transcriptions',
     targetClass: 'Transcriptions',
     baseResource: 'audio.transcriptions',
     methods: ['create']
   },
   {
-    file: 'resources/audio/translations.js',
+    file: 'resources/audio/translations',
     targetClass: 'Translations',
     baseResource: 'audio.translations',
     methods: ['create']
   },
   {
-    file: 'resources/chat/completions/completions.js',
+    file: 'resources/chat/completions/completions',
     targetClass: 'Completions',
     baseResource: 'chat.completions',
     methods: ['create'],
@@ -267,82 +267,86 @@ function wrapStreamIterator (response, options, n, ctx) {
   }
 }

-for (const shim of V4_PACKAGE_SHIMS) {
-  const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
-  addHook({ name: 'openai', file, versions: versions || ['>=4'] }, exports => {
-    const targetPrototype = exports[targetClass].prototype
-
-    for (const methodName of methods) {
-      shimmer.wrap(targetPrototype, methodName, methodFn => function () {
-        if (!ch.start.hasSubscribers) {
-          return methodFn.apply(this, arguments)
-        }
-
-        // The OpenAI library lets you set `stream: true` on the options arg to any method
-        // However, we only want to handle streamed responses in specific cases
-        // chat.completions and completions
-        const stream = streamedResponse && getOption(arguments, 'stream', false)
-
-        // we need to compute how many prompts we are sending in streamed cases for completions
-        // not applicable for chat completions
-        let n
-        if (stream) {
-          n = getOption(arguments, 'n', 1)
-          const prompt = getOption(arguments, 'prompt')
-          if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
-            n *= prompt.length
-          }
-        }
-
-        const client = this._client || this.client
-
-        const ctx = {
-          methodName: `${baseResource}.${methodName}`,
-          args: arguments,
-          basePath: client.baseURL,
-          apiKey: client.apiKey
-        }
-
-        return ch.start.runStores(ctx, () => {
-          const apiProm = methodFn.apply(this, arguments)
-
-          if (baseResource === 'chat.completions' && typeof apiProm._thenUnwrap === 'function') {
-            // this should only ever be invoked from a client.beta.chat.completions.parse call
-            shimmer.wrap(apiProm, '_thenUnwrap', origApiPromThenUnwrap => function () {
-              // TODO(sam.brenner): I wonder if we can patch the APIPromise prototype instead, although
-              // we might not have access to everything we need...
-
-              // this is a new APIPromise instance
-              const unwrappedPromise = origApiPromThenUnwrap.apply(this, arguments)
-
-              shimmer.wrap(unwrappedPromise, 'parse', origApiPromParse => function () {
-                const parsedPromise = origApiPromParse.apply(this, arguments)
-                  .then(body => Promise.all([this.responsePromise, body]))
-
-                return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
-              })
-
-              return unwrappedPromise
-            })
-          }
-
-          // wrapping `parse` avoids problematic wrapping of `then` when trying to call
-          // `withResponse` in userland code after. This way, we can return the whole `APIPromise`
-          shimmer.wrap(apiProm, 'parse', origApiPromParse => function () {
-            const parsedPromise = origApiPromParse.apply(this, arguments)
-              .then(body => Promise.all([this.responsePromise, body]))
-
-            return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
-          })
-
-          ch.end.publish(ctx)
-
-          return apiProm
-        })
-      })
-    }
-    return exports
-  })
-}
+const extensions = ['.js', '.mjs']
+
+for (const extension of extensions) {
+  for (const shim of V4_PACKAGE_SHIMS) {
+    const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
+    addHook({ name: 'openai', file: file + extension, versions: versions || ['>=4'] }, exports => {
+      const targetPrototype = exports[targetClass].prototype
+
+      for (const methodName of methods) {
+        shimmer.wrap(targetPrototype, methodName, methodFn => function () {
+          if (!ch.start.hasSubscribers) {
+            return methodFn.apply(this, arguments)
+          }
+
+          // The OpenAI library lets you set `stream: true` on the options arg to any method
+          // However, we only want to handle streamed responses in specific cases
+          // chat.completions and completions
+          const stream = streamedResponse && getOption(arguments, 'stream', false)
+
+          // we need to compute how many prompts we are sending in streamed cases for completions
+          // not applicable for chat completions
+          let n
+          if (stream) {
+            n = getOption(arguments, 'n', 1)
+            const prompt = getOption(arguments, 'prompt')
+            if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
+              n *= prompt.length
+            }
+          }
+
+          const client = this._client || this.client
+
+          const ctx = {
+            methodName: `${baseResource}.${methodName}`,
+            args: arguments,
+            basePath: client.baseURL,
+            apiKey: client.apiKey
+          }
+
+          return ch.start.runStores(ctx, () => {
+            const apiProm = methodFn.apply(this, arguments)
+
+            if (baseResource === 'chat.completions' && typeof apiProm._thenUnwrap === 'function') {
+              // this should only ever be invoked from a client.beta.chat.completions.parse call
+              shimmer.wrap(apiProm, '_thenUnwrap', origApiPromThenUnwrap => function () {
+                // TODO(sam.brenner): I wonder if we can patch the APIPromise prototype instead, although
+                // we might not have access to everything we need...
+
+                // this is a new APIPromise instance
+                const unwrappedPromise = origApiPromThenUnwrap.apply(this, arguments)
+
+                shimmer.wrap(unwrappedPromise, 'parse', origApiPromParse => function () {
+                  const parsedPromise = origApiPromParse.apply(this, arguments)
+                    .then(body => Promise.all([this.responsePromise, body]))
+
+                  return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
+                })
+
+                return unwrappedPromise
+              })
+            }
+
+            // wrapping `parse` avoids problematic wrapping of `then` when trying to call
+            // `withResponse` in userland code after. This way, we can return the whole `APIPromise`
+            shimmer.wrap(apiProm, 'parse', origApiPromParse => function () {
+              const parsedPromise = origApiPromParse.apply(this, arguments)
+                .then(body => Promise.all([this.responsePromise, body]))
+
+              return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
+            })
+
+            ch.end.publish(ctx)
+
+            return apiProm
+          })
+        })
+      }
+      return exports
+    })
+  }
+}

 function handleUnwrappedAPIPromise (apiProm, ctx, stream, n) {
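
The shims now drop the hard-coded `.js` suffix, and each hook is registered once per extension, so both the CommonJS (`.js`) and ESM (`.mjs`) builds of the openai package are matched. A simplified sketch of that registration pattern (`addHook` below is a stand-in for dd-trace's real helper, not its actual implementation):

// Register the same hook body once per candidate file extension.
const registered = new Map()

function addHook ({ name, file }, hook) { // simplified stand-in
  registered.set(`${name}/${file}`, hook)
}

for (const extension of ['.js', '.mjs']) {
  addHook({ name: 'openai', file: 'resources/embeddings' + extension }, exports => {
    // shimmer.wrap(exports.Embeddings.prototype, 'create', ...) would go here
    return exports
  })
}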
@@ -8,9 +8,7 @@ const {
 } = require('../../../../integration-tests/helpers')
 const { assert } = require('chai')

-// there is currently an issue with langchain + esm loader hooks from IITM
-// https://github.com/nodejs/import-in-the-middle/issues/163
-describe.skip('esm', () => {
+describe('esm', () => {
   let agent
   let proc
   let sandbox
@@ -47,7 +45,9 @@ describe.skip('esm', () => {
       assert.strictEqual(checkSpansForServiceName(payload, 'langchain.request'), true)
     })

-    proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port)
+    proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port, null, {
+      NODE_OPTIONS: '--import dd-trace/register.js'
+    })

     await res
   }).timeout(20000)
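
The test now preloads the tracer through NODE_OPTIONS instead of relying on in-process registration alone. A hedged sketch of what the helper does under the hood (the real spawnPluginIntegrationTestProc adds more plumbing):

// Spawn the ESM entrypoint with dd-trace's register hook preloaded, so the
// loader hooks are installed before any user module is imported.
const { spawn } = require('child_process')

const proc = spawn(process.execPath, ['server.mjs'], {
  env: {
    ...process.env,
    NODE_OPTIONS: '--import dd-trace/register.js'
  }
})
proc.stdout.pipe(process.stdout)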
@@ -1,11 +1,21 @@
 import 'dd-trace/init.js'

 import { OpenAI } from '@langchain/openai'
 import { StringOutputParser } from '@langchain/core/output_parsers'
 import nock from 'nock'

 nock('https://api.openai.com:443')
   .post('/v1/completions')
-  .reply(200, {})
+  .reply(200, {
+    model: 'gpt-3.5-turbo-instruct',
+    choices: [{
+      text: 'The answer is 4',
+      index: 0,
+      logprobs: null,
+      finish_reason: 'length'
+    }],
+    usage: { prompt_tokens: 8, completion_tokens: 12, total_tokens: 20 }
+  })

 const llm = new OpenAI({
   apiKey: '<not-a-real-key>'
@@ -15,4 +25,4 @@ const parser = new StringOutputParser()

 const chain = llm.pipe(parser)

-await chain.invoke('a test')
+await chain.invoke('what is 2 + 2?')
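
Under this mock the LLM yields the stubbed completion text and StringOutputParser passes the string through, so if the final line captured the result one could assert on it. A hypothetical variant of that last line (not part of the PR):

// chain.invoke resolves to choices[0].text from the mocked completion
const result = await chain.invoke('what is 2 + 2?')
console.assert(result === 'The answer is 4', 'parser should pass the completion text through')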
@@ -15,8 +15,9 @@ describe('esm', () => {
   let sandbox

   // limit v4 tests while the IITM issue is resolved or a workaround is introduced
+  // this is only relevant for `openai` >=4.0 <=4.1
   // issue link: https://github.com/DataDog/import-in-the-middle/issues/60
-  withVersions('openai', 'openai', '>=3 <4', version => {
+  withVersions('openai', 'openai', '>=3 <4.0.0 || >4.1.0', version => {
     before(async function () {
       this.timeout(20000)
       sandbox = await createSandbox([`'openai@${version}'`, 'nock'], false, [
@@ -43,7 +44,9 @@ describe('esm', () => {
       assert.strictEqual(checkSpansForServiceName(payload, 'openai.request'), true)
     })

-    proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port)
+    proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'server.mjs', agent.port, null, {
+      NODE_OPTIONS: '--import dd-trace/register.js'
+    })

     await res
   }).timeout(20000)
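
The widened range skips only the window where the IITM issue applies (openai >=4.0.0 <=4.1.0) instead of all of v4. A quick sketch of how that semver range evaluates, using the `semver` package:

const semver = require('semver')
const range = '>=3 <4.0.0 || >4.1.0'

console.log(semver.satisfies('3.3.0', range)) // true
console.log(semver.satisfies('4.0.0', range)) // false (skipped IITM window)
console.log(semver.satisfies('4.1.0', range)) // false (skipped IITM window)
console.log(semver.satisfies('4.2.0', range)) // true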