
Commit d3d23e2

fix(anthropic): streaming token counting to defer input tokens until completion (#32518)
Supersedes #32461

Fixed incorrect input token reporting during streaming when tools are used. Previously, input tokens were counted at `message_start`, before tool execution, leading to inaccurate counts. Input tokens are now deferred until `message_delta` (completion), aligning with Anthropic's billing model.

**Before fix:**

- Streaming with tools: input tokens = 0 ❌
- Non-streaming with tools: input tokens = 472 ✅

**After fix:**

- Streaming with tools: input tokens = 472 ✅
- Non-streaming with tools: input tokens = 472 ✅

This also matches the Anthropic SDK, which applies input token updates in `message_delta` events:

```python
# https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py
if event.usage.input_tokens is not None:
    current_snapshot.usage.input_tokens = event.usage.input_tokens
```
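For context, here is a minimal sketch (not part of this commit) of how the fix surfaces to callers: stream a tool-calling request and sum `usage_metadata` across chunks. The model name and the `get_weather` tool are illustrative assumptions, and `ANTHROPIC_API_KEY` must be set:

```python
# Illustrative sketch only; the model name and tool are assumptions.
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool


@tool
def get_weather(city: str) -> str:
    """Return the weather for a city."""
    return f"Sunny in {city}"


llm = ChatAnthropic(model="claude-3-5-sonnet-latest").bind_tools([get_weather])

aggregate = None
for chunk in llm.stream("What's the weather in Paris?"):
    # AIMessageChunk supports `+`, which merges usage_metadata across chunks.
    aggregate = chunk if aggregate is None else aggregate + chunk

# With this fix, input_tokens matches the non-streaming count instead of 0.
print(aggregate.usage_metadata)
```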
1 parent 2f32c44 commit d3d23e2

File tree

2 files changed: +310 −17 lines

libs/partners/anthropic/langchain_anthropic/chat_models.py

Lines changed: 86 additions & 16 deletions
@@ -70,6 +70,20 @@ class AnthropicTool(TypedDict):
     cache_control: NotRequired[dict[str, str]]
 
 
+class _CombinedUsage(BaseModel):
+    """Combined usage model for deferred token counting in streaming.
+
+    This mimics the Anthropic Usage structure while combining stored input usage
+    with final output usage for accurate token reporting during streaming.
+    """
+
+    input_tokens: int = 0
+    output_tokens: int = 0
+    cache_creation_input_tokens: Optional[int] = None
+    cache_read_input_tokens: Optional[int] = None
+    cache_creation: Optional[dict[str, Any]] = None
+
+
 def _is_builtin_tool(tool: Any) -> bool:
     if not isinstance(tool, dict):
         return False
@@ -1493,12 +1507,18 @@ def _stream(
             and not _thinking_in_params(payload)
         )
         block_start_event = None
+        stored_input_usage = None
         for event in stream:
-            msg, block_start_event = _make_message_chunk_from_anthropic_event(
+            (
+                msg,
+                block_start_event,
+                stored_input_usage,
+            ) = _make_message_chunk_from_anthropic_event(
                 event,
                 stream_usage=stream_usage,
                 coerce_content_to_string=coerce_content_to_string,
                 block_start_event=block_start_event,
+                stored_input_usage=stored_input_usage,
             )
             if msg is not None:
                 chunk = ChatGenerationChunk(message=msg)
@@ -1529,12 +1549,18 @@ async def _astream(
             and not _thinking_in_params(payload)
         )
         block_start_event = None
+        stored_input_usage = None
         async for event in stream:
-            msg, block_start_event = _make_message_chunk_from_anthropic_event(
+            (
+                msg,
+                block_start_event,
+                stored_input_usage,
+            ) = _make_message_chunk_from_anthropic_event(
                 event,
                 stream_usage=stream_usage,
                 coerce_content_to_string=coerce_content_to_string,
                 block_start_event=block_start_event,
+                stored_input_usage=stored_input_usage,
            )
            if msg is not None:
                chunk = ChatGenerationChunk(message=msg)
@@ -2167,22 +2193,40 @@ def _make_message_chunk_from_anthropic_event(
     stream_usage: bool = True,
     coerce_content_to_string: bool,
     block_start_event: Optional[anthropic.types.RawMessageStreamEvent] = None,
-) -> tuple[Optional[AIMessageChunk], Optional[anthropic.types.RawMessageStreamEvent]]:
-    """Convert Anthropic event to AIMessageChunk.
+    stored_input_usage: Optional[BaseModel] = None,
+) -> tuple[
+    Optional[AIMessageChunk],
+    Optional[anthropic.types.RawMessageStreamEvent],
+    Optional[BaseModel],
+]:
+    """Convert Anthropic event to ``AIMessageChunk``.
 
     Note that not all events will result in a message chunk. In these cases
     we return ``None``.
+
+    Args:
+        event: The Anthropic streaming event to convert.
+        stream_usage: Whether to include usage metadata in the chunk.
+        coerce_content_to_string: Whether to coerce content blocks to strings.
+        block_start_event: Previous content block start event for context.
+        stored_input_usage: Usage metadata from ``message_start`` event to be used
+            in ``message_delta`` event for accurate input token counts.
+
+    Returns:
+        Tuple of ``(message_chunk, block_start_event, stored_usage)``
+
     """
     message_chunk: Optional[AIMessageChunk] = None
+    updated_stored_usage = stored_input_usage
    # See https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/lib/streaming/_messages.py  # noqa: E501
    if event.type == "message_start" and stream_usage:
-        usage_metadata = _create_usage_metadata(event.message.usage)
-        # We pick up a cumulative count of output_tokens at the end of the stream,
-        # so here we zero out to avoid double counting.
-        usage_metadata["total_tokens"] = (
-            usage_metadata["total_tokens"] - usage_metadata["output_tokens"]
+        # Store input usage for later use in message_delta but don't emit tokens yet
+        updated_stored_usage = event.message.usage
+        usage_metadata = UsageMetadata(
+            input_tokens=0,
+            output_tokens=0,
+            total_tokens=0,
         )
-        usage_metadata["output_tokens"] = 0
         if hasattr(event.message, "model"):
             response_metadata = {"model_name": event.message.model}
         else:
@@ -2270,11 +2314,37 @@ def _make_message_chunk_from_anthropic_event(
             tool_call_chunks=tool_call_chunks,
         )
     elif event.type == "message_delta" and stream_usage:
-        usage_metadata = UsageMetadata(
-            input_tokens=0,
-            output_tokens=event.usage.output_tokens,
-            total_tokens=event.usage.output_tokens,
-        )
+        # Create usage metadata combining stored input usage with final output usage
+        #
+        # Per Anthropic docs: "The token counts shown in the usage field of the
+        # message_delta event are cumulative." Thus, when MCP tools are called
+        # mid-stream, `input_tokens` may be updated with a higher cumulative count.
+        # We prioritize `event.usage.input_tokens` when available to handle this case.
+        if stored_input_usage is not None:
+            # Create a combined usage object that mimics the Anthropic Usage structure
+            combined_usage = _CombinedUsage(
+                input_tokens=event.usage.input_tokens
+                or getattr(stored_input_usage, "input_tokens", 0),
+                output_tokens=event.usage.output_tokens,
+                cache_creation_input_tokens=getattr(
+                    stored_input_usage, "cache_creation_input_tokens", None
+                ),
+                cache_read_input_tokens=getattr(
+                    stored_input_usage, "cache_read_input_tokens", None
+                ),
+                cache_creation=getattr(stored_input_usage, "cache_creation", None)
+                if hasattr(stored_input_usage, "cache_creation")
+                else None,
+            )
+            usage_metadata = _create_usage_metadata(combined_usage)
+        else:
+            # Fallback to just output tokens if no stored usage
+            usage_metadata = UsageMetadata(
+                input_tokens=event.usage.input_tokens or 0,
+                output_tokens=event.usage.output_tokens,
+                total_tokens=(event.usage.input_tokens or 0)
+                + event.usage.output_tokens,
+            )
         message_chunk = AIMessageChunk(
             content="",
             usage_metadata=usage_metadata,
@@ -2286,7 +2356,7 @@ def _make_message_chunk_from_anthropic_event(
     else:
         pass
 
-    return message_chunk, block_start_event
+    return message_chunk, block_start_event, updated_stored_usage
 
 
 @deprecated(since="0.1.0", removal="1.0.0", alternative="ChatAnthropic")
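
For readers who want the deferred-counting flow outside the diff context, here is a compact restatement. The `Usage` dataclass and `handle_event` helper are stand-ins for illustration, not the library's internals:

```python
# Self-contained restatement of the pattern in the diff above.
# `Usage` and `handle_event` are illustrative stand-ins, not library code.
from dataclasses import dataclass
from typing import Optional


@dataclass
class Usage:
    input_tokens: Optional[int] = None
    output_tokens: int = 0


def handle_event(event_type: str, usage: Usage, stored: Optional[Usage]):
    """Return (emitted_usage, stored_usage) for one streaming event."""
    if event_type == "message_start":
        # Defer input tokens: remember them, emit zeros for now.
        return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}, usage
    if event_type == "message_delta":
        # Prefer the cumulative count from the delta (e.g. after mid-stream
        # tool use); otherwise fall back to the value stored at message_start.
        input_tokens = usage.input_tokens or (stored.input_tokens if stored else 0) or 0
        return {
            "input_tokens": input_tokens,
            "output_tokens": usage.output_tokens,
            "total_tokens": input_tokens + usage.output_tokens,
        }, stored
    return None, stored


# message_start reports 472 input tokens, but they are only emitted at message_delta.
emitted, stored = handle_event("message_start", Usage(input_tokens=472), None)
assert emitted == {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
emitted, stored = handle_event("message_delta", Usage(output_tokens=50), stored)
assert emitted == {"input_tokens": 472, "output_tokens": 50, "total_tokens": 522}
```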
