Skip to content

Commit 40e1913

Browse files
committed
add captured-headers
1 parent 5c4d6da commit 40e1913

File tree

10 files changed

+95
-75
lines changed

10 files changed

+95
-75
lines changed

docs/supported-libraries.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ These are the supported libraries and frameworks:
105105
| [Micrometer](https://micrometer.io/) | 1.5+ (disabled by default) | [opentelemetry-micrometer-1.5](../instrumentation/micrometer/micrometer-1.5/library) | none |
106106
| [MongoDB Driver](https://mongodb.github.io/mongo-java-driver/) | 3.1+ | [opentelemetry-mongo-3.1](../instrumentation/mongo/mongo-3.1/library) | [Database Client Spans], [Database Client Metrics] [6] |
107107
| [MyBatis](https://mybatis.org/mybatis-3/) | 3.2+ | N/A | none |
108-
| [NATS](https://github.com/nats-io/nats.java) | 3.8+ | [nats-2.21](../instrumentation/nats/nats-2.21/library) | [Messaging Spans] |
108+
| [NATS](https://github.com/nats-io/nats.java) | 2.17.2+ | [nats-2.21](../instrumentation/nats/nats-2.21/library) | [Messaging Spans] |
109109
| [Netty HTTP codec [5]](https://github.com/netty/netty) | 3.8+ | [opentelemetry-netty-4.1](../instrumentation/netty/netty-4.1/library) | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] |
110110
| [OpenSearch Rest Client](https://github.com/opensearch-project/opensearch-java) | 1.0+ | | [Database Client Spans], [Database Client Metrics] [6] |
111111
| [OkHttp](https://github.com/square/okhttp/) | 2.2+ | [opentelemetry-okhttp-3.0](../instrumentation/okhttp/okhttp-3.0/library) | [HTTP Client Spans], [HTTP Client Metrics] |

instrumentation/nats/nats-2.21/javaagent/build.gradle.kts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,22 @@ dependencies {
1919
}
2020

2121
tasks {
22-
val testMessagingReceive by registering(Test::class) {
22+
val testExperimental by registering(Test::class) {
2323
filter {
24-
includeTestsMatching("NatsInstrumentationMessagingReceiveTest")
24+
includeTestsMatching("NatsInstrumentationExperimentalTest")
2525
}
2626
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
27+
jvmArgs("-Dotel.instrumentation.messaging.experimental.capture-headers=captured-header")
2728
}
2829

2930
test {
3031
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
3132
filter {
32-
excludeTestsMatching("NatsInstrumentationMessagingReceiveTest")
33+
excludeTestsMatching("NatsInstrumentationExperimentalTest")
3334
}
3435
}
3536

3637
check {
37-
dependsOn(testMessagingReceive)
38+
dependsOn(testExperimental)
3839
}
3940
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsSingletons.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,28 @@
1111

1212
import io.opentelemetry.api.GlobalOpenTelemetry;
1313
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14-
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
1514
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
15+
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
16+
import java.util.List;
1617

1718
public final class NatsSingletons {
1819

1920
private static final boolean messagingReceiveInstrumentationEnabled =
20-
ConfigPropertiesUtil.getBoolean(
21-
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false);
21+
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
22+
23+
private static final List<String> capturedHeaders =
24+
ExperimentalConfig.get().getMessagingHeaders();
2225

2326
public static final Instrumenter<NatsRequest, NatsRequest> PRODUCER_INSTRUMENTER =
24-
createProducerInstrumenter(GlobalOpenTelemetry.get());
27+
createProducerInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders);
2528

2629
public static final Instrumenter<NatsRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER =
2730
createConsumerReceiveInstrumenter(
28-
GlobalOpenTelemetry.get(), messagingReceiveInstrumentationEnabled);
31+
GlobalOpenTelemetry.get(), messagingReceiveInstrumentationEnabled, capturedHeaders);
2932

3033
public static final Instrumenter<NatsRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
3134
createConsumerProcessInstrumenter(
32-
GlobalOpenTelemetry.get(), messagingReceiveInstrumentationEnabled);
35+
GlobalOpenTelemetry.get(), messagingReceiveInstrumentationEnabled, capturedHeaders);
3336

3437
private NatsSingletons() {}
3538
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.nats.client.Connection;
99
import io.nats.client.Nats;
10-
import io.opentelemetry.instrumentation.nats.v2_21.AbstractNatsInstrumentationMessagingReceiveTest;
10+
import io.opentelemetry.instrumentation.nats.v2_21.AbstractNatsInstrumentationExperimentalTest;
1111
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1212
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1313
import java.io.IOException;
@@ -18,8 +18,7 @@
1818
import org.testcontainers.containers.GenericContainer;
1919
import org.testcontainers.utility.DockerImageName;
2020

21-
class NatsInstrumentationMessagingReceiveTest
22-
extends AbstractNatsInstrumentationMessagingReceiveTest {
21+
class NatsInstrumentationExperimentalTest extends AbstractNatsInstrumentationExperimentalTest {
2322

2423
@RegisterExtension
2524
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/NatsTelemetryBuilder.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,20 @@
55

66
package io.opentelemetry.instrumentation.nats.v2_21;
77

8+
import static java.util.Collections.emptyList;
9+
810
import com.google.errorprone.annotations.CanIgnoreReturnValue;
911
import io.opentelemetry.api.OpenTelemetry;
1012
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory;
13+
import java.util.ArrayList;
14+
import java.util.Collection;
15+
import java.util.List;
1116

1217
public final class NatsTelemetryBuilder {
1318

1419
private final OpenTelemetry openTelemetry;
1520
private boolean messagingReceiveInstrumentationEnabled = false;
21+
private List<String> capturedHeaders = emptyList();
1622

1723
NatsTelemetryBuilder(OpenTelemetry openTelemetry) {
1824
this.openTelemetry = openTelemetry;
@@ -24,12 +30,18 @@ public NatsTelemetryBuilder setMessagingReceiveInstrumentationEnabled(boolean en
2430
return this;
2531
}
2632

33+
@CanIgnoreReturnValue
34+
public NatsTelemetryBuilder setCapturedHeaders(Collection<String> capturedHeaders) {
35+
this.capturedHeaders = new ArrayList<>(capturedHeaders);
36+
return this;
37+
}
38+
2739
public NatsTelemetry build() {
2840
return new NatsTelemetry(
29-
NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry),
41+
NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry, capturedHeaders),
3042
NatsInstrumenterFactory.createConsumerReceiveInstrumenter(
31-
openTelemetry, messagingReceiveInstrumentationEnabled),
43+
openTelemetry, messagingReceiveInstrumentationEnabled, capturedHeaders),
3244
NatsInstrumenterFactory.createConsumerProcessInstrumenter(
33-
openTelemetry, messagingReceiveInstrumentationEnabled));
45+
openTelemetry, messagingReceiveInstrumentationEnabled, capturedHeaders));
3446
}
3547
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/internal/NatsInstrumenterFactory.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
1414
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
1515
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
16+
import java.util.List;
1617

1718
/**
1819
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
@@ -24,44 +25,53 @@ public final class NatsInstrumenterFactory {
2425
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.nats-2.21";
2526

2627
public static Instrumenter<NatsRequest, NatsRequest> createProducerInstrumenter(
27-
OpenTelemetry openTelemetry) {
28+
OpenTelemetry openTelemetry, List<String> capturedHeaders) {
2829
return Instrumenter.<NatsRequest, NatsRequest>builder(
2930
openTelemetry,
3031
INSTRUMENTATION_NAME,
3132
MessagingSpanNameExtractor.create(
3233
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.PUBLISH))
3334
.addAttributesExtractor(
34-
MessagingAttributesExtractor.create(
35-
NatsRequestMessagingAttributesGetter.NATS_REQUEST_INSTANCE,
36-
MessageOperation.PUBLISH))
35+
MessagingAttributesExtractor.builder(
36+
NatsRequestMessagingAttributesGetter.NATS_REQUEST_INSTANCE,
37+
MessageOperation.PUBLISH)
38+
.setCapturedHeaders(capturedHeaders)
39+
.build())
3740
.buildProducerInstrumenter(NatsRequestTextMapSetter.INSTANCE);
3841
}
3942

4043
public static Instrumenter<NatsRequest, Void> createConsumerReceiveInstrumenter(
41-
OpenTelemetry openTelemetry, boolean enabled) {
44+
OpenTelemetry openTelemetry, boolean enabled, List<String> capturedHeaders) {
4245
return Instrumenter.<NatsRequest, Void>builder(
4346
openTelemetry,
4447
INSTRUMENTATION_NAME,
4548
MessagingSpanNameExtractor.create(
4649
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.RECEIVE))
4750
.addAttributesExtractor(
48-
MessagingAttributesExtractor.create(
49-
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.RECEIVE))
51+
MessagingAttributesExtractor.builder(
52+
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.RECEIVE)
53+
.setCapturedHeaders(capturedHeaders)
54+
.build())
5055
.setEnabled(enabled)
5156
.buildConsumerInstrumenter(NatsRequestTextMapGetter.INSTANCE);
5257
}
5358

5459
public static Instrumenter<NatsRequest, Void> createConsumerProcessInstrumenter(
55-
OpenTelemetry openTelemetry, boolean messagingReceiveInstrumentationEnabled) {
60+
OpenTelemetry openTelemetry,
61+
boolean messagingReceiveInstrumentationEnabled,
62+
List<String> capturedHeaders) {
5663
InstrumenterBuilder<NatsRequest, Void> builder =
5764
Instrumenter.<NatsRequest, Void>builder(
5865
openTelemetry,
5966
INSTRUMENTATION_NAME,
6067
MessagingSpanNameExtractor.create(
6168
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.PROCESS))
6269
.addAttributesExtractor(
63-
MessagingAttributesExtractor.create(
64-
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.PROCESS));
70+
MessagingAttributesExtractor.builder(
71+
NatsRequestMessagingAttributesGetter.VOID_INSTANCE,
72+
MessageOperation.PROCESS)
73+
.setCapturedHeaders(capturedHeaders)
74+
.build());
6575

6676
if (messagingReceiveInstrumentationEnabled) {
6777
builder.addSpanLinksExtractor(
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55

66
package io.opentelemetry.instrumentation.nats.v2_21;
77

8+
import static java.util.Collections.singletonList;
9+
810
import io.nats.client.Connection;
911
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1012
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
1113
import org.junit.jupiter.api.extension.RegisterExtension;
1214

13-
class NatsInstrumentationMessaingReceiveTest
14-
extends AbstractNatsInstrumentationMessagingReceiveTest {
15+
class NatsInstrumentationExperimentalTest extends AbstractNatsInstrumentationExperimentalTest {
1516

1617
@RegisterExtension
1718
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@@ -23,6 +24,7 @@ class NatsInstrumentationMessaingReceiveTest
2324
telemetry =
2425
NatsTelemetry.builder(testing.getOpenTelemetry())
2526
.setMessagingReceiveInstrumentationEnabled(true)
27+
.setCapturedHeaders(singletonList("captured-header"))
2628
.build();
2729
natsConnection = telemetry.wrap(new TestConnection());
2830
}
Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,26 @@
66
package io.opentelemetry.instrumentation.nats.v2_21;
77

88
import static io.opentelemetry.instrumentation.nats.v2_21.NatsInstrumentationTestHelper.messagingAttributes;
9+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
10+
import static java.util.Collections.singletonList;
911
import static org.assertj.core.api.Assertions.assertThat;
1012

1113
import io.nats.client.Connection;
1214
import io.nats.client.Dispatcher;
15+
import io.nats.client.Message;
1316
import io.nats.client.impl.Headers;
1417
import io.nats.client.impl.NatsMessage;
18+
import io.opentelemetry.api.common.AttributeKey;
1519
import io.opentelemetry.api.trace.Span;
1620
import io.opentelemetry.api.trace.SpanKind;
1721
import io.opentelemetry.context.Context;
1822
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
23+
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
1924
import org.junit.jupiter.api.BeforeEach;
2025
import org.junit.jupiter.api.Test;
2126

2227
@SuppressWarnings("deprecation") // using deprecated semconv
23-
public abstract class AbstractNatsInstrumentationMessagingReceiveTest {
28+
public abstract class AbstractNatsInstrumentationExperimentalTest {
2429

2530
protected abstract InstrumentationExtension testing();
2631

@@ -34,42 +39,51 @@ void beforeEach() {
3439
}
3540

3641
@Test
37-
void testSubscribe() {
42+
void testMessagingReceiveAndCapturedHeaders() {
3843
// given
3944
Dispatcher dispatcher = connection().createDispatcher(msg -> {}).subscribe("sub");
4045

4146
// when
47+
Headers headers = new Headers();
48+
headers.put("captured-header", "value");
4249
String traceId =
4350
testing()
4451
.runWithSpan(
4552
"parent",
4653
() -> {
47-
connection()
48-
.publish(
49-
NatsMessage.builder()
50-
.subject("sub")
51-
.headers(new Headers())
52-
.data("x")
53-
.build());
54+
Message message =
55+
NatsMessage.builder().subject("sub").headers(headers).data("x").build();
56+
connection().publish(message);
5457
return Span.fromContext(Context.current()).getSpanContext().getTraceId();
5558
});
5659
connection().closeDispatcher(dispatcher);
5760

5861
// then
62+
AttributeAssertion[] headerAssert =
63+
new AttributeAssertion[] {
64+
equalTo(
65+
AttributeKey.stringArrayKey("messaging.header.captured_header"),
66+
singletonList("value"))
67+
};
68+
5969
testing()
6070
.waitAndAssertTraces(
6171
trace ->
6272
trace.hasSpansSatisfyingExactly(
6373
span -> span.hasName("parent").hasNoParent(),
64-
span -> span.hasName("sub publish").hasParent(trace.getSpan(0))),
74+
span ->
75+
span.hasName("sub publish")
76+
.hasParent(trace.getSpan(0))
77+
.hasAttributesSatisfyingExactly(
78+
messagingAttributes("publish", "sub", clientId, headerAssert))),
6579
trace ->
6680
trace.hasSpansSatisfyingExactly(
6781
span ->
6882
span.hasName("sub receive")
6983
.hasKind(SpanKind.CONSUMER)
7084
.hasNoParent()
7185
.hasAttributesSatisfyingExactly(
72-
messagingAttributes("receive", "sub", clientId)),
86+
messagingAttributes("receive", "sub", clientId, headerAssert)),
7387
span ->
7488
span.hasName("sub process")
7589
.hasKind(SpanKind.CONSUMER)

0 commit comments

Comments
 (0)