Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions tests/contrib/anthropic/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import mock
import pytest

from ddtrace._trace.pin import Pin
from ddtrace.contrib.internal.anthropic.patch import patch
from ddtrace.contrib.internal.anthropic.patch import unpatch
from ddtrace.llmobs import LLMObs
Expand All @@ -24,21 +23,13 @@ def ddtrace_global_config():


@pytest.fixture
def snapshot_tracer(anthropic):
pin = Pin.get_from(anthropic)
yield pin.tracer


@pytest.fixture
def mock_tracer(ddtrace_global_config, tracer, anthropic):
def test_spans(ddtrace_global_config, test_spans):
try:
pin = Pin.get_from(anthropic)
pin._override(anthropic, tracer=tracer)
if ddtrace_global_config.get("_llmobs_enabled", False):
# Have to disable and re-enable LLMObs to use the mock tracer.
LLMObs.disable()
LLMObs.enable(_tracer=tracer, integrations_enabled=False)
yield tracer
LLMObs.enable(integrations_enabled=False)
yield test_spans
finally:
LLMObs.disable()

Expand Down
9 changes: 4 additions & 5 deletions tests/contrib/anthropic/test_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import pytest

from ddtrace.internal.utils.version import parse_version
from tests.utils import TracerSpanContainer
from tests.utils import override_global_config

from .utils import tools
Expand All @@ -13,7 +12,7 @@
ANTHROPIC_VERSION = parse_version(anthropic_module.__version__)


def test_global_tags(ddtrace_config_anthropic, anthropic, request_vcr, mock_tracer):
def test_global_tags(ddtrace_config_anthropic, anthropic, request_vcr, test_spans):
"""
When the global config UST tags are set
The service name should be used for all data
Expand All @@ -30,7 +29,7 @@ def test_global_tags(ddtrace_config_anthropic, anthropic, request_vcr, mock_trac
messages=[{"role": "user", "content": "What does Nietzsche mean by 'God is dead'?"}],
)

span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert span.resource == "Messages.create"
assert span.service == "test-svc"
assert span.get_tag("env") == "staging"
Expand Down Expand Up @@ -278,7 +277,7 @@ def test_anthropic_llm_sync_tools_full_use(anthropic, request_vcr, snapshot_cont


@pytest.mark.asyncio
async def test_global_tags_async(ddtrace_config_anthropic, anthropic, request_vcr, mock_tracer):
async def test_global_tags_async(ddtrace_config_anthropic, anthropic, request_vcr, test_spans):
"""
When the global config UST tags are set
The service name should be used for all data
Expand All @@ -295,7 +294,7 @@ async def test_global_tags_async(ddtrace_config_anthropic, anthropic, request_vc
messages=[{"role": "user", "content": "What does Nietzsche mean by 'God is dead'?"}],
)

span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert span.resource == "AsyncMessages.create"
assert span.service == "test-svc"
assert span.get_tag("env") == "staging"
Expand Down
67 changes: 33 additions & 34 deletions tests/contrib/anthropic/test_anthropic_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from tests.llmobs._utils import anext_stream
from tests.llmobs._utils import iterate_stream
from tests.llmobs._utils import next_stream
from tests.utils import TracerSpanContainer


WEATHER_PROMPT = "What is the weather in San Francisco, CA?"
Expand Down Expand Up @@ -76,7 +75,7 @@ def test_completion_proxy(
anthropic,
ddtrace_global_config,
mock_llmobs_writer,
mock_tracer,
test_spans,
request_vcr,
):
llm = anthropic.Anthropic(base_url="http://localhost:4000")
Expand All @@ -97,7 +96,7 @@ def test_completion_proxy(
temperature=0.8,
messages=messages,
)
span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 1
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_non_llm_span_event(
Expand Down Expand Up @@ -128,11 +127,11 @@ def test_completion_proxy(
temperature=0.8,
messages=messages,
)
span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 2
assert mock_llmobs_writer.enqueue.call_args_list[1].args[0]["meta"]["span"]["kind"] == "llm"

def test_completion(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr):
def test_completion(self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr):
"""Ensure llmobs records are emitted for completion endpoints when configured.

Also ensure the llmobs records have the correct tagging including trace/span ID for trace correlation.
Expand All @@ -154,7 +153,7 @@ def test_completion(self, anthropic, ddtrace_global_config, mock_llmobs_writer,
}
],
)
span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 1
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand All @@ -174,7 +173,7 @@ def test_completion(self, anthropic, ddtrace_global_config, mock_llmobs_writer,
)

def test_completion_with_multiple_system_prompts(
self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr
self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr
):
"""Ensure llmobs records are emitted for completion endpoints with a list of messages as the system prompt.

Expand Down Expand Up @@ -203,7 +202,7 @@ def test_completion_with_multiple_system_prompts(
}
],
)
span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 1
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand Down Expand Up @@ -232,7 +231,7 @@ def test_completion_with_multiple_system_prompts(
)
)

def test_error(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr):
def test_error(self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is an error.

Also ensure the llmobs records have the correct tagging including trace/span ID for trace correlation.
Expand All @@ -256,7 +255,7 @@ def test_error(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_
],
)

span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 1
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand All @@ -279,7 +278,7 @@ def test_error(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_

@pytest.mark.parametrize("consume_stream", [iterate_stream, next_stream])
def test_stream(
self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr, consume_stream
self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr, consume_stream
):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is a stream input.

Expand All @@ -306,7 +305,7 @@ def test_stream(
)
consume_stream(stream)

span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 1
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand All @@ -333,7 +332,7 @@ def test_stream(

@pytest.mark.parametrize("consume_stream", [iterate_stream, next_stream])
def test_stream_helper(
self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr, consume_stream
self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr, consume_stream
):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is a stream input.

Expand Down Expand Up @@ -365,7 +364,7 @@ def test_stream_helper(
message = stream.get_final_text()
assert message is not None

span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 1
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand All @@ -390,7 +389,7 @@ def test_stream_helper(
)
)

def test_image(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr):
def test_image(self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is an image input.

Also ensure the llmobs records have the correct tagging including trace/span ID for trace correlation.
Expand Down Expand Up @@ -422,7 +421,7 @@ def test_image(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_
],
)

span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
assert mock_llmobs_writer.enqueue.call_count == 1
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand All @@ -446,7 +445,7 @@ def test_image(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_
)

@pytest.mark.skipif(ANTHROPIC_VERSION < (0, 27), reason="Anthropic Tools not available until 0.27.0, skipping.")
def test_tools_sync(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr):
def test_tools_sync(self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is a stream input.

Also ensure the llmobs records have the correct tagging including trace/span ID for trace correlation.
Expand All @@ -462,7 +461,7 @@ def test_tools_sync(self, anthropic, ddtrace_global_config, mock_llmobs_writer,
)
assert message is not None

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_1 = traces[0][0]
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand Down Expand Up @@ -512,7 +511,7 @@ def test_tools_sync(self, anthropic, ddtrace_global_config, mock_llmobs_writer,
)
assert response is not None

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_2 = traces[0][0]
assert mock_llmobs_writer.enqueue.call_count == 2
mock_llmobs_writer.enqueue.assert_called_with(
Expand Down Expand Up @@ -548,7 +547,7 @@ def test_tools_sync(self, anthropic, ddtrace_global_config, mock_llmobs_writer,

@pytest.mark.asyncio
@pytest.mark.skipif(ANTHROPIC_VERSION < (0, 27), reason="Anthropic Tools not available until 0.27.0, skipping.")
async def test_tools_async(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr):
async def test_tools_async(self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is a stream input.

Also ensure the llmobs records have the correct tagging including trace/span ID for trace correlation.
Expand All @@ -564,7 +563,7 @@ async def test_tools_async(self, anthropic, ddtrace_global_config, mock_llmobs_w
)
assert message is not None

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_1 = traces[0][0]
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand Down Expand Up @@ -614,7 +613,7 @@ async def test_tools_async(self, anthropic, ddtrace_global_config, mock_llmobs_w
)
assert response is not None

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_2 = traces[0][0]
assert mock_llmobs_writer.enqueue.call_count == 2
mock_llmobs_writer.enqueue.assert_called_with(
Expand Down Expand Up @@ -647,7 +646,7 @@ async def test_tools_async(self, anthropic, ddtrace_global_config, mock_llmobs_w
@pytest.mark.skipif(ANTHROPIC_VERSION < (0, 27), reason="Anthropic Tools not available until 0.27.0, skipping.")
@pytest.mark.parametrize("consume_stream", [iterate_stream, next_stream])
def test_tools_sync_stream(
self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr, consume_stream
self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr, consume_stream
):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is a stream input.

Expand Down Expand Up @@ -682,7 +681,7 @@ def test_tools_sync_stream(
},
]

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_1 = traces[0][0]
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand Down Expand Up @@ -736,7 +735,7 @@ def test_tools_sync_stream(
for _ in response:
pass

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_2 = traces[0][0]
assert mock_llmobs_writer.enqueue.call_count == 2
mock_llmobs_writer.enqueue.assert_called_with(
Expand Down Expand Up @@ -767,7 +766,7 @@ def test_tools_sync_stream(
@pytest.mark.skipif(ANTHROPIC_VERSION < (0, 27), reason="Anthropic Tools not available until 0.27.0, skipping.")
@pytest.mark.parametrize("consume_stream", [aiterate_stream, anext_stream])
async def test_tools_async_stream_helper(
self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr, consume_stream
self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr, consume_stream
):
"""Ensure llmobs records are emitted for completion endpoints when configured and there is a stream input.

Expand All @@ -789,7 +788,7 @@ async def test_tools_async_stream_helper(
raw_message = await stream.get_final_text()
assert raw_message is not None

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_1 = traces[0][0]
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
Expand Down Expand Up @@ -847,7 +846,7 @@ async def test_tools_async_stream_helper(
raw_message = await stream.get_final_text()
assert raw_message is not None

traces = TracerSpanContainer(mock_tracer).pop_traces()
traces = test_spans.pop_traces()
span_2 = traces[0][0]
assert mock_llmobs_writer.enqueue.call_count == 2
mock_llmobs_writer.enqueue.assert_called_with(
Expand Down Expand Up @@ -875,7 +874,7 @@ async def test_tools_async_stream_helper(
)

def test_completion_prompt_caching(
self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr
self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr
):
llm = anthropic.Anthropic()
"""Test that prompt caching metrics are properly captured for both cache creation and cache read."""
Expand All @@ -900,7 +899,7 @@ def test_completion_prompt_caching(
)
with request_vcr.use_cassette("anthropic_completion_cache_read.yaml"):
llm.messages.create(**inference_args, messages=[{"role": "user", "content": "What is a system"}])
spans = TracerSpanContainer(mock_tracer).pop_traces()
spans = test_spans.pop_traces()
span1, span2 = spans[0][0], spans[1][0]
assert mock_llmobs_writer.enqueue.call_count == 2

Expand Down Expand Up @@ -970,7 +969,7 @@ def test_completion_prompt_caching(
)

def test_completion_stream_prompt_caching(
self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr
self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr
):
"""Test that prompt caching metrics are properly captured for streamed completions."""
large_system_prompt = [
Expand Down Expand Up @@ -1001,7 +1000,7 @@ def test_completion_stream_prompt_caching(
for _ in stream2:
pass

spans = TracerSpanContainer(mock_tracer).pop_traces()
spans = test_spans.pop_traces()
span1, span2 = spans[0][0], spans[1][0]
assert mock_llmobs_writer.enqueue.call_count == 2

Expand Down Expand Up @@ -1071,7 +1070,7 @@ def test_completion_stream_prompt_caching(
)

@pytest.mark.skipif(ANTHROPIC_VERSION < (0, 37), reason=BETA_SKIP_REASON)
def test_beta_completion(self, anthropic, ddtrace_global_config, mock_llmobs_writer, mock_tracer, request_vcr):
def test_beta_completion(self, anthropic, ddtrace_global_config, mock_llmobs_writer, test_spans, request_vcr):
"""Ensure llmobs records are emitted for beta completion endpoints."""
llm = anthropic.Anthropic()
with request_vcr.use_cassette("anthropic_completion.yaml"):
Expand All @@ -1080,7 +1079,7 @@ def test_beta_completion(self, anthropic, ddtrace_global_config, mock_llmobs_wri
max_tokens=15,
messages=[{"role": "user", "content": "What does Nietzsche mean by 'God is dead'?"}],
)
span = TracerSpanContainer(mock_tracer).pop_traces()[0][0]
span = test_spans.pop_traces()[0][0]
mock_llmobs_writer.enqueue.assert_called_with(
_expected_llmobs_llm_span_event(
span,
Expand Down
Loading