Skip to content

Commit 1126e5e

Browse files
gary-huangNayeem Kamalnayeem-kamal
authored
Implement LLM Observability SDK (#8781)
* add APIs for llm obs sdk (#8135) * add APIs for llm obs * add llm message class to support llm spans * follow java convention of naming Id instead of ID * add codeowners * implement LLM Obs SDK spans APIs (#8390) * add APIs for llm obs * add llm message class to support llm spans * add llm message class to support llm spans * impl llmobs agent and llmobs apis * support llm messages with tool calls * handle default model name and provider * rm unneeded file * spotless * add APIs for llm obs sdk (#8135) * add APIs for llm obs * add llm message class to support llm spans * follow java convention of naming Id instead of ID * add codeowners * rename ID to Id according to java naming conventions * Undo change to integrations-core submodule * fix build gradle * rm empty line * fix test * LLM Obs SDK Mapper (#8372) * add APIs for llm obs * add llm message class to support llm spans * add llm message class to support llm spans * impl llmobs agent and llmobs apis * support llm messages with tool calls * handle default model name and provider * rm unneeded file * impl llmobs agent and llmobs apis * impl llmobs agent * working writer * add support for llm message and tool calls * cleaned up whitespace * resolve merge conflicts * remaining merge conflicts * fix bad method call * fixed llmobs intake creation if llmobs not enabled * removed print statements * added tests for llmobsspanmapper * fixed coverage for tags --------- Co-authored-by: Nayeem Kamal <[email protected]> * updated to master submodule * LLM Obs SDK use context API for parent children span linkage (#8711) * add APIs for llm obs * add llm message class to support llm spans * add llm message class to support llm spans * impl llmobs agent and llmobs apis * support llm messages with tool calls * handle default model name and provider * rm unneeded file * impl llmobs agent and llmobs apis * impl llmobs agent * working writer * add support for llm message and tool calls * impl llmobs agent and llmobs apis * use new ctx api to track parent span * cleaned up whitespace * resolve merge conflicts * remaining merge conflicts * fix bad method call * fixed llmobs intake creation if llmobs not enabled * removed print statements * ran spotless * added tests for llmobsspanmapper * fixed coverage for tags --------- Co-authored-by: Nayeem Kamal <[email protected]> Co-authored-by: Nayeem Kamal <[email protected]> * LLM Obs SDK evaluation metrics submission (#8688) * add APIs for llm obs * add llm message class to support llm spans * add llm message class to support llm spans * impl llmobs agent and llmobs apis * support llm messages with tool calls * handle default model name and provider * rm unneeded file * impl llmobs agent and llmobs apis * impl llmobs agent * working writer * add support for llm message and tool calls * impl llmobs agent and llmobs apis * use new ctx api to track parent span * add api for evals * working impl supporting both agentless and agent * handle null tags and default to default ml app if null or empty string provided in the override * cleaned up whitespace * resolve merge conflicts * remaining merge conflicts * fix bad method call * fixed llmobs intake creation if llmobs not enabled * removed print statements * ran spotless * ran spotless * added tests for llmobsspanmapper * fixed coverage for tags --------- Co-authored-by: Nayeem Kamal <[email protected]> Co-authored-by: Nayeem Kamal <[email protected]> * fix CODEOWNERS --------- Co-authored-by: Nayeem Kamal <[email protected]> Co-authored-by: Nayeem Kamal <[email protected]>
1 parent 391c8ae commit 1126e5e

File tree

32 files changed

+2366
-8
lines changed

32 files changed

+2366
-8
lines changed

.github/CODEOWNERS

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,9 @@
110110
/internal-api/src/main/java/datadog/trace/api/EndpointCheckpointer.java @DataDog/profiling-java
111111
/internal-api/src/main/java/datadog/trace/api/EndpointTracker.java @DataDog/profiling-java
112112
/dd-smoke-tests/profiling-integration-tests/ @DataDog/profiling-java
113+
114+
# @DataDog/ml-observability
115+
dd-trace-api/src/main/java/datadog/trace/api/llmobs/ @DataDog/ml-observability
116+
dd-java-agent/agent-llmobs/ @DataDog/ml-observability
117+
dd-trace-core/src/main/java/datadog/trace/llmobs/ @DataDog/ml-observability
118+
dd-trace-core/src/test/groovy/datadog/trace/llmobs/ @DataDog/ml-observability

communication/src/main/java/datadog/communication/BackendApiFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ private HttpUrl getAgentlessUrl(Intake intake) {
7272

7373
public enum Intake {
7474
API("api", "v2", Config::isCiVisibilityAgentlessEnabled, Config::getCiVisibilityAgentlessUrl),
75+
LLMOBS_API("api", "v2", Config::isLlmObsAgentlessEnabled, Config::getLlMObsAgentlessUrl),
7576
LOGS(
7677
"http-intake.logs",
7778
"v2",

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import datadog.trace.api.config.GeneralConfig;
2727
import datadog.trace.api.config.IastConfig;
2828
import datadog.trace.api.config.JmxFetchConfig;
29+
import datadog.trace.api.config.LlmObsConfig;
2930
import datadog.trace.api.config.ProfilingConfig;
3031
import datadog.trace.api.config.RemoteConfigConfig;
3132
import datadog.trace.api.config.TraceInstrumentationConfig;
@@ -42,6 +43,7 @@
4243
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
4344
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.TracerAPI;
4445
import datadog.trace.bootstrap.instrumentation.api.ProfilingContextIntegration;
46+
import datadog.trace.bootstrap.instrumentation.api.WriterConstants;
4547
import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling;
4648
import datadog.trace.util.AgentTaskScheduler;
4749
import datadog.trace.util.AgentThreadFactory.AgentThread;
@@ -109,7 +111,9 @@ private enum AgentFeature {
109111
EXCEPTION_REPLAY(DebuggerConfig.EXCEPTION_REPLAY_ENABLED, false),
110112
CODE_ORIGIN(TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED, false),
111113
DATA_JOBS(GeneralConfig.DATA_JOBS_ENABLED, false),
112-
AGENTLESS_LOG_SUBMISSION(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED, false);
114+
AGENTLESS_LOG_SUBMISSION(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED, false),
115+
LLMOBS(LlmObsConfig.LLMOBS_ENABLED, false),
116+
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false);
113117

114118
private final String configKey;
115119
private final String systemProp;
@@ -156,6 +160,8 @@ public boolean isEnabledByDefault() {
156160
private static boolean iastFullyDisabled;
157161
private static boolean cwsEnabled = false;
158162
private static boolean ciVisibilityEnabled = false;
163+
private static boolean llmObsEnabled = false;
164+
private static boolean llmObsAgentlessEnabled = false;
159165
private static boolean usmEnabled = false;
160166
private static boolean telemetryEnabled = true;
161167
private static boolean dynamicInstrumentationEnabled = false;
@@ -290,6 +296,25 @@ public static void start(
290296
exceptionReplayEnabled = isFeatureEnabled(AgentFeature.EXCEPTION_REPLAY);
291297
codeOriginEnabled = isFeatureEnabled(AgentFeature.CODE_ORIGIN);
292298
agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION);
299+
llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS);
300+
301+
// setup writers when llmobs is enabled to accomodate apm and llmobs
302+
if (llmObsEnabled) {
303+
// for llm obs spans, use agent proxy by default, apm spans will use agent writer
304+
setSystemPropertyDefault(
305+
propertyNameToSystemPropertyName(TracerConfig.WRITER_TYPE),
306+
WriterConstants.MULTI_WRITER_TYPE
307+
+ ":"
308+
+ WriterConstants.DD_INTAKE_WRITER_TYPE
309+
+ ","
310+
+ WriterConstants.DD_AGENT_WRITER_TYPE);
311+
if (llmObsAgentlessEnabled) {
312+
// use API writer only
313+
setSystemPropertyDefault(
314+
propertyNameToSystemPropertyName(TracerConfig.WRITER_TYPE),
315+
WriterConstants.DD_INTAKE_WRITER_TYPE);
316+
}
317+
}
293318

294319
if (profilingEnabled) {
295320
if (!isOracleJDK8()) {
@@ -578,6 +603,7 @@ public void execute() {
578603

579604
maybeStartAppSec(scoClass, sco);
580605
maybeStartCiVisibility(instrumentation, scoClass, sco);
606+
maybeStartLLMObs(instrumentation, scoClass, sco);
581607
// start debugger before remote config to subscribe to it before starting to poll
582608
maybeStartDebugger(instrumentation, scoClass, sco);
583609
maybeStartRemoteConfig(scoClass, sco);
@@ -933,6 +959,24 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class<?> scoCla
933959
}
934960
}
935961

962+
private static void maybeStartLLMObs(Instrumentation inst, Class<?> scoClass, Object sco) {
963+
if (llmObsEnabled) {
964+
StaticEventLogger.begin("LLM Observability");
965+
966+
try {
967+
final Class<?> llmObsSysClass =
968+
AGENT_CLASSLOADER.loadClass("datadog.trace.llmobs.LLMObsSystem");
969+
final Method llmObsInstallerMethod =
970+
llmObsSysClass.getMethod("start", Instrumentation.class, scoClass);
971+
llmObsInstallerMethod.invoke(null, inst, sco);
972+
} catch (final Throwable e) {
973+
log.warn("Not starting LLM Observability subsystem", e);
974+
}
975+
976+
StaticEventLogger.end("LLM Observability");
977+
}
978+
}
979+
936980
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
937981
if (agentlessLogSubmissionEnabled) {
938982
StaticEventLogger.begin("Logs Intake");
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
buildscript {
2+
repositories {
3+
mavenCentral()
4+
}
5+
6+
dependencies {
7+
classpath group: 'org.jetbrains.kotlin', name: 'kotlin-gradle-plugin', version: libs.versions.kotlin.get()
8+
}
9+
}
10+
11+
plugins {
12+
id 'com.gradleup.shadow'
13+
id 'java-test-fixtures'
14+
}
15+
16+
apply from: "$rootDir/gradle/java.gradle"
17+
apply from: "$rootDir/gradle/version.gradle"
18+
apply from: "$rootDir/gradle/test-with-kotlin.gradle"
19+
20+
minimumBranchCoverage = 0.0
21+
minimumInstructionCoverage = 0.0
22+
23+
dependencies {
24+
api libs.slf4j
25+
implementation libs.jctools
26+
27+
implementation project(':communication')
28+
implementation project(':components:json')
29+
implementation project(':internal-api')
30+
31+
testImplementation project(":utils:test-utils")
32+
33+
testFixturesApi project(':dd-java-agent:testing')
34+
testFixturesApi project(':utils:test-utils')
35+
}
36+
37+
shadowJar {
38+
dependencies deps.excludeShared
39+
}
40+
41+
jar {
42+
archiveClassifier = 'unbundled'
43+
}
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package datadog.trace.llmobs;
2+
3+
import static datadog.trace.util.AgentThreadFactory.AgentThread.LLMOBS_EVALS_PROCESSOR;
4+
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
5+
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
6+
7+
import com.squareup.moshi.JsonAdapter;
8+
import com.squareup.moshi.Moshi;
9+
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
10+
import datadog.communication.ddagent.SharedCommunicationObjects;
11+
import datadog.communication.http.HttpRetryPolicy;
12+
import datadog.communication.http.OkHttpUtils;
13+
import datadog.trace.api.Config;
14+
import datadog.trace.llmobs.domain.LLMObsEval;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.concurrent.TimeUnit;
18+
import okhttp3.Headers;
19+
import okhttp3.HttpUrl;
20+
import okhttp3.OkHttpClient;
21+
import okhttp3.Request;
22+
import okhttp3.RequestBody;
23+
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class EvalProcessingWorker implements AutoCloseable {
28+
29+
private static final String EVAL_METRIC_API_DOMAIN = "api";
30+
private static final String EVAL_METRIC_API_PATH = "api/intake/llm-obs/v1/eval-metric";
31+
32+
private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain";
33+
private static final String DD_API_KEY_HEADER_NAME = "DD-API-KEY";
34+
35+
private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);
36+
37+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
38+
private final Thread serializerThread;
39+
40+
public EvalProcessingWorker(
41+
final int capacity,
42+
final long flushInterval,
43+
final TimeUnit timeUnit,
44+
final SharedCommunicationObjects sco,
45+
Config config) {
46+
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);
47+
48+
boolean isAgentless = config.isLlmObsAgentlessEnabled();
49+
if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) {
50+
log.error("Agentless eval metric submission requires an API key");
51+
}
52+
53+
Headers headers;
54+
HttpUrl submissionUrl;
55+
if (isAgentless) {
56+
submissionUrl =
57+
HttpUrl.get(
58+
"https://"
59+
+ EVAL_METRIC_API_DOMAIN
60+
+ "."
61+
+ config.getSite()
62+
+ "/"
63+
+ EVAL_METRIC_API_PATH);
64+
headers = Headers.of(DD_API_KEY_HEADER_NAME, config.getApiKey());
65+
} else {
66+
submissionUrl =
67+
HttpUrl.get(
68+
sco.agentUrl.toString()
69+
+ DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
70+
+ EVAL_METRIC_API_PATH);
71+
headers = Headers.of(EVP_SUBDOMAIN_HEADER_NAME, EVAL_METRIC_API_DOMAIN);
72+
}
73+
74+
EvalSerializingHandler serializingHandler =
75+
new EvalSerializingHandler(queue, flushInterval, timeUnit, submissionUrl, headers);
76+
this.serializerThread = newAgentThread(LLMOBS_EVALS_PROCESSOR, serializingHandler);
77+
}
78+
79+
public void start() {
80+
this.serializerThread.start();
81+
}
82+
83+
public boolean addToQueue(final LLMObsEval eval) {
84+
return queue.offer(eval);
85+
}
86+
87+
@Override
88+
public void close() {
89+
serializerThread.interrupt();
90+
try {
91+
serializerThread.join(THREAD_JOIN_TIMOUT_MS);
92+
} catch (InterruptedException ignored) {
93+
}
94+
}
95+
96+
public static class EvalSerializingHandler implements Runnable {
97+
98+
private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
99+
private static final int FLUSH_THRESHOLD = 50;
100+
101+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
102+
private final long ticksRequiredToFlush;
103+
private long lastTicks;
104+
105+
private final Moshi moshi;
106+
private final JsonAdapter<LLMObsEval.Request> evalJsonAdapter;
107+
private final OkHttpClient httpClient;
108+
private final HttpUrl submissionUrl;
109+
private final Headers headers;
110+
111+
private final List<LLMObsEval> buffer = new ArrayList<>();
112+
113+
public EvalSerializingHandler(
114+
final MpscBlockingConsumerArrayQueue<LLMObsEval> queue,
115+
final long flushInterval,
116+
final TimeUnit timeUnit,
117+
final HttpUrl submissionUrl,
118+
final Headers headers) {
119+
this.queue = queue;
120+
this.moshi = new Moshi.Builder().add(LLMObsEval.class, new LLMObsEval.Adapter()).build();
121+
122+
this.evalJsonAdapter = moshi.adapter(LLMObsEval.Request.class);
123+
this.httpClient = new OkHttpClient();
124+
this.submissionUrl = submissionUrl;
125+
this.headers = headers;
126+
127+
this.lastTicks = System.nanoTime();
128+
this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
129+
130+
log.debug("starting eval metric serializer, url={}", submissionUrl);
131+
}
132+
133+
@Override
134+
public void run() {
135+
try {
136+
runDutyCycle();
137+
} catch (InterruptedException e) {
138+
Thread.currentThread().interrupt();
139+
}
140+
log.debug(
141+
"eval processor worker exited. submitting evals stopped. unsubmitted evals left: "
142+
+ !queuesAreEmpty());
143+
}
144+
145+
private void runDutyCycle() throws InterruptedException {
146+
Thread thread = Thread.currentThread();
147+
while (!thread.isInterrupted()) {
148+
consumeBatch();
149+
flushIfNecessary();
150+
}
151+
}
152+
153+
private void consumeBatch() {
154+
queue.drain(buffer::add, queue.size());
155+
}
156+
157+
protected void flushIfNecessary() {
158+
if (buffer.isEmpty()) {
159+
return;
160+
}
161+
if (shouldFlush()) {
162+
LLMObsEval.Request llmobsEvalReq = new LLMObsEval.Request(this.buffer);
163+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
164+
165+
String reqBod = evalJsonAdapter.toJson(llmobsEvalReq);
166+
167+
RequestBody requestBody =
168+
RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod);
169+
Request request =
170+
new Request.Builder().headers(headers).url(submissionUrl).post(requestBody).build();
171+
172+
try (okhttp3.Response response =
173+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
174+
175+
if (response.isSuccessful()) {
176+
log.debug("successfully flushed evaluation request with {} evals", this.buffer.size());
177+
this.buffer.clear();
178+
} else {
179+
log.error(
180+
"Could not submit eval metrics (HTTP code "
181+
+ response.code()
182+
+ ")"
183+
+ (response.body() != null ? ": " + response.body().string() : ""));
184+
}
185+
} catch (Exception e) {
186+
log.error("Could not submit eval metrics", e);
187+
}
188+
}
189+
}
190+
191+
private boolean shouldFlush() {
192+
long nanoTime = System.nanoTime();
193+
long ticks = nanoTime - lastTicks;
194+
if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) {
195+
lastTicks = nanoTime;
196+
return true;
197+
}
198+
return false;
199+
}
200+
201+
protected boolean queuesAreEmpty() {
202+
return queue.isEmpty();
203+
}
204+
}
205+
}

0 commit comments

Comments
 (0)