Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/two-terms-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openai/agents-core': patch
---

fix: event data adjustment for #749
4 changes: 2 additions & 2 deletions examples/agent-patterns/agents-as-tools-streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async function main() {
// When you pass onStream handler, the agent is executed in streaming mode.
onStream: (event) => {
console.log(
`### onStream method streaming event from ${event.agentName} in streaming mode:\n\n` +
`### onStream method streaming event from ${event.agent.name} in streaming mode:\n\n` +
JSON.stringify(event) +
'\n',
);
Expand All @@ -47,7 +47,7 @@ async function main() {
/*
billingAgentTool.on('raw_model_stream_event', (event) => {
console.log(
`### on method streaming event from ${event.agentName} in streaming mode:\n\n` +
`### on method streaming event from ${event.agent.name} in streaming mode:\n\n` +
JSON.stringify(event) +
'\n',
);
Expand Down
39 changes: 20 additions & 19 deletions packages/agents-core/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,26 @@ type CompletedRunResult<TContext, TAgent extends Agent<TContext, any>> = (

type AgentToolRunOptions<TContext> = Omit<StreamRunOptions<TContext>, 'stream'>;

type AgentToolStreamEvent = {
type AgentToolStreamEvent<TAgent extends Agent<any, any>> = {
// Raw stream event emitted by the nested agent run.
event: RunStreamEvent;
// Convenience metadata so callers can correlate to the invoking tool call/agent.
agentName: string;
toolCallId?: string;
// The agent instance being executed as a tool.
agent: TAgent;
// The tool call item that triggered this nested run (when available).
toolCall?: protocol.FunctionCallItem;
};
type AgentToolEventName = AgentToolStreamEvent['event']['type'] | '*';
type AgentToolEventHandler = (
event: AgentToolStreamEvent,
type AgentToolEventName = RunStreamEvent['type'] | '*';
type AgentToolEventHandler<TAgent extends Agent<any, any>> = (
event: AgentToolStreamEvent<TAgent>,
) => void | Promise<void>;
type AgentTool<TContext> = FunctionTool<
type AgentTool<TContext, TAgent extends Agent<TContext, any>> = FunctionTool<
TContext,
typeof AgentAsToolNeedApprovalSchame
> & {
on: (
name: AgentToolEventName,
handler: AgentToolEventHandler,
) => AgentTool<TContext>;
handler: AgentToolEventHandler<TAgent>,
) => AgentTool<TContext, TAgent>;
};

// Per-process, ephemeral map linking a function tool call to its nested
Expand Down Expand Up @@ -566,9 +567,9 @@ export class Agent<
/**
* Optional hook to receive streamed events from the nested agent run.
*/
onStream?: (event: AgentToolStreamEvent) => void | Promise<void>;
onStream?: (event: AgentToolStreamEvent<TAgent>) => void | Promise<void>;
},
): AgentTool<TContext> {
): AgentTool<TContext, TAgent> {
const {
toolName,
toolDescription,
Expand All @@ -582,9 +583,9 @@ export class Agent<
// Event handlers are scoped to this agent tool instance and are not shared; we only support registration (no removal) to keep the API surface small.
const eventHandlers = new Map<
AgentToolEventName,
Set<AgentToolEventHandler>
Set<AgentToolEventHandler<TAgent>>
>();
const emitEvent = async (event: AgentToolStreamEvent) => {
const emitEvent = async (event: AgentToolStreamEvent<TAgent>) => {
// We intentionally keep only add semantics (no off) to reduce surface area; handlers are scoped to this agent tool instance.
const specific = eventHandlers.get(event.event.type);
const wildcard = eventHandlers.get('*');
Expand Down Expand Up @@ -627,9 +628,8 @@ export class Agent<
...(runOptions ?? {}),
});
const streamPayload = {
agentName: this.name,
// Tool calls should carry IDs, but direct invocation or provider quirks can omit it, so keep this optional.
toolCallId: details?.toolCall?.callId,
agent: this,
toolCall: details?.toolCall,
};

if (shouldStream) {
Expand Down Expand Up @@ -682,10 +682,11 @@ export class Agent<
},
});

const agentTool: AgentTool<TContext> = {
const agentTool: AgentTool<TContext, TAgent> = {
...baseTool,
on: (name, handler) => {
const set = eventHandlers.get(name) ?? new Set<AgentToolEventHandler>();
const set =
eventHandlers.get(name) ?? new Set<AgentToolEventHandler<TAgent>>();
set.add(handler);
eventHandlers.set(name, set);
return agentTool;
Expand Down
59 changes: 35 additions & 24 deletions packages/agents-core/test/agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,18 +344,21 @@ describe('Agent', () => {
expect.objectContaining({ stream: true }),
);
expect(onStream).toHaveBeenCalledTimes(streamEvents.length);
expect(onStream).toHaveBeenCalledWith({
event: streamEvents[0],
agentName: agent.name,
toolCallId: undefined,
});
expect(onStream).toHaveBeenCalledWith(
expect.objectContaining({
event: streamEvents[0],
agent,
toolCall: undefined,
}),
);
});

it('includes toolCallId and agentName when streaming from nested agent tools', async () => {
it('includes toolCall when streaming from nested agent tools', async () => {
const agent = new Agent({
name: 'Streamer Agent',
instructions: 'Stream things.',
});
const toolCall = { callId: 'call-123' } as any;
const streamEvents = [
{ type: 'raw_model_stream_event', data: { type: 'response_started' } },
] as any[];
Expand Down Expand Up @@ -392,7 +395,7 @@ describe('Agent', () => {
const output = await tool.invoke(
new RunContext(),
'{"input":"run streaming"}',
{ toolCall: { callId: 'call-123' } as any },
{ toolCall },
);

expect(output).toBe('tool output');
Expand All @@ -401,11 +404,13 @@ describe('Agent', () => {
'run streaming',
expect.objectContaining({ stream: true }),
);
expect(onStream).toHaveBeenCalledWith({
agentName: 'Streamer Agent',
event: streamEvents[0],
toolCallId: 'call-123',
});
expect(onStream).toHaveBeenCalledWith(
expect.objectContaining({
agent,
toolCall,
event: streamEvents[0],
}),
);
});

it('supports event handlers registered via on (agent tool only) and wildcard handlers', async () => {
Expand Down Expand Up @@ -453,10 +458,11 @@ describe('Agent', () => {

tool.on('raw_model_stream_event', rawHandler).on('*', wildcardHandler);

const toolCall = { callId: 'call-abc' } as any;
const output = await tool.invoke(
new RunContext(),
'{"input":"run streaming"}',
{ toolCall: { callId: 'call-abc' } as any },
{ toolCall },
);

expect(output).toBe('tool output');
Expand All @@ -466,11 +472,13 @@ describe('Agent', () => {
expect.objectContaining({ stream: true }),
);
expect(rawHandler).toHaveBeenCalledTimes(1);
expect(rawHandler).toHaveBeenCalledWith({
agentName: 'Streamer Agent',
event: streamEvents[0],
toolCallId: 'call-abc',
});
expect(rawHandler).toHaveBeenCalledWith(
expect.objectContaining({
agent,
toolCall,
event: streamEvents[0],
}),
);
expect(wildcardHandler).toHaveBeenCalledTimes(streamEvents.length);
});

Expand Down Expand Up @@ -513,10 +521,11 @@ describe('Agent', () => {

tool.on('raw_model_stream_event', handler);

const toolCall = { callId: 'call-xyz' } as any;
const output = await tool.invoke(
new RunContext(),
'{"input":"run streaming"}',
{ toolCall: { callId: 'call-xyz' } as any },
{ toolCall },
);

expect(output).toBe('tool output');
Expand All @@ -525,11 +534,13 @@ describe('Agent', () => {
'run streaming',
expect.objectContaining({ stream: true }),
);
expect(handler).toHaveBeenCalledWith({
agentName: 'Handler Agent',
event: streamEvents[0],
toolCallId: 'call-xyz',
});
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
agent,
toolCall,
event: streamEvents[0],
}),
);
});

it('filters tools using isEnabled predicates', async () => {
Expand Down