Skip to content

Commit 4a34024

Browse files
committed
Properly close span outselves for streaming
1 parent 42586cf commit 4a34024

File tree

1 file changed

+61
-40
lines changed
  • python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno

1 file changed

+61
-40
lines changed

python/instrumentation/openinference-instrumentation-agno/src/openinference/instrumentation/agno/_wrappers.py

Lines changed: 61 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,49 +1035,67 @@ def run(
10351035

10361036
span_name = f"{function_name}"
10371037

1038-
with self._tracer.start_as_current_span(
1038+
# Start span manually (no 'with' context manager)
1039+
span = self._tracer.start_span(
10391040
span_name,
10401041
attributes={
10411042
OPENINFERENCE_SPAN_KIND: TOOL,
10421043
**dict(_input_value_and_mime_type_for_tool_span(function_arguments)),
10431044
**dict(_function_call_attributes(function_call)),
10441045
**dict(get_attributes_from_context()),
10451046
},
1046-
) as span:
1047-
response = wrapped(*args, **kwargs)
1047+
)
1048+
1049+
try:
1050+
# Set span as current for the wrapped call
1051+
with trace_api.use_span(span, end_on_exit=False):
1052+
response = wrapped(*args, **kwargs)
10481053

10491054
if response.status == "success":
1050-
function_result = ""
10511055
if isinstance(function_call.result, (GeneratorType, Iterator)):
1052-
# Create a streaming wrapper that preserves real-time flow
1056+
# Streaming case: wrapper will handle span closure after iteration completes
10531057
function_call.result = self._streaming_generator_wrapper(
1054-
function_call.result, span
1058+
function_call.result, span, should_close_span=True
10551059
)
10561060
response.result = function_call.result
1057-
# Note: span attributes will be set when streaming completes
10581061
return response
1059-
elif isinstance(function_call.result, ToolResult):
1060-
function_result = function_call.result.content
10611062
else:
1062-
function_result = function_call.result
1063-
span.set_status(trace_api.StatusCode.OK)
1064-
span.set_attributes(
1065-
dict(
1066-
_output_value_and_mime_type_for_tool_span(
1067-
result=function_result,
1068-
)
1069-
)
1070-
)
1063+
# Non-streaming case: handle immediately and close span
1064+
self._handle_non_streaming_success(function_call.result, span)
10711065
elif response.status == "failure":
10721066
function_error_message = function_call.error
10731067
span.set_status(trace_api.StatusCode.ERROR, function_error_message)
10741068
span.set_attribute(OUTPUT_VALUE, function_error_message)
10751069
span.set_attribute(OUTPUT_MIME_TYPE, TEXT)
1070+
span.end()
10761071
else:
10771072
span.set_status(trace_api.StatusCode.ERROR, "Unknown function call status")
1073+
span.end()
1074+
except Exception as e:
1075+
span.set_status(trace_api.StatusCode.ERROR, str(e))
1076+
span.end()
1077+
raise
10781078

10791079
return response
10801080

1081+
def _handle_non_streaming_success(self, result, span):
1082+
"""Handle non-streaming success case: set span attributes and close span."""
1083+
function_result = ""
1084+
if isinstance(result, ToolResult):
1085+
function_result = result.content
1086+
else:
1087+
function_result = result
1088+
1089+
span.set_status(trace_api.StatusCode.OK)
1090+
span.set_attributes(
1091+
dict(
1092+
_output_value_and_mime_type_for_tool_span(
1093+
result=function_result,
1094+
)
1095+
)
1096+
)
1097+
span.end()
1098+
10811099
async def arun(
10821100
self,
10831101
wrapped: Callable[..., Awaitable[Any]],
@@ -1095,55 +1113,53 @@ async def arun(
10951113

10961114
span_name = f"{function_name}"
10971115

1098-
with self._tracer.start_as_current_span(
1116+
# Start span manually (no 'with' context manager)
1117+
span = self._tracer.start_span(
10991118
span_name,
11001119
attributes={
11011120
OPENINFERENCE_SPAN_KIND: TOOL,
11021121
**dict(_input_value_and_mime_type_for_tool_span(function_arguments)),
11031122
**dict(_function_call_attributes(function_call)),
11041123
**dict(get_attributes_from_context()),
11051124
},
1106-
) as span:
1107-
response = await wrapped(*args, **kwargs)
1125+
)
1126+
1127+
try:
1128+
# Set span as current for the wrapped call
1129+
with trace_api.use_span(span, end_on_exit=False):
1130+
response = await wrapped(*args, **kwargs)
11081131

11091132
if response.status == "success":
1110-
function_result = ""
11111133
if isinstance(function_call.result, (AsyncGeneratorType, AsyncIterator)):
1112-
# Create a streaming wrapper that preserves real-time flow
1134+
# Async streaming case: wrapper will handle span closure after iteration completes
11131135
function_call.result = self._streaming_async_generator_wrapper(
1114-
function_call.result, span
1136+
function_call.result, span, should_close_span=True
11151137
)
11161138
response.result = function_call.result
1117-
# Note: span attributes will be set when streaming completes
11181139
return response
11191140
elif isinstance(function_call.result, (GeneratorType, Iterator)):
1120-
# Create a streaming wrapper that preserves real-time flow
1141+
# Sync streaming case: wrapper will handle span closure after iteration completes
11211142
function_call.result = self._streaming_generator_wrapper(
1122-
function_call.result, span
1143+
function_call.result, span, should_close_span=True
11231144
)
11241145
response.result = function_call.result
1125-
# Note: span attributes will be set when streaming completes
11261146
return response
1127-
elif isinstance(function_call.result, ToolResult):
1128-
function_result = function_call.result.content
11291147
else:
1130-
function_result = function_call.result
1131-
1132-
span.set_status(trace_api.StatusCode.OK)
1133-
span.set_attributes(
1134-
dict(
1135-
_output_value_and_mime_type_for_tool_span(
1136-
result=function_result,
1137-
)
1138-
)
1139-
)
1148+
# Non-streaming case: handle immediately and close span
1149+
self._handle_non_streaming_success(function_call.result, span)
11401150
elif response.status == "failure":
11411151
function_error_message = function_call.error
11421152
span.set_status(trace_api.StatusCode.ERROR, function_error_message)
11431153
span.set_attribute(OUTPUT_VALUE, function_error_message)
11441154
span.set_attribute(OUTPUT_MIME_TYPE, TEXT)
1155+
span.end()
11451156
else:
11461157
span.set_status(trace_api.StatusCode.ERROR, "Unknown function call status")
1158+
span.end()
1159+
except Exception as e:
1160+
span.set_status(trace_api.StatusCode.ERROR, str(e))
1161+
span.end()
1162+
raise
11471163

11481164
return response
11491165

@@ -1153,6 +1169,7 @@ def _streaming_generator_wrapper(
11531169
Iterator[Union[RunOutputEvent, TeamRunOutputEvent]], Iterator[Any]
11541170
],
11551171
span: trace_api.Span,
1172+
should_close_span: bool = False,
11561173
) -> Iterator[Union[RunOutputEvent, TeamRunOutputEvent, Any]]:
11571174
"""
11581175
Streaming wrapper that preserves real-time flow while collecting data for observability.
@@ -1186,6 +1203,10 @@ def _streaming_generator_wrapper(
11861203
except Exception as e:
11871204
span.set_status(trace_api.StatusCode.ERROR, str(e))
11881205
raise
1206+
finally:
1207+
# Close span if we're responsible for it
1208+
if should_close_span:
1209+
span.end()
11891210

11901211
async def _streaming_async_generator_wrapper(
11911212
self,

0 commit comments

Comments
 (0)