Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion backend/routes/chatbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ async def generate_stream():
iteration += 1
tool_uses = []
assistant_content = ""
last_stop_reason: str | None = None

logger.info(f"[CHAT] Iteration {iteration}: calling Anthropic, {len(conversation)} messages")
# Stream from Anthropic with retry on transient errors
Expand Down Expand Up @@ -352,6 +353,7 @@ async def generate_stream():

# Use final message for complete, parsed tool inputs
final = await stream.get_final_message()
last_stop_reason = getattr(final, "stop_reason", None)
for block in final.content:
if block.type == "tool_use":
if plan_mode:
Expand Down Expand Up @@ -394,7 +396,7 @@ async def generate_stream():
)
except Exception as e:
logger.warning(f"[CHAT] Failed to record usage: {e}")
yield f"data: {json.dumps({'type': 'done', 'content': assistant_content, 'session_id': session_id, 'model': model, 'usage': {'input_tokens': total_input_tokens, 'output_tokens': total_output_tokens, 'cache_creation_input_tokens': total_cache_creation_input_tokens, 'cache_read_input_tokens': total_cache_read_input_tokens}, 'cost_gbp': billing['cost_gbp'] if billing else None, 'balance': billing['balance'] if billing else None})}\n\n"
yield f"data: {json.dumps({'type': 'done', 'content': assistant_content, 'session_id': session_id, 'model': model, 'stop_reason': last_stop_reason, 'usage': {'input_tokens': total_input_tokens, 'output_tokens': total_output_tokens, 'cache_creation_input_tokens': total_cache_creation_input_tokens, 'cache_read_input_tokens': total_cache_read_input_tokens}, 'cost_gbp': billing['cost_gbp'] if billing else None, 'balance': billing['balance'] if billing else None})}\n\n"
break

# Detect infinite loops
Expand Down
217 changes: 209 additions & 8 deletions frontend/src/app/ChatPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ interface Message {
events?: StreamEvent[];
isComplete?: boolean;
cost_gbp?: number;
/** Anthropic stop_reason on the final iteration. "max_tokens" means truncated. */
stop_reason?: string;
/** True when the user clicked Stop and the stream was aborted mid-flight. */
stopped?: boolean;
}

interface BalanceSummary {
Expand Down Expand Up @@ -234,7 +238,18 @@ export default function ChatPage() {
try {
const data = conversationCache.current.get(conv.id) || await apiRequest<ConversationDetail>("GET", `conversations/${conv.id}`);
if (!data?.messages?.length) { console.error("No messages in conversation", data); return; }
const loaded: Message[] = data.messages.map((m) => ({ role: m.role as "user" | "assistant", content: m.content || "", isComplete: true, events: m.events }));
const loaded: Message[] = data.messages.map((m) => {
const raw = m as { role: string; content: string; events?: StreamEvent[]; stop_reason?: string; stopped?: boolean; cost_gbp?: number };
return {
role: raw.role as "user" | "assistant",
content: raw.content || "",
isComplete: true,
events: raw.events,
stop_reason: raw.stop_reason,
stopped: raw.stopped,
cost_gbp: raw.cost_gbp,
};
});
const collapsed = new Set(loaded.map((m, i) => (m.role === "assistant" && m.events?.some((e) => e.type === "tool") ? i : -1)).filter((i) => i >= 0));
sessionId.current = data.session_id;
setActiveConversationId(data.id);
Expand Down Expand Up @@ -287,10 +302,14 @@ export default function ChatPage() {
} catch (e) { console.error("Title generation failed", e); }

const apiMessages = msgs.map((m) => {
if (m.role === "assistant" && m.isComplete && m.events?.length) {
return { role: m.role, content: m.content, events: m.events };
const base: Record<string, unknown> = { role: m.role, content: m.content };
if (m.role === "assistant") {
if (m.isComplete && m.events?.length) base.events = m.events;
if (m.cost_gbp !== undefined) base.cost_gbp = m.cost_gbp;
if (m.stop_reason) base.stop_reason = m.stop_reason;
if (m.stopped) base.stopped = true;
}
return { role: m.role, content: m.content };
return base;
});

try {
Expand Down Expand Up @@ -482,20 +501,21 @@ export default function ChatPage() {
updateMessage();
if (data.session_id) sessionId.current = data.session_id;
const msgCost = typeof data.cost_gbp === "number" ? data.cost_gbp : undefined;
const stopReason = typeof data.stop_reason === "string" ? data.stop_reason : undefined;
const hasTools = events.some((e) => e.type === "tool");
if (hasTools) {
setMessages((prev) => {
const newMsgs = [...prev];
const lastIdx = newMsgs.length - 1;
if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true, cost_gbp: msgCost };
if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true, cost_gbp: msgCost, stop_reason: stopReason };
setCollapsedWorking((c) => new Set(c).add(lastIdx));
return newMsgs;
});
} else {
setMessages((prev) => {
const newMsgs = [...prev];
const lastIdx = newMsgs.length - 1;
if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], cost_gbp: msgCost };
if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], cost_gbp: msgCost, stop_reason: stopReason };
return newMsgs;
});
}
Expand All @@ -521,14 +541,15 @@ export default function ChatPage() {
}
} catch (error) {
if (error instanceof DOMException && error.name === "AbortError") {
// User stopped the stream — flush what we have
// User stopped the stream — flush what we have, mark `stopped`
// so the UI shows a Continue affordance.
if (drainTimer) { clearInterval(drainTimer); drainTimer = null; }
displayedText = currentText;
updateMessage();
setMessages((prev) => {
const newMsgs = [...prev];
const lastIdx = newMsgs.length - 1;
if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true };
if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true, stopped: true };
return newMsgs;
});
} else {
Expand All @@ -545,6 +566,169 @@ export default function ChatPage() {

/** Abort any in-flight chat stream; a no-op when nothing is streaming. */
const stopStreaming = () => {
  const controller = abortRef.current;
  if (controller) controller.abort();
};

/** Resume a previously truncated or stopped assistant turn.
 *
 * The conversation up to and including the partial assistant message is
 * sent to /chat/message; the model continues from there via Anthropic's
 * assistant-prefill behaviour (no extra user nudge needed). New text and
 * events are appended into the SAME message bubble so the user sees one
 * continuous answer.
 *
 * @param idx Index into `messages` of the assistant message to resume.
 */
const continueMessage = async (idx: number) => {
  // Only one stream at a time, and only a finished assistant turn can be resumed.
  if (isStreaming) return;
  const target = messages[idx];
  if (!target || target.role !== "assistant" || !target.isComplete) return;
  // Refuse if a tool is still pending in this message — re-triggering
  // would orphan the partial tool call.
  if (target.events?.some((e) => e.type === "tool" && e.data.status === "pending")) return;

  // Build the wire payload: every message up to and including the partial turn.
  // Tool results are folded into the assistant text so the model can see them
  // without replaying structured tool blocks.
  const priorMessages = messages.slice(0, idx + 1);
  const apiMessages = priorMessages.map((msg) => {
    let content = msg.content;
    if (msg.role === "assistant" && msg.events) {
      const toolResults = msg.events.filter((e): e is { type: "tool"; data: ToolData } => e.type === "tool" && !!e.data.result_summary).map((e) => `[Tool: ${e.data.tool_name}] ${e.data.result_summary}`).join("\n\n");
      if (toolResults) content += "\n\n---\nTool results:\n" + toolResults;
    }
    return { role: msg.role, content };
  });

  // Optimistic: clear truncation/stopped flags and mark in-flight.
  setMessages((prev) => prev.map((m, i) => i === idx ? { ...m, isComplete: false, stop_reason: undefined, stopped: undefined } : m));
  setIsStreaming(true);
  setIsWaiting(true);

  const controller = new AbortController();
  abortRef.current = controller;

  // Accumulators for the continuation only; `base*` snapshots what the bubble
  // already held so flushes can always rebuild the full merged message.
  let appendedText = "";
  const newEvents: StreamEvent[] = [];
  const toolsMap = new Map<string, ToolData>();
  const baseContent = target.content;
  const baseEvents = target.events ? [...target.events] : [];
  const baseCost = target.cost_gbp || 0;

  // Re-render the target bubble with base + continuation merged. Note the
  // event objects in `newEvents` are mutated in place by the handlers below;
  // each flush hands React fresh arrays so the update is picked up.
  const flushTarget = () => {
    setMessages((prev) => prev.map((m, i) => i === idx ? {
      ...m,
      content: baseContent + appendedText,
      events: [...baseEvents, ...newEvents],
    } : m));
  };

  try {
    const headers: Record<string, string> = { "Content-Type": "application/json" };
    // Identify the user to the backend when signed in.
    if (user?.id) headers["X-User-Id"] = user.id;
    const response = await fetch(getBackendEndpoint("chat/message"), {
      method: "POST",
      headers,
      body: JSON.stringify({ messages: apiMessages, session_id: sessionId.current, user_id: user?.id || null, plan_mode: false }),
      signal: controller.signal,
    });
    // Out of credit: surface as a NEW assistant bubble, leaving the resumed
    // message untouched (flags were cleared optimistically above).
    if (response.status === 402) {
      const err = await response.json().catch(() => ({ error: "No credit remaining" }));
      setMessages((prev) => [...prev, { role: "assistant", content: err.error || "No credit remaining. Please top up to continue.", isComplete: true }]);
      return;
    }
    // Rate limited: honour Retry-After when present, default ~60s.
    if (response.status === 429) {
      const seconds = parseInt(response.headers.get("retry-after") || "60", 10);
      setMessages((prev) => [...prev, { role: "assistant", content: `You're sending messages a bit fast — please wait ~${seconds}s and try again.`, isComplete: true }]);
      return;
    }
    if (!response.ok) throw new Error("Request failed");
    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    if (!reader) throw new Error("No body");

    // SSE framing: complete events end with "\n"; any trailing partial line
    // is kept in `buffer` for the next read.
    let buffer = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() || "";
      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        try {
          const data = JSON.parse(line.slice(6));
          if (data.type === "chunk") {
            setIsWaiting(false);
            // Coalesce consecutive plain-text chunks into one text event.
            const lastEvent = newEvents[newEvents.length - 1];
            if (lastEvent?.type === "text" && !lastEvent.thinking) lastEvent.content += data.content;
            else newEvents.push({ type: "text", content: data.content });
            appendedText += data.content;
            flushTarget();
          } else if (data.type === "tool_start") {
            setIsWaiting(false);
            const toolData: ToolData = { tool_name: data.tool_name, tool_id: data.tool_id, status: "pending" };
            toolsMap.set(data.tool_id, toolData);
            newEvents.push({ type: "tool", data: toolData });
            flushTarget();
          } else if (data.type === "tool_use") {
            // Parsed tool input can arrive with or without a preceding
            // tool_start; handle both orders.
            const existing = toolsMap.get(data.tool_id);
            if (existing) existing.input = data.tool_input;
            else { const td: ToolData = { tool_name: data.tool_name, tool_id: data.tool_id, status: "pending", input: data.tool_input }; toolsMap.set(data.tool_id, td); newEvents.push({ type: "tool", data: td }); }
            flushTarget();
          } else if (data.type === "tool_result") {
            const tool = toolsMap.get(data.tool_id);
            if (tool) { tool.status = data.status; tool.result_summary = data.result_summary; }
            flushTarget();
            // Back to "waiting" while the model consumes the tool result.
            setIsWaiting(true);
          } else if (data.type === "done") {
            // Final frame: commit the merged message, accumulate cost, and
            // record the server's stop_reason ("max_tokens" re-enables Continue).
            setIsWaiting(false);
            const newCost = typeof data.cost_gbp === "number" ? data.cost_gbp : 0;
            const stopReason = typeof data.stop_reason === "string" ? data.stop_reason : undefined;
            const finalContent = baseContent + appendedText;
            const finalEvents = [...baseEvents, ...newEvents];
            setMessages((prev) => prev.map((m, i) => i === idx ? {
              ...m,
              isComplete: true,
              content: finalContent,
              events: finalEvents,
              cost_gbp: baseCost + newCost,
              stop_reason: stopReason,
              stopped: false,
            } : m));
            if (data.balance) setBalance(data.balance); else fetchBalance();
            if (sessionId.current) {
              // NOTE(review): `messages` is the snapshot captured when
              // continueMessage was called, and this map does not carry the
              // cost_gbp/stop_reason just committed via setMessages above —
              // confirm saveConversation does not need those fields.
              const savedMessages = messages.map((m, i) => i === idx ? { ...m, isComplete: true, content: finalContent, events: finalEvents } : m);
              saveConversation(savedMessages, sessionId.current);
            }
          } else if (data.type === "error") {
            // Server-reported error: append it to the bubble's text instead of
            // throwing, so partial output is preserved.
            const errorText = `Error: ${data.content || "Something went wrong"}`;
            const lastEvent = newEvents[newEvents.length - 1];
            if (lastEvent?.type === "text") lastEvent.content += "\n\n" + errorText;
            else newEvents.push({ type: "text", content: errorText });
            appendedText += errorText;
            flushTarget();
          }
        } catch {}
        // ^ Malformed/partial SSE payloads are skipped; best-effort by design.
      }
    }
  } catch (error) {
    if (error instanceof DOMException && error.name === "AbortError") {
      // User hit Stop mid-continuation: keep what arrived and re-mark the
      // bubble as stopped so the Continue affordance reappears.
      setMessages((prev) => prev.map((m, i) => i === idx ? {
        ...m,
        isComplete: true,
        content: baseContent + appendedText,
        events: [...baseEvents, ...newEvents],
        stopped: true,
      } : m));
    } else {
      // Network or protocol failure: surface it inline, keep partial output.
      const errorText = `Continuation failed: ${error instanceof Error ? error.message : "Unknown error"}`;
      setMessages((prev) => prev.map((m, i) => i === idx ? {
        ...m,
        isComplete: true,
        content: baseContent + appendedText + "\n\n" + errorText,
        events: [...baseEvents, ...newEvents],
      } : m));
    }
  } finally {
    abortRef.current = null;
    setIsStreaming(false);
    setIsWaiting(false);
  }
};

/** Submit on plain Enter; Shift+Enter falls through to insert a newline. */
const handleKeyDown = (e: React.KeyboardEvent) => {
  if (e.key !== "Enter" || e.shiftKey) return;
  e.preventDefault();
  sendMessage();
};
Expand Down Expand Up @@ -928,6 +1112,23 @@ export default function ChatPage() {
{msg.cost_gbp < 0.01 ? `${(msg.cost_gbp * 100).toFixed(2)}p` : `£${msg.cost_gbp.toFixed(3)}`}
</div>
)}
{msg.isComplete && !isStreaming && (msg.stop_reason === "max_tokens" || msg.stopped) && !msg.events?.some((e) => e.type === "tool" && e.data.status === "pending") && (
<div style={{ marginTop: "10px", display: "flex", alignItems: "center", gap: "8px" }}>
<button
type="button"
onClick={() => continueMessage(idx)}
title={msg.stop_reason === "max_tokens"
? "The answer hit the response length cap — continue from where it stopped."
: "Resume from where you stopped the answer."}
style={{ display: "inline-flex", alignItems: "center", gap: "5px", fontSize: "12px", color: THEME.primary, background: "transparent", border: `1px solid ${THEME.primary}`, padding: "4px 10px", cursor: "pointer", fontFamily: "inherit" }}
>
↳ Continue
</button>
<span style={{ fontSize: "11px", color: "#9e9a90" }}>
{msg.stop_reason === "max_tokens" ? "Truncated at max length" : "Stopped"}
</span>
</div>
)}
</div>
)}
</div>
Expand Down