
fix: Add cache token support to ChatAnthropicVertex streaming responses #1010

Open · wants to merge 3 commits into main
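In short, this change routes Anthropic prompt-caching token counts through the standard input_token_details field of UsageMetadata, mirroring the official langchain_anthropic implementation, instead of the custom CacheUsageMetadata keys used before. Below is a rough sketch of how the new metadata could be observed while streaming; the project, location, and model name are placeholders, and passing stream_usage per call is assumed to behave as it does in langchain_anthropic:

from langchain_google_vertexai.model_garden import ChatAnthropicVertex

# Placeholder project, location, and model name; substitute your own deployment.
llm = ChatAnthropicVertex(
    model_name="claude-3-5-sonnet-v2@20241022",
    project="my-gcp-project",
    location="us-east5",
)

for chunk in llm.stream("Summarize the cached research papers.", stream_usage=True):
    if chunk.usage_metadata:
        # With this change, cache counts surface under input_token_details, e.g.
        # {"input_tokens": 4, "output_tokens": 0, "total_tokens": 4,
        #  "input_token_details": {"cache_read": 1, "cache_creation": 1}}
        print(chunk.usage_metadata)
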
72 changes: 59 additions & 13 deletions libs/vertexai/langchain_google_vertexai/_anthropic_utils.py
@@ -27,7 +27,7 @@
ToolCall,
ToolMessage,
)
from langchain_core.messages.ai import UsageMetadata
from langchain_core.messages.ai import InputTokenDetails, UsageMetadata
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool
from pydantic import BaseModel
@@ -50,6 +50,41 @@
}


def _create_usage_metadata(anthropic_usage: BaseModel) -> UsageMetadata:
"""Create UsageMetadata from Anthropic usage with proper cache token handling.

This matches the official langchain_anthropic implementation exactly.
"""
input_token_details: dict = {
"cache_read": getattr(anthropic_usage, "cache_read_input_tokens", None),
"cache_creation": getattr(anthropic_usage, "cache_creation_input_tokens", None),
}

# Anthropic input_tokens exclude cached token counts.
input_tokens = (
(getattr(anthropic_usage, "input_tokens", 0) or 0)
+ (input_token_details["cache_read"] or 0)
+ (input_token_details["cache_creation"] or 0)
)
output_tokens = getattr(anthropic_usage, "output_tokens", 0) or 0

# Only add input_token_details if we have non-None cache values
filtered_details = {k: v for k, v in input_token_details.items() if v is not None}
if filtered_details:
return UsageMetadata(
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=input_tokens + output_tokens,
input_token_details=InputTokenDetails(**filtered_details),
)
else:
return UsageMetadata(
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=input_tokens + output_tokens,
)


def _format_image(image_url: str, project: Optional[str]) -> Dict:
"""Formats a message image to a dict for anthropic api."""
regex = r"^data:(?P<media_type>(?:image|application)/.+);base64,(?P<data>.+)$"
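For reference, a minimal sketch of what _create_usage_metadata computes, calling the private helper directly for illustration and using a SimpleNamespace as a stand-in for the Anthropic SDK usage object; Anthropic's input_tokens exclude cached tokens, so the helper adds them back in:

from types import SimpleNamespace

from langchain_google_vertexai._anthropic_utils import _create_usage_metadata

# Stand-in for an Anthropic usage payload; the real object comes from the SDK.
usage = SimpleNamespace(
    input_tokens=2,
    output_tokens=1,
    cache_read_input_tokens=1,
    cache_creation_input_tokens=1,
)

meta = _create_usage_metadata(usage)
# Cached tokens are folded into input_tokens: 2 + 1 + 1 = 4, and total = 4 + 1 = 5.
assert meta["input_tokens"] == 4
assert meta["total_tokens"] == 5
assert meta["input_token_details"] == {"cache_read": 1, "cache_creation": 1}
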
@@ -336,14 +371,22 @@ def _make_message_chunk_from_anthropic_event(
message_chunk: Optional[AIMessageChunk] = None
# See https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py # noqa: E501
if event.type == "message_start" and stream_usage:
input_tokens = event.message.usage.input_tokens
# Follow official langchain_anthropic pattern exactly
usage_metadata = _create_usage_metadata(event.message.usage)
# We pick up a cumulative count of output_tokens at the end of the stream,
# so here we zero out to avoid double counting.
usage_metadata["total_tokens"] = (
usage_metadata["total_tokens"] - usage_metadata["output_tokens"]
)
usage_metadata["output_tokens"] = 0
if hasattr(event.message, "model"):
response_metadata = {"model_name": event.message.model}
else:
response_metadata = {}
message_chunk = AIMessageChunk(
content="" if coerce_content_to_string else [],
usage_metadata=UsageMetadata(
input_tokens=input_tokens,
output_tokens=0,
total_tokens=input_tokens,
),
usage_metadata=usage_metadata,
response_metadata=response_metadata,
)
elif (
event.type == "content_block_start"
@@ -403,14 +446,17 @@ def _make_message_chunk_from_anthropic_event(
tool_call_chunks=[tool_call_chunk], # type: ignore
)
elif event.type == "message_delta" and stream_usage:
output_tokens = event.usage.output_tokens
# Follow official langchain_anthropic pattern - NO cache tokens for delta
# Only output tokens are provided in message_delta events
usage_metadata = {
"input_tokens": 0,
"output_tokens": event.usage.output_tokens,
"total_tokens": event.usage.output_tokens,
}

message_chunk = AIMessageChunk(
content="",
usage_metadata=UsageMetadata(
input_tokens=0,
output_tokens=output_tokens,
total_tokens=output_tokens,
),
usage_metadata=usage_metadata,
response_metadata={
"stop_reason": event.delta.stop_reason,
"stop_sequence": event.delta.stop_sequence,
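To illustrate the double-counting guard above: the message_start chunk carries the cache-inclusive input tokens with output_tokens zeroed out, while the message_delta chunk carries only output tokens, so summing the two chunks' usage (which LangChain does when chunks are added together) yields each count exactly once. A hand-worked sketch with assumed token counts:

# Assumed numbers, for illustration only.
start_chunk_usage = {
    "input_tokens": 4,   # 2 prompt + 1 cache_read + 1 cache_creation
    "output_tokens": 0,  # zeroed at message_start to avoid double counting
    "total_tokens": 4,
    "input_token_details": {"cache_read": 1, "cache_creation": 1},
}
delta_chunk_usage = {
    "input_tokens": 0,
    "output_tokens": 57,  # reported once, at message_delta
    "total_tokens": 57,
}

combined = {
    key: start_chunk_usage[key] + delta_chunk_usage[key]
    for key in ("input_tokens", "output_tokens", "total_tokens")
}
assert combined == {"input_tokens": 4, "output_tokens": 57, "total_tokens": 61}
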
19 changes: 3 additions & 16 deletions libs/vertexai/langchain_google_vertexai/model_garden.py
@@ -33,7 +33,6 @@
AIMessage,
BaseMessage,
)
from langchain_core.messages.ai import UsageMetadata
from langchain_core.outputs import (
ChatGeneration,
ChatGenerationChunk,
@@ -55,6 +54,7 @@
_extract_tool_calls,
)
from langchain_google_vertexai._anthropic_utils import (
_create_usage_metadata,
_documents_in_params,
_format_messages_anthropic,
_make_message_chunk_from_anthropic_event,
@@ -66,13 +66,6 @@
from langchain_google_vertexai._retry import create_base_retry_decorator


class CacheUsageMetadata(UsageMetadata):
cache_creation_input_tokens: Optional[int]
"""The number of input tokens used to create the cache entry."""
cache_read_input_tokens: Optional[int]
"""The number of input tokens read from the cache."""


def _create_retry_decorator(
*,
max_retries: int = 3,
@@ -287,14 +280,8 @@ def _format_output(self, data: Any, **kwargs: Any) -> ChatResult:
)
else:
msg = AIMessage(content=content)
# Collect token usage
msg.usage_metadata = CacheUsageMetadata(
input_tokens=data.usage.input_tokens,
output_tokens=data.usage.output_tokens,
total_tokens=data.usage.input_tokens + data.usage.output_tokens,
cache_creation_input_tokens=data.usage.cache_creation_input_tokens,
cache_read_input_tokens=data.usage.cache_read_input_tokens,
)
# Collect token usage using the reusable function (matches langchain_anthropic)
msg.usage_metadata = _create_usage_metadata(data.usage)
return ChatResult(
generations=[ChatGeneration(message=msg)],
llm_output=llm_output,
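With CacheUsageMetadata removed, callers that previously read cache_creation_input_tokens and cache_read_input_tokens off the message should switch to input_token_details. A small sketch of the new access pattern, using a hand-built AIMessage that mirrors what _format_output now produces:

from langchain_core.messages import AIMessage


def cache_token_counts(msg: AIMessage) -> tuple[int, int]:
    """Return (cache_read, cache_creation) from the input_token_details shape."""
    details = (msg.usage_metadata or {}).get("input_token_details", {})
    return details.get("cache_read", 0), details.get("cache_creation", 0)


msg = AIMessage(
    content="hi",
    usage_metadata={
        "input_tokens": 4,
        "output_tokens": 1,
        "total_tokens": 5,
        "input_token_details": {"cache_read": 1, "cache_creation": 1},
    },
)
assert cache_token_counts(msg) == (1, 1)
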
18 changes: 7 additions & 11 deletions libs/vertexai/tests/integration_tests/test_chat_models.py
@@ -331,11 +331,10 @@ def get_climate_info(query: str):
tools=tools,
prompt=prompt_template,
)
agent_executor = agents.AgentExecutor( # type: ignore[call-arg]
agent_executor = agents.AgentExecutor(
agent=agent,
tools=tools,
verbose=False,
stream_runnable=False,
)
output = agent_executor.invoke({"input": message})
assert isinstance(output["output"], str)
@@ -1045,7 +1044,7 @@ def test_structured_output_schema_enum():
"""
The film aims to educate and inform viewers about real-life subjects, events, or
people. It offers a factual record of a particular topic by combining interviews
, historical footage and narration. The primary purpose of a film is to present
, historical footage and narration. The primary purpose of a film is to present
information and provide insights into various aspects of reality.
"""
)
@@ -1060,14 +1059,14 @@ def test_structured_output_schema_enum():
@pytest.mark.first
def test_context_catching():
system_instruction = """

You are an expert researcher. You always stick to the facts in the sources provided,
and never make up new facts.

If asked about it, the secret number is 747.

Now look at these research papers, and answer the following questions.

"""

cached_content = create_context_cache(
@@ -1133,9 +1132,8 @@ def get_secret_number() -> int:

You have a get_secret_number function available. Use this tool if someone asks
for the secret number.

Now look at these research papers, and answer the following questions.

"""

cached_content = create_context_cache(
@@ -1180,9 +1178,7 @@ def get_secret_number() -> int:
tools=tools,
prompt=prompt,
)
agent_executor = agents.AgentExecutor( # type: ignore[call-arg]
agent=agent, tools=tools, verbose=False, stream_runnable=False
)
agent_executor = agents.AgentExecutor(agent=agent, tools=tools, verbose=False)
response = agent_executor.invoke({"input": "what is the secret number?"})
assert isinstance(response["output"], str)

20 changes: 12 additions & 8 deletions libs/vertexai/tests/unit_tests/test_chat_models.py
@@ -1385,11 +1385,13 @@ def model_dump(self):
assert message.tool_calls[0]["name"] == "calculator"
assert message.tool_calls[0]["args"] == {"number": 42}
assert message.usage_metadata == {
"input_tokens": 2,
"input_tokens": 4, # 2 + 1 + 1 (original + cache_read + cache_creation)
"output_tokens": 1,
"total_tokens": 3,
"cache_creation_input_tokens": 1,
"cache_read_input_tokens": 1,
"total_tokens": 5, # 4 + 1
"input_token_details": {
"cache_creation": 1,
"cache_read": 1,
},
}


@@ -1453,11 +1455,13 @@ def model_dump(self):
assert len(message.content) == 3
assert message.content == test_msg.model_dump()["content"]
assert message.usage_metadata == {
"input_tokens": 2,
"input_tokens": 4, # 2 + 1 + 1 (original + cache_read + cache_creation)
"output_tokens": 1,
"total_tokens": 3,
"cache_creation_input_tokens": 1,
"cache_read_input_tokens": 1,
"total_tokens": 5, # 4 + 1
"input_token_details": {
"cache_creation": 1,
"cache_read": 1,
},
}

