Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void grpcInstrumentation() {
.hasAttributesSatisfyingExactly(
equalTo(
MessageIncubatingAttributes.MESSAGE_TYPE, "RECEIVED"),
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 2L))),
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 1L))),
span ->
span.hasName("example.Greeter/SayHello")
.hasKind(SpanKind.SERVER)
Expand All @@ -123,6 +123,6 @@ void grpcInstrumentation() {
.hasName("message")
.hasAttributesSatisfyingExactly(
equalTo(MessageIncubatingAttributes.MESSAGE_TYPE, "SENT"),
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 2L)))));
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 1L)))));
}
}
1 change: 1 addition & 0 deletions instrumentation/grpc-1.6/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

| System property | Type | Default | Description |
|-------------------------------------------------------------|---------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `otel.instrumentation.grpc.message-events` | Boolean | `true` | Determines whether to add span event for each individual message received and sent. Set this to false in case of streaming large volumes of messages. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use otel.instrumentation.grpc.emit-message-events @trask should there also be experimental in the property name?
I'd change the description to use less opinionated language.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Fixed the name and description. I think using experimental could suggest that the events are experimental maybe (which is not true)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trask should there also be experimental in the property name?

good question, I'm really not sure 😅, I opened #13487, but I think not having experimental in this property is probably the most consistent with the current (inconsistent) usage

| `otel.instrumentation.grpc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
| `otel.instrumentation.grpc.capture-metadata.client.request` | String | | A comma-separated list of request metadata keys. gRPC client instrumentation will capture metadata values corresponding to configured keys as span attributes. |
| `otel.instrumentation.grpc.capture-metadata.server.request` | String | | A comma-separated list of request metadata keys. gRPC server instrumentation will capture metadata values corresponding to configured keys as span attributes. |
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public final class GrpcSingletons {
private static final AtomicReference<Context.Storage> STORAGE_REFERENCE = new AtomicReference<>();

static {
boolean addMessageEvents =
AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.grpc.message-events", true);

boolean experimentalSpanAttributes =
AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.grpc.experimental-span-attributes", false);
Expand All @@ -40,6 +44,7 @@ public final class GrpcSingletons {

GrpcTelemetry telemetry =
GrpcTelemetry.builder(GlobalOpenTelemetry.get())
.setAddMessageEvents(addMessageEvents)
.setCaptureExperimentalSpanAttributes(experimentalSpanAttributes)
.setCapturedClientRequestMetadata(clientRequestMetadata)
.setCapturedServerRequestMetadata(serverRequestMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,36 @@ public static GrpcTelemetryBuilder builder(OpenTelemetry openTelemetry) {
private final Instrumenter<GrpcRequest, Status> clientInstrumenter;
private final ContextPropagators propagators;
private final boolean captureExperimentalSpanAttributes;
private final boolean addMessageEvents;

GrpcTelemetry(
Instrumenter<GrpcRequest, Status> serverInstrumenter,
Instrumenter<GrpcRequest, Status> clientInstrumenter,
ContextPropagators propagators,
boolean captureExperimentalSpanAttributes) {
boolean captureExperimentalSpanAttributes,
boolean addMessageEvents) {
this.serverInstrumenter = serverInstrumenter;
this.clientInstrumenter = clientInstrumenter;
this.propagators = propagators;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.addMessageEvents = addMessageEvents;
}

/**
* Returns a new {@link ClientInterceptor} for use with methods like {@link
* io.grpc.ManagedChannelBuilder#intercept(ClientInterceptor...)}.
*/
public ClientInterceptor newClientInterceptor() {
return new TracingClientInterceptor(clientInstrumenter, propagators);
return new TracingClientInterceptor(
clientInstrumenter, propagators, captureExperimentalSpanAttributes, addMessageEvents);
}

/**
* Returns a new {@link ServerInterceptor} for use with methods like {@link
* io.grpc.ServerBuilder#intercept(ServerInterceptor)}.
*/
public ServerInterceptor newServerInterceptor() {
return new TracingServerInterceptor(serverInstrumenter, captureExperimentalSpanAttributes);
return new TracingServerInterceptor(
serverInstrumenter, captureExperimentalSpanAttributes, addMessageEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class GrpcTelemetryBuilder {
additionalServerExtractors = new ArrayList<>();

private boolean captureExperimentalSpanAttributes;
private boolean addMessageEvents = true;
private List<String> capturedClientRequestMetadata = Collections.emptyList();
private List<String> capturedServerRequestMetadata = Collections.emptyList();

Expand Down Expand Up @@ -130,6 +131,16 @@ public GrpcTelemetryBuilder setPeerService(String peerService) {
return this;
}

/**
* Determines whether to add span event for each individual message received and sent. The default
* is true. Set this to false in case of streaming large volumes of messages.
*/
@CanIgnoreReturnValue
public GrpcTelemetryBuilder setAddMessageEvents(boolean addMessageEvents) {
this.addMessageEvents = addMessageEvents;
return this;
}

/**
* Sets whether experimental attributes should be set to spans. These attributes may be changed or
* removed in the future, so only enable this if you know you do not require attributes filled by
Expand Down Expand Up @@ -211,6 +222,7 @@ public GrpcTelemetry build() {
// So we go ahead and inject manually in this instrumentation.
clientInstrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysClient()),
openTelemetry.getPropagators(),
captureExperimentalSpanAttributes);
captureExperimentalSpanAttributes,
addMessageEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,27 @@ final class TracingClientInterceptor implements ClientInterceptor {
private static final String RECEIVED = "RECEIVED";

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingClientCall> MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "messageId");
private static final AtomicLongFieldUpdater<TracingClientCall> SENT_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "sentMessageId");

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingClientCall> RECEIVED_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "receivedMessageId");

private final Instrumenter<GrpcRequest, Status> instrumenter;
private final ContextPropagators propagators;
private final boolean captureExperimentalSpanAttributes;
private final boolean addMessageEvents;

TracingClientInterceptor(
Instrumenter<GrpcRequest, Status> instrumenter, ContextPropagators propagators) {
Instrumenter<GrpcRequest, Status> instrumenter,
ContextPropagators propagators,
boolean captureExperimentalSpanAttributes,
boolean addMessageEvents) {
this.instrumenter = instrumenter;
this.propagators = propagators;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.addMessageEvents = addMessageEvents;
}

@Override
Expand Down Expand Up @@ -75,9 +86,13 @@ final class TracingClientCall<REQUEST, RESPONSE>
private final Context context;
private final GrpcRequest request;

// Used by MESSAGE_ID_UPDATER
// Used by SENT_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long sentMessageId;

// Used by RECEIVED_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long messageId;
volatile long receivedMessageId;

TracingClientCall(
ClientCall<REQUEST, RESPONSE> delegate,
Expand Down Expand Up @@ -113,10 +128,11 @@ public void sendMessage(REQUEST message) {
instrumenter.end(context, request, Status.UNKNOWN, e);
throw e;
}
Span span = Span.fromContext(context);
Attributes attributes =
Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, MESSAGE_ID_UPDATER.incrementAndGet(this));
span.addEvent("message", attributes);
long messageId = SENT_MESSAGE_ID_UPDATER.incrementAndGet(this);
if (addMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
}

final class TracingClientCallListener
Expand All @@ -139,14 +155,11 @@ final class TracingClientCallListener

@Override
public void onMessage(RESPONSE message) {
Span span = Span.fromContext(context);
Attributes attributes =
Attributes.of(
MESSAGE_TYPE,
RECEIVED,
MESSAGE_ID,
MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this));
span.addEvent("message", attributes);
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this);
if (addMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
try (Scope ignored = context.makeCurrent()) {
delegate().onMessage(message);
}
Expand All @@ -155,6 +168,13 @@ public void onMessage(RESPONSE message) {
@Override
public void onClose(Status status, Metadata trailers) {
request.setPeerSocketAddress(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
if (captureExperimentalSpanAttributes) {
Span span = Span.fromContext(context);
span.setAttribute(
"grpc.messages.received", RECEIVED_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
span.setAttribute(
"grpc.messages.sent", SENT_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
}
instrumenter.end(context, request, status, status.getCause());
try (Scope ignored = parentContext.makeCurrent()) {
delegate().onClose(status, trailers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,27 @@ final class TracingServerInterceptor implements ServerInterceptor {
private static final String RECEIVED = "RECEIVED";

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingServerCall> MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "messageId");
private static final AtomicLongFieldUpdater<TracingServerCall> SENT_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "sentMessageId");

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingServerCall> RECEIVED_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "receivedMessageId");

private static final Metadata.Key<String> AUTHORITY_KEY =
InternalMetadata.keyOf(":authority", Metadata.ASCII_STRING_MARSHALLER);

private final Instrumenter<GrpcRequest, Status> instrumenter;
private final boolean captureExperimentalSpanAttributes;
private final boolean addMessageEvents;

TracingServerInterceptor(
Instrumenter<GrpcRequest, Status> instrumenter, boolean captureExperimentalSpanAttributes) {
Instrumenter<GrpcRequest, Status> instrumenter,
boolean captureExperimentalSpanAttributes,
boolean addMessageEvents) {
this.instrumenter = instrumenter;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.addMessageEvents = addMessageEvents;
}

@Override
Expand Down Expand Up @@ -85,9 +93,13 @@ final class TracingServerCall<REQUEST, RESPONSE>
private final GrpcRequest request;
private Status status;

// Used by MESSAGE_ID_UPDATER
// Used by SENT_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long sentMessageId;

// Used by RECEIVED_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long messageId;
volatile long receivedMessageId;

TracingServerCall(
ServerCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
Expand All @@ -106,10 +118,11 @@ public void sendMessage(RESPONSE message) {
try (Scope ignored = context.makeCurrent()) {
super.sendMessage(message);
}
Span span = Span.fromContext(context);
Attributes attributes =
Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, MESSAGE_ID_UPDATER.incrementAndGet(this));
span.addEvent("message", attributes);
long messageId = SENT_MESSAGE_ID_UPDATER.incrementAndGet(this);
if (addMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
}

@Override
Expand All @@ -134,15 +147,27 @@ final class TracingServerCallListener
this.request = request;
}

private void end(Context context, GrpcRequest request, Status response, Throwable error) {
if (captureExperimentalSpanAttributes) {
Span span = Span.fromContext(context);
span.setAttribute(
"grpc.messages.received", RECEIVED_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
span.setAttribute(
"grpc.messages.sent", SENT_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
if (Status.CANCELLED.equals(status)) {
span.setAttribute("grpc.canceled", true);
}
}
instrumenter.end(context, request, response, error);
}

@Override
public void onMessage(REQUEST message) {
Attributes attributes =
Attributes.of(
MESSAGE_TYPE,
RECEIVED,
MESSAGE_ID,
MESSAGE_ID_UPDATER.incrementAndGet(TracingServerCall.this));
Span.fromContext(context).addEvent("message", attributes);
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingServerCall.this);
if (addMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
delegate().onMessage(message);
}

Expand All @@ -160,36 +185,33 @@ public void onHalfClose() {
public void onCancel() {
try {
delegate().onCancel();
if (captureExperimentalSpanAttributes) {
Span.fromContext(context).setAttribute("grpc.canceled", true);
}
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
end(context, request, Status.UNKNOWN, e);
throw e;
}
instrumenter.end(context, request, Status.CANCELLED, null);
end(context, request, Status.CANCELLED, null);
}

@Override
public void onComplete() {
try {
delegate().onComplete();
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
end(context, request, Status.UNKNOWN, e);
throw e;
}
if (status == null) {
status = Status.UNKNOWN;
}
instrumenter.end(context, request, status, status.getCause());
end(context, request, status, status.getCause());
}

@Override
public void onReady() {
try {
delegate().onReady();
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
end(context, request, Status.UNKNOWN, e);
throw e;
}
}
Expand Down
Loading
Loading