Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,10 @@ dd-java-agent/instrumentation/spring-security-5/ @DataDog/asm-java
# @DataDog/data-jobs-monitoring
dd-java-agent/instrumentation/spark/ @DataDog/data-jobs-monitoring
dd-java-agent/instrumentation/spark-executor/ @DataDog/data-jobs-monitoring

# @DataDog/data-streams-monitoring
dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/datastreams @DataDog/data-streams-monitoring
dd-trace-core/src/main/java/datadog/trace/core/datastreams @DataDog/data-streams-monitoring
dd-trace-core/src/test/groovy/datadog/trace/core/datastreams @DataDog/data-streams-monitoring
internal-api/src/main/java/datadog/trace/api/datastreams @DataDog/data-streams-monitoring
internal-api/src/test/groovy/datadog/trace/api/datastreams @DataDog/data-streams-monitoring
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.bootstrap.instrumentation.decorator;

import static datadog.trace.api.cache.RadixTreeCache.UNSET_STATUS;
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
import static datadog.trace.api.gateway.Events.EVENTS;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.bootstrap.instrumentation.decorator.http.HttpResourceDecorator.HTTP_RESOURCE_DECORATOR;
Expand Down Expand Up @@ -148,7 +149,7 @@ public AgentSpan startSpan(
}
AgentPropagation.ContextVisitor<REQUEST_CARRIER> getter = getter();
if (null != carrier && null != getter) {
tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
tracer().getDataStreamsMonitoring().setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));
}
return span;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import datadog.trace.api.DDSpanId;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.AKKA_CLIENT_REQUEST;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientHelpers.AkkaHttpHeaders;
Expand All @@ -17,9 +17,9 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import net.bytebuddy.asm.Advice;
import scala.concurrent.Future;

Expand Down Expand Up @@ -79,10 +79,8 @@ public static AgentScope methodEnter(
DECORATE.onRequest(span, request);

if (request != null) {
defaultPropagator().inject(span, request, headers);
propagate()
.injectPathwayContext(
span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
defaultPropagator().inject(span.with(dsmContext), request, headers);
// Request is immutable, so we have to assign new value once we update headers
request = headers.getRequest();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;

import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import net.bytebuddy.asm.Advice;
import scala.concurrent.Future;

Expand All @@ -29,10 +29,8 @@ public static AgentScope methodEnter(
AkkaHttpClientDecorator.DECORATE.onRequest(span, request);

if (request != null) {
defaultPropagator().inject(span, request, headers);
propagate()
.injectPathwayContext(
span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
defaultPropagator().inject(span.with(dsmContext), request, headers);
// Request is immutable, so we have to assign new value once we update headers
request = headers.getRequest();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package datadog.trace.instrumentation.apachehttpasyncclient;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpasyncclient.ApacheHttpAsyncClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpasyncclient.HttpHeadersInjectAdapter.SETTER;

import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import java.io.IOException;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
Expand Down Expand Up @@ -35,9 +35,8 @@ public HttpRequest generateRequest() throws IOException, HttpException {
final HttpRequest request = delegate.generateRequest();
DECORATE.onRequest(span, new HostAndRequestAsHttpUriRequest(delegate.getTarget(), request));

defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
defaultPropagator().inject(span.with(dsmContext), request, SETTER);

return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpclient.ApacheHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpclient.ApacheHttpClientDecorator.HTTP_REQUEST;
import static datadog.trace.instrumentation.apachehttpclient.HttpHeadersInjectAdapter.SETTER;

import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -46,10 +46,8 @@ private static AgentScope activateHttpSpan(final HttpUriRequest request) {

// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!awsClientCall) {
defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(
span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
defaultPropagator().inject(span.with(dsmContext), request, SETTER);
}

return scope;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package datadog.trace.instrumentation.apachehttpclient5;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpclient5.HttpHeadersInjectAdapter.SETTER;

import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import java.io.IOException;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
Expand All @@ -28,9 +28,8 @@ public void sendRequest(HttpRequest request, EntityDetails entityDetails, HttpCo
throws HttpException, IOException {
DECORATE.onRequest(span, request);

defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
defaultPropagator().inject(span.with(dsmContext), request, SETTER);
delegate.sendRequest(request, entityDetails, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.HTTP_REQUEST;
import static datadog.trace.instrumentation.apachehttpclient5.HttpHeadersInjectAdapter.SETTER;

import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
Expand Down Expand Up @@ -46,10 +46,8 @@ private static AgentScope activateHttpSpan(final HttpRequest request) {
final boolean awsClientCall = request.containsHeader("amz-sdk-invocation-id");
// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!awsClientCall) {
defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(
span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
defaultPropagator().inject(span.with(dsmContext), request, SETTER);
}

return scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.DECORATE;
Expand All @@ -21,6 +20,7 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.agent.tooling.muzzle.Reference;
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand Down Expand Up @@ -121,8 +121,8 @@ public static <T> AgentScope before(
if (null != responseListener && null != headers) {
span = InstrumentationContext.get(ClientCall.class, AgentSpan.class).get(call);
if (null != span) {
defaultPropagator().inject(span, headers, SETTER);
propagate().injectPathwayContext(span, headers, SETTER, CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
defaultPropagator().inject(span.with(dsmContext), headers, SETTER);
return activateSpan(span);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.armeria.grpc.server;

import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
import static datadog.trace.api.gateway.Events.EVENTS;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
Expand Down Expand Up @@ -71,7 +72,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
.setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));

RequestContext reqContext = span.getRequestContext();
if (reqContext != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package datadog.trace.instrumentation.avro;

import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Schema;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package datadog.trace.instrumentation.aws.v2.eventbridge;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.BUS_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER;

import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -86,12 +86,13 @@ private String getTraceContextToInject(
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append('{');

// Inject trace context
defaultPropagator().inject(span, jsonBuilder, SETTER);

// Inject context
datadog.context.Context context = span;
if (traceConfig().isDataStreamsEnabled()) {
propagate().injectPathwayContext(span, jsonBuilder, SETTER, getTags(eventBusName));
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(eventBusName));
context = context.with(dsmContext);
}
defaultPropagator().inject(context, jsonBuilder, SETTER);

// Add bus name and start time
jsonBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v0;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;

Expand Down Expand Up @@ -264,7 +265,7 @@ public AgentSpan onServiceResponse(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, 0, responseSize);
.setCheckpoint(span, create(sortedTags, 0, responseSize));
}

if ("PutObjectRequest".equalsIgnoreCase(awsOperation)
Expand All @@ -285,7 +286,7 @@ public AgentSpan onServiceResponse(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, 0, payloadSize);
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v0;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.XRAY_TRACING_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan;
Expand All @@ -18,11 +19,12 @@
import com.amazonaws.handlers.RequestHandler2;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -122,8 +124,8 @@ public void afterResponse(final Request<?> request, final Response<?> response)
AgentDataStreamsMonitoring dataStreamsMonitoring =
AgentTracer.get().getDataStreamsMonitoring();
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
pathwayContext.setCheckpoint(
sortedTags, dataStreamsMonitoring::add, arrivalTime.getTime());
DataStreamsContext context = create(sortedTags, arrivalTime.getTime(), 0);
pathwayContext.setCheckpoint(context, dataStreamsMonitoring::add);
if (!span.context().getPathwayContext().isStarted()) {
span.context().mergePathwayContext(pathwayContext);
}
Expand Down
Loading
Loading