Skip to content

Commit 42586cf

Browse files
committed
There is a bug here where we are consuming the entire iterator FIRST only to return a new iterator, causing massive streaming lags
1 parent 57a9758 commit 42586cf

File tree

1 file changed

+94
-44
lines changed
  • python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno

1 file changed

+94
-44
lines changed

python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py

Lines changed: 94 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,19 +1049,13 @@ def run(
10491049
if response.status == "success":
10501050
function_result = ""
10511051
if isinstance(function_call.result, (GeneratorType, Iterator)):
1052-
events = []
1053-
for item in function_call.result:
1054-
if isinstance(item, RunContentEvent) or isinstance(
1055-
item, TeamRunContentEvent
1056-
):
1057-
function_result += self._parse_content(item.content)
1058-
else:
1059-
function_result += str(item)
1060-
events.append(item)
1061-
1062-
# Convert back to iterator for downstream use
1063-
function_call.result = self._generator_wrapper(events)
1052+
# Create a streaming wrapper that preserves real-time flow
1053+
function_call.result = self._streaming_generator_wrapper(
1054+
function_call.result, span
1055+
)
10641056
response.result = function_call.result
1057+
# Note: span attributes will be set when streaming completes
1058+
return response
10651059
elif isinstance(function_call.result, ToolResult):
10661060
function_result = function_call.result.content
10671061
else:
@@ -1115,31 +1109,21 @@ async def arun(
11151109
if response.status == "success":
11161110
function_result = ""
11171111
if isinstance(function_call.result, (AsyncGeneratorType, AsyncIterator)):
1118-
events = []
1119-
async for item in function_call.result:
1120-
if isinstance(item, RunContentEvent) or isinstance(
1121-
item, TeamRunContentEvent
1122-
):
1123-
function_result += self._parse_content(item.content)
1124-
else:
1125-
function_result += str(item)
1126-
events.append(item)
1127-
# Convert back to iterator for downstream use
1128-
function_call.result = self._async_generator_wrapper(events)
1112+
# Create a streaming wrapper that preserves real-time flow
1113+
function_call.result = self._streaming_async_generator_wrapper(
1114+
function_call.result, span
1115+
)
11291116
response.result = function_call.result
1117+
# Note: span attributes will be set when streaming completes
1118+
return response
11301119
elif isinstance(function_call.result, (GeneratorType, Iterator)):
1131-
events = []
1132-
for item in function_call.result:
1133-
if isinstance(item, RunContentEvent) or isinstance(
1134-
item, TeamRunContentEvent
1135-
):
1136-
function_result += self._parse_content(item.content)
1137-
else:
1138-
function_result += str(item)
1139-
events.append(item)
1140-
# Convert back to iterator for downstream use
1141-
function_call.result = self._generator_wrapper(events)
1120+
# Create a streaming wrapper that preserves real-time flow
1121+
function_call.result = self._streaming_generator_wrapper(
1122+
function_call.result, span
1123+
)
11421124
response.result = function_call.result
1125+
# Note: span attributes will be set when streaming completes
1126+
return response
11431127
elif isinstance(function_call.result, ToolResult):
11441128
function_result = function_call.result.content
11451129
else:
@@ -1163,19 +1147,85 @@ async def arun(
11631147

11641148
return response
11651149

1166-
def _generator_wrapper(
1150+
def _streaming_generator_wrapper(
11671151
self,
1168-
events: List[Union[RunOutputEvent, TeamRunOutputEvent]],
1169-
) -> Iterator[Union[RunOutputEvent, TeamRunOutputEvent]]:
1170-
for event in events:
1171-
yield event
1152+
original_generator: Union[
1153+
Iterator[Union[RunOutputEvent, TeamRunOutputEvent]], Iterator[Any]
1154+
],
1155+
span: trace_api.Span,
1156+
) -> Iterator[Union[RunOutputEvent, TeamRunOutputEvent, Any]]:
1157+
"""
1158+
Streaming wrapper that preserves real-time flow while collecting data for observability.
1159+
Yields items immediately to maintain streaming UX, collects content for logging.
1160+
"""
1161+
function_result = ""
1162+
try:
1163+
for item in original_generator:
1164+
# Yield immediately to preserve streaming
1165+
yield item
1166+
1167+
# Collect for logging (non-blocking)
1168+
try:
1169+
if isinstance(item, (RunContentEvent, TeamRunContentEvent)):
1170+
function_result += self._parse_content(item.content)
1171+
else:
1172+
function_result += str(item)
1173+
except Exception:
1174+
# Don't break streaming if logging fails
1175+
pass
11721176

1173-
async def _async_generator_wrapper(
1177+
# Set span attributes after streaming completes
1178+
span.set_status(trace_api.StatusCode.OK)
1179+
span.set_attributes(
1180+
dict(
1181+
_output_value_and_mime_type_for_tool_span(
1182+
result=function_result,
1183+
)
1184+
)
1185+
)
1186+
except Exception as e:
1187+
span.set_status(trace_api.StatusCode.ERROR, str(e))
1188+
raise
1189+
1190+
async def _streaming_async_generator_wrapper(
11741191
self,
1175-
events: List[Union[RunOutputEvent, TeamRunOutputEvent]],
1176-
) -> AsyncIterator[Union[RunOutputEvent, TeamRunOutputEvent]]:
1177-
for event in events:
1178-
yield event
1192+
original_generator: Union[
1193+
AsyncIterator[Union[RunOutputEvent, TeamRunOutputEvent]], AsyncIterator[Any]
1194+
],
1195+
span: trace_api.Span,
1196+
) -> AsyncIterator[Union[RunOutputEvent, TeamRunOutputEvent, Any]]:
1197+
"""
1198+
Async streaming wrapper that preserves real-time flow while collecting data for observability.
1199+
Yields items immediately to maintain streaming UX, collects content for logging.
1200+
"""
1201+
function_result = ""
1202+
try:
1203+
async for item in original_generator:
1204+
# Yield immediately to preserve streaming
1205+
yield item
1206+
1207+
# Collect for logging (non-blocking)
1208+
try:
1209+
if isinstance(item, (RunContentEvent, TeamRunContentEvent)):
1210+
function_result += self._parse_content(item.content)
1211+
else:
1212+
function_result += str(item)
1213+
except Exception:
1214+
# Don't break streaming if logging fails
1215+
pass
1216+
1217+
# Set span attributes after streaming completes
1218+
span.set_status(trace_api.StatusCode.OK)
1219+
span.set_attributes(
1220+
dict(
1221+
_output_value_and_mime_type_for_tool_span(
1222+
result=function_result,
1223+
)
1224+
)
1225+
)
1226+
except Exception as e:
1227+
span.set_status(trace_api.StatusCode.ERROR, str(e))
1228+
raise
11791229

11801230
def _parse_content(self, content: Any) -> str:
11811231
from pydantic import BaseModel

0 commit comments

Comments
 (0)