Commit 34d89b1
[MLOB-2233] feat(openai, llmobs): patch _thenUnwrap of chat completion api promise instances (#5253)

* handle _thenUnwrap for OpenAI

* rename unwrap handler method

* add tests

* add test case for using _thenUnwrap on userland

* refactor, fix tests

* fix

* fix + todo

---------

Co-authored-by: José Mussa <[email protected]>
sabrenner and josemussa authored Feb 19, 2025
1 parent 18e840f commit 34d89b1
Showing 2 changed files with 106 additions and 34 deletions.
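
Before the diff, a minimal userland sketch of the call path this commit makes traceable (illustrative only, not part of the change; it assumes openai >= 4.59.0 and an API key in the environment). `client.beta.chat.completions.parse` builds on the regular chat-completions `APIPromise` and calls its internal `_thenUnwrap` method, so the tracer has to re-wrap `parse` on the promise `_thenUnwrap` returns while leaving helpers like `withResponse` intact:

'use strict'

// Hypothetical example, not taken from the commit.
const OpenAI = require('openai')

const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

async function main () {
  // parse() returns an APIPromise derived via _thenUnwrap from the create() call
  const prom = client.beta.chat.completions.parse({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: 'Hello, OpenAI!' }]
  })

  // withResponse() must survive instrumentation; wrapping `then` instead of
  // `parse` would have broken this chain
  const { data, response } = await prom.withResponse()
  console.log(response.status, data.choices[0].message.content)
}

main().catch(console.error)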
packages/datadog-instrumentations/src/openai.js (53 additions, 33 deletions)

@@ -306,44 +306,33 @@ for (const shim of V4_PACKAGE_SHIMS) {
       return ch.start.runStores(ctx, () => {
         const apiProm = methodFn.apply(this, arguments)
 
+        if (baseResource === 'chat.completions' && typeof apiProm._thenUnwrap === 'function') {
+          // this should only ever be invoked from a client.beta.chat.completions.parse call
+          shimmer.wrap(apiProm, '_thenUnwrap', origApiPromThenUnwrap => function () {
+            // TODO(sam.brenner): I wonder if we can patch the APIPromise prototype instead, although
+            // we might not have access to everything we need...
+
+            // this is a new apipromise instance
+            const unwrappedPromise = origApiPromThenUnwrap.apply(this, arguments)
+
+            shimmer.wrap(unwrappedPromise, 'parse', origApiPromParse => function () {
+              const parsedPromise = origApiPromParse.apply(this, arguments)
+                .then(body => Promise.all([this.responsePromise, body]))
+
+              return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
+            })
+
+            return unwrappedPromise
+          })
+        }
+
         // wrapping `parse` avoids problematic wrapping of `then` when trying to call
         // `withResponse` in userland code after. This way, we can return the whole `APIPromise`
         shimmer.wrap(apiProm, 'parse', origApiPromParse => function () {
-          return origApiPromParse.apply(this, arguments)
-            // the original response is wrapped in a promise, so we need to unwrap it
-            .then(body => Promise.all([this.responsePromise, body]))
-            .then(([{ response, options }, body]) => {
-              if (stream) {
-                if (body.iterator) {
-                  shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options, n, ctx))
-                } else {
-                  shimmer.wrap(
-                    body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options, n, ctx)
-                  )
-                }
-              } else {
-                finish(ctx, {
-                  headers: response.headers,
-                  data: body,
-                  request: {
-                    path: response.url,
-                    method: options.method
-                  }
-                })
-              }
-
-              return body
-            })
-            .catch(error => {
-              finish(ctx, undefined, error)
-
-              throw error
-            })
-            .finally(() => {
-              // maybe we don't want to unwrap here in case the promise is re-used?
-              // other hand: we want to avoid resource leakage
-              shimmer.unwrap(apiProm, 'parse')
-            })
+          const parsedPromise = origApiPromParse.apply(this, arguments)
+            .then(body => Promise.all([this.responsePromise, body]))
+
+          return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
         })
 
         ch.end.publish(ctx)
@@ -356,6 +345,37 @@ for (const shim of V4_PACKAGE_SHIMS) {
   })
 }
 
+function handleUnwrappedAPIPromise (apiProm, ctx, stream, n) {
+  return apiProm
+    .then(([{ response, options }, body]) => {
+      if (stream) {
+        if (body.iterator) {
+          shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options, n, ctx))
+        } else {
+          shimmer.wrap(
+            body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options, n, ctx)
+          )
+        }
+      } else {
+        finish(ctx, {
+          headers: response.headers,
+          data: body,
+          request: {
+            path: response.url,
+            method: options.method
+          }
+        })
+      }
+
+      return body
+    })
+    .catch(error => {
+      finish(ctx, undefined, error)
+
+      throw error
+    })
+}
+
 function finish (ctx, response, error) {
   if (error) {
     ctx.error = error
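A small sketch (assumed shapes, not dd-trace code) of the tuple handleUnwrappedAPIPromise receives: parse() resolves the response body, while the APIPromise's internal responsePromise resolves { response, options } describing the raw HTTP exchange, which is why the instrumentation joins the two with Promise.all before finishing the span:

// Hypothetical illustration; `apiProm` stands in for an openai APIPromise,
// whose internal `responsePromise` field the instrumentation above relies on.
async function inspect (apiProm) {
  const parsedPromise = apiProm.parse()
    .then(body => Promise.all([apiProm.responsePromise, body]))

  const [{ response, options }, body] = await parsedPromise

  // the same fields finish() records on the span
  console.log(options.method, response.url, response.headers, body)
}
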
packages/datadog-plugin-openai/test/index.spec.js (53 additions, 1 deletion)

@@ -2821,7 +2821,10 @@ describe('Plugin', () => {
         }
 
         if (semver.satisfies(realVersion, '>=4.0.0')) {
-          const result = await openai.chat.completions.create(params)
+          const prom = openai.chat.completions.create(params)
+          expect(prom).to.have.property('withResponse')
+
+          const result = await prom
 
           expect(result.id).to.eql('chatcmpl-7GaWqyMTD9BLmkmy8SxyjUGX3KSRN')
           expect(result.model).to.eql('gpt-3.5-turbo-0301')
@@ -3786,6 +3789,55 @@ describe('Plugin', () => {
           }
         })
       }
+
+      if (semver.intersects('>=4.59.0', version)) {
+        it('makes a successful call with the beta chat completions', async () => {
+          nock('https://api.openai.com:443')
+            .post('/v1/chat/completions')
+            .reply(200, {
+              id: 'chatcmpl-7GaWqyMTD9BLmkmy8SxyjUGX3KSRN',
+              object: 'chat.completion',
+              created: 1684188020,
+              model: 'gpt-4o',
+              usage: {
+                prompt_tokens: 37,
+                completion_tokens: 10,
+                total_tokens: 47
+              },
+              choices: [
+                {
+                  message: {
+                    role: 'assistant',
+                    content: 'I am doing well, how about you?'
+                  },
+                  finish_reason: 'stop',
+                  index: 0
+                }
+              ]
+            })
+
+          const checkTraces = agent
+            .use(traces => {
+              const span = traces[0][0]
+              expect(span).to.have.property('name', 'openai.request')
+            })
+
+          const prom = openai.beta.chat.completions.parse({
+            model: 'gpt-4o',
+            messages: [{ role: 'user', content: 'Hello, OpenAI!', name: 'hunter2' }],
+            temperature: 0.5,
+            stream: false
+          })
+
+          expect(prom).to.have.property('withResponse')
+
+          const response = await prom
+
+          expect(response.choices[0].message.content).to.eql('I am doing well, how about you?')
+
+          await checkTraces
+        })
+      }
     })
   })
 })
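
The first test hunk above checks that the traced create() call still returns a full APIPromise rather than a bare promise. As a hedged usage sketch (assuming a configured client; the model name is illustrative), that preserves chains like:

// Because the tracer wraps `parse` rather than `then`, APIPromise helpers such
// as withResponse() keep working on instrumented calls.
const { data, response } = await openai.chat.completions
  .create({ model: 'gpt-4o', messages: [{ role: 'user', content: 'hi' }] })
  .withResponse()

console.log(response.headers.get('x-request-id'), data.choices[0].message.content)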
