Merged
3 changes: 2 additions & 1 deletion packages/agent/src/index.ts
Expand Up @@ -44,8 +44,9 @@ export type {
// Output item types (StreamableOutputItem members)
OutputFileSearchCallItem,
OutputFunctionCallItem,
OutputImage,
OutputImage as OutputInputImage,
OutputImageGenerationCallItem,
OutputInputImage,
OutputItems,
OutputMessage,
OutputReasoningItem,
Expand Down
36 changes: 32 additions & 4 deletions packages/agent/src/lib/model-result.ts
Expand Up @@ -78,6 +78,7 @@ import {
isServerTool,
isToolCallOutputEvent,
} from './tool-types.js';
import { normalizeInputToArray } from './turn-context.js';

/**
* Typeguard for plain-object records (non-null, non-array).
Expand Down Expand Up @@ -218,6 +219,8 @@ export class ModelResult<
}> = [];
// Track resolved request after async function resolution
private resolvedRequest: models.ResponsesRequest | null = null;
// Fresh user items to persist atomically with the assistant response
private pendingFreshItems: models.BaseInputsUnion[] | undefined;

// State management for multi-turn conversations
private stateAccessor: StateAccessor<TTools> | null = null;
Expand Down Expand Up @@ -491,11 +494,17 @@ export class ModelResult<
response.output,
];

// Persist pending fresh user items together with the assistant output
// so they land atomically — if the stream failed before reaching here
// neither the user turn nor the assistant turn is written to state.
let messages = this.currentState.messages;
if (this.pendingFreshItems && this.pendingFreshItems.length > 0) {
messages = appendToMessages(messages, this.pendingFreshItems);
this.pendingFreshItems = undefined;
}

await this.saveStateSafely({
messages: appendToMessages(
this.currentState.messages,
outputItems as models.BaseInputsUnion[],
),
messages: appendToMessages(messages, outputItems as models.BaseInputsUnion[]),
previousResponseId: response.id,
});
}
Expand Down Expand Up @@ -1493,11 +1502,17 @@ export class ModelResult<
// (newly supplied this turn). `onResponseReceived` must fire only for
// fresh items — re-hooking historical outputs on every callModel call
// would double-invoke non-idempotent hooks.
//
// Fresh items are tracked locally and persisted to state only after the
// API call succeeds, avoiding duplication when a caller retries after a
// transient API failure.
const hasLoadedHistory =
!!this.currentState?.messages &&
Array.isArray(this.currentState.messages) &&
this.currentState.messages.length > 0;

let freshItemsForState: models.BaseInputsUnion[] | undefined;

if (hasLoadedHistory && this.currentState) {
// `currentState.messages` is InputsUnion — keep it as that union so
// appendToMessages (which expects InputsUnion) accepts it directly.
Expand Down Expand Up @@ -1527,6 +1542,8 @@ export class ModelResult<
? await this.applyHooksToFreshItems(freshItems, historicalMessages, initialContext)
: undefined;

freshItemsForState = hookedFresh;

baseRequest = {
...baseRequest,
input: hookedFresh
Expand All @@ -1544,6 +1561,9 @@ export class ModelResult<
this.contextStore ?? undefined,
this.options.sharedContextSchema,
);

freshItemsForState = normalizeInputToArray(hookedInput);

baseRequest = {
...baseRequest,
input: hookedInput,
Expand Down Expand Up @@ -1572,6 +1592,14 @@ export class ModelResult<
throw apiResult.error;

[suggestion] streaming failure after ok:true can leave state with user input but no assistant output, causing duplicate messages on retry

Details

Why: betaResponsesSend returns ok:true when the HTTP 200 response starts, but the stream body can still fail (network drop mid-stream). At this point freshItemsForState has already been saved. If the caller retries with the same input, hasLoadedHistory is true (prior [user_input] exists in state), so appendToMessages(historicalMessages, freshItems) sends [user_input, user_input] to the API and persists a duplicate.

This is distinct from the ok:false path, which is correctly guarded. The streaming error scenario: (1) API returns 200 + EventStream, (2) user input saved here, (3) consumeStreamForCompletion throws mid-read, (4) caller retries with same input.

Fix (preferred): Move the freshItemsForState save into toolExecutionPromise, immediately after await this.getInitialResponse() succeeds but before saveResponseToState, so user input and model output are persisted on the same success path.
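
The failure sequence above can be reproduced in a small model. This is only a sketch: `EarlyPersistSession`, `Item`, and the `streamFails` flag are hypothetical names invented for illustration and are not part of `ModelResult`. It assumes the user input is written to state as soon as the API reports ok:true, before the stream body is consumed.

```typescript
// Hypothetical, simplified model of the state store; not the real ModelResult API.
type Item = { role: 'user' | 'assistant'; text: string };

class EarlyPersistSession {
  messages: Item[] = [];

  // Simulates one turn. `streamFails` models a network drop after HTTP 200.
  send(input: Item, streamFails: boolean): void {
    // Request input = loaded history + fresh input.
    const requestInput = [...this.messages, input];
    // Step (2): user input is saved as soon as the API reports ok:true.
    this.messages = requestInput;
    // Step (3): the stream body fails mid-read, so no assistant output is saved.
    if (streamFails) throw new Error('stream dropped');
    this.messages = [...this.messages, { role: 'assistant', text: 'reply' }];
  }
}

const early = new EarlyPersistSession();
const userInput: Item = { role: 'user', text: 'hello' };
try {
  early.send(userInput, true);
} catch {
  // Step (4): the caller retries with the same input.
  early.send(userInput, false);
}

// The user turn now appears twice in persisted state.
const earlyRoles = early.messages.map((m) => m.role);
console.log(earlyRoles.join(','));
```

In this model the orphaned user turn left by the failed attempt is treated as history on retry, which is what produces the duplicate.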

Prompt for agents
In packages/agent/src/lib/model-result.ts, the freshItemsForState save at ~line 1590 runs after betaResponsesSend returns ok:true but before the stream is consumed. When consumeStreamForCompletion later throws (network drop), state holds the user input with no matching assistant output. On retry, hasLoadedHistory is true and the same user input is appended again. Preferred fix: move the freshItemsForState save into toolExecutionPromise, immediately after `await this.getInitialResponse()` succeeds and before `saveResponseToState`, so user input and model output land atomically.

Reviewed at 7687633

Contributor Author

Addressed in ce00bf9. Fresh user items are now stashed on this.pendingFreshItems in initStream (after ok:true) and only persisted inside saveResponseToState, which runs after getInitialResponse() consumes the stream to completion. If the stream errors mid-read, getInitialResponse() throws before saveResponseToState is reached — neither user input nor assistant output is written to state, so a retry starts clean with no duplicates.
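
The stash-then-persist ordering described here can be illustrated with a minimal sketch. The class `DeferredPersistSession`, the `Turn` type, and the `streamFails` flag are hypothetical stand-ins and not the real `ModelResult` API; only the field name `pendingFreshItems` and the idea of a single save step come from the change itself.

```typescript
// Hypothetical, simplified model; the real logic lives in ModelResult.
type Turn = { role: 'user' | 'assistant'; text: string };

class DeferredPersistSession {
  messages: Turn[] = [];
  private pendingFreshItems: Turn[] | undefined;

  // Simulates one turn. `streamFails` models a network drop after HTTP 200.
  send(input: Turn, streamFails: boolean): void {
    // Stash the fresh user items instead of writing them to state.
    this.pendingFreshItems = [input];
    // A mid-stream failure throws before anything is persisted.
    if (streamFails) throw new Error('stream dropped');
    this.saveResponse({ role: 'assistant', text: 'reply' });
  }

  // User input and assistant output are written in one step.
  private saveResponse(output: Turn): void {
    let messages = this.messages;
    if (this.pendingFreshItems && this.pendingFreshItems.length > 0) {
      messages = [...messages, ...this.pendingFreshItems];
      this.pendingFreshItems = undefined;
    }
    this.messages = [...messages, output];
  }
}

const deferred = new DeferredPersistSession();
const question: Turn = { role: 'user', text: 'hello' };
let countAfterFailure = -1;
try {
  deferred.send(question, true);
} catch {
  // Nothing was persisted by the failed attempt.
  countAfterFailure = deferred.messages.length;
  deferred.send(question, false);
}

const deferredRoles = deferred.messages.map((m) => m.role);
console.log(countAfterFailure, deferredRoles.join(','));
```

Because the retry overwrites the stash rather than appending to it, a failed attempt leaves no trace in either persisted state or the pending items.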

}

// Stash fresh user items so saveResponseToState can persist them
// atomically with the assistant output. Writing them here would leave
// an orphaned user turn if the stream fails after ok:true — on retry
// the same input would be appended again, producing duplicates.
if (freshItemsForState && freshItemsForState.length > 0) {
this.pendingFreshItems = freshItemsForState;
}

// Handle both streaming and non-streaming responses
// The API may return a non-streaming response even when stream: true is requested
if (isEventStream(apiResult.value)) {
Expand Down