Skip to content

Commit d39a7d1

Browse files
committed
Add OpenTelemetry metrics publisher for AWS SDK instrumentation
1 parent 7e3d81f commit d39a7d1

File tree

9 files changed

+569
-2
lines changed

9 files changed

+569
-2
lines changed

instrumentation/aws-sdk/aws-sdk-2.2/library/README.md

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,20 @@ To instrument all AWS SDK clients include the `opentelemetry-aws-sdk-2.2-autocon
99
To register instrumentation only on a specific SDK client, register the interceptor when creating it.
1010

1111
```java
12-
AwsSdkTelemetry telemetry = AwsSdkTelemetry.create(openTelemetry).build();
12+
// For tracing
13+
AwsSdkTelemetry telemetry = AwsSdkTelemetry.create(openTelemetry);
1314
DynamoDbClient client = DynamoDbClient.builder()
1415
.overrideConfiguration(ClientOverrideConfiguration.builder()
15-
.addExecutionInterceptor(telemetry.newExecutionInterceptor()))
16+
.addExecutionInterceptor(telemetry.newExecutionInterceptor())
17+
.build())
18+
.build();
19+
20+
// For metrics (can be used independently of tracing)
21+
MetricPublisher metricPublisher = new OpenTelemetryMetricPublisher(openTelemetry);
22+
23+
DynamoDbClient client = DynamoDbClient.builder()
24+
.overrideConfiguration(ClientOverrideConfiguration.builder()
25+
.addMetricPublisher(metricPublisher)
1626
.build())
1727
.build();
1828
```
@@ -29,6 +39,40 @@ SqsAsyncClientBuilder sqsAsyncClientBuilder = SqsAsyncClient.builder();
2939
SqsAsyncClient sqsAsyncClient = telemetry.wrap(sqsAsyncClientBuilder.build());
3040
```
3141

42+
## Metrics
43+
44+
The AWS SDK instrumentation can publish the metrics collected by the AWS SDK to OpenTelemetry. These include:
45+
- API call latencies
46+
- Retry counts
47+
- Throttling events
48+
- And other AWS SDK metrics
49+
50+
To enable metrics, instantiate `OpenTelemetryMetricPublisher` (or your own decorator around it)
51+
and register it with the client:
52+
53+
```java
54+
// Minimal setup – uses the common ForkJoinPool and the default metric prefix "aws.sdk"
55+
MetricPublisher metricPublisher = new OpenTelemetryMetricPublisher(openTelemetry);
56+
57+
// Optional customization (use this instead of the minimal setup above, not in addition to it)
58+
Executor executor = Executors.newFixedThreadPool(4);
59+
String metricPrefix = "mycompany.aws.sdk"; // will be used as Meter instrumentation scope
60+
Attributes staticAttributes = Attributes.builder().put("env", "prod").build();
61+
MetricPublisher metricPublisher = new OpenTelemetryMetricPublisher(openTelemetry, metricPrefix, executor, staticAttributes);
62+
63+
DynamoDbClient client = DynamoDbClient.builder()
64+
.overrideConfiguration(ClientOverrideConfiguration.builder()
65+
.addMetricPublisher(metricPublisher)
66+
.build())
67+
.build();
68+
```
69+
70+
The publisher emits the full hierarchy of AWS-SDK metrics (per-request, per-attempt, and HTTP)
71+
to the OpenTelemetry SDK. Instrument names are prefixed with your `metricPrefix` (default
72+
`aws.sdk.`) and attributes include service, operation name, retry count, success flag,
73+
HTTP status code and error type when relevant. Attribute objects are cached internally
74+
to minimize GC overhead.
75+
3276
## Trace propagation
3377

3478
The AWS SDK instrumentation always injects the trace header into the request

instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ dependencies {
1010
library("software.amazon.awssdk:lambda:2.2.0")
1111
library("software.amazon.awssdk:sns:2.2.0")
1212
library("software.amazon.awssdk:aws-json-protocol:2.2.0")
13+
library("software.amazon.awssdk:metrics-spi:2.2.0")
1314
// json-utils was added in 2.17.0
1415
compileOnly("software.amazon.awssdk:json-utils:2.17.0")
1516
compileOnly(project(":muzzle")) // For @NoMuzzle
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;
2+
3+
import io.opentelemetry.api.common.Attributes;
4+
import io.opentelemetry.api.metrics.DoubleHistogram;
5+
import io.opentelemetry.api.metrics.Meter;
6+
import software.amazon.awssdk.metrics.MetricRecord;
7+
import java.util.logging.Level;
8+
import java.util.logging.Logger;
9+
10+
public class DoubleHistogramStrategy implements MetricStrategy {
11+
private static final Logger logger = Logger.getLogger(DoubleHistogramStrategy.class.getName());
12+
private final DoubleHistogram histogram;
13+
14+
public DoubleHistogramStrategy(Meter meter, String metricName, String description) {
15+
this.histogram = meter.histogramBuilder(metricName)
16+
.setDescription(description)
17+
.build();
18+
}
19+
20+
@Override
21+
public void record(MetricRecord<?> metricRecord, Attributes attributes) {
22+
if (metricRecord.value() instanceof Double) {
23+
Double value = (Double) metricRecord.value();
24+
histogram.record(value, attributes);
25+
} else {
26+
logger.log(
27+
Level.WARNING,"Invalid value type for a DoubleHistogram metric: {}", metricRecord.metric().name());
28+
}
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;
2+
3+
import io.opentelemetry.api.common.Attributes;
4+
import io.opentelemetry.api.metrics.LongHistogram;
5+
import io.opentelemetry.api.metrics.Meter;
6+
import software.amazon.awssdk.metrics.MetricRecord;
7+
8+
import java.time.Duration;
9+
import java.util.logging.Level;
10+
import java.util.logging.Logger;
11+
12+
public class DurationStrategy implements MetricStrategy {
13+
private static final Logger logger = Logger.getLogger(DurationStrategy.class.getName());
14+
private final LongHistogram histogram;
15+
16+
public DurationStrategy(Meter meter, String metricName, String description) {
17+
this.histogram = meter.histogramBuilder(metricName)
18+
.setDescription(description)
19+
.setUnit("ns")
20+
.ofLongs()
21+
.build();
22+
}
23+
24+
@Override
25+
public void record(MetricRecord<?> metricRecord, Attributes attributes) {
26+
if (metricRecord.value() instanceof Duration) {
27+
Duration duration = (Duration) metricRecord.value();
28+
histogram.record(duration.toNanos(), attributes);
29+
} else {
30+
logger.log(Level.WARNING, "Invalid value type for duration metric: {}", metricRecord.metric().name());
31+
}
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;
2+
3+
import io.opentelemetry.api.common.Attributes;
4+
import io.opentelemetry.api.metrics.LongHistogram;
5+
import io.opentelemetry.api.metrics.Meter;
6+
import software.amazon.awssdk.metrics.MetricRecord;
7+
import java.util.logging.Level;
8+
import java.util.logging.Logger;
9+
10+
public class LongHistogramStrategy implements MetricStrategy {
11+
private static final Logger logger = Logger.getLogger(LongHistogramStrategy.class.getName());
12+
private final LongHistogram histogram;
13+
14+
public LongHistogramStrategy(Meter meter, String metricName, String description) {
15+
this.histogram = meter.histogramBuilder(metricName)
16+
.setDescription(description)
17+
.ofLongs()
18+
.build();
19+
}
20+
21+
@Override
22+
public void record(MetricRecord<?> metricRecord, Attributes attributes) {
23+
if (metricRecord.value() instanceof Number) {
24+
Number value = (Number) metricRecord.value();
25+
histogram.record(value.longValue(), attributes);
26+
} else {
27+
logger.log(Level.WARNING, "Invalid value type for a LongHistogram metric: {}", metricRecord.metric().name());
28+
}
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;
2+
3+
import io.opentelemetry.api.metrics.Meter;
4+
import java.util.function.BiFunction;
5+
import software.amazon.awssdk.core.metrics.CoreMetric;
6+
import software.amazon.awssdk.http.HttpMetric;
7+
8+
/**
9+
* Catalogue of AWS-SDK metric definitions that this instrumentation recognizes.
10+
* <p>
11+
* Each enum constant knows: (1) the SDK metric identifier, (2) the scope in the
12+
* request/attempt/http hierarchy, and (3) how to build the {@link MetricStrategy}
13+
* that records the metric.
14+
*/
15+
public enum MetricSpec {
16+
// per-request metrics
17+
API_CALL_DURATION(
18+
CoreMetric.API_CALL_DURATION.name(),
19+
Scope.REQUEST,
20+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "api_call_duration", "The total time taken to finish a request (inclusive of all retries)")
21+
),
22+
CREDENTIALS_FETCH_DURATION(
23+
CoreMetric.CREDENTIALS_FETCH_DURATION.name(),
24+
Scope.REQUEST,
25+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "credentials_fetch_duration", "Time taken to fetch AWS signing credentials for the request")
26+
),
27+
ENDPOINT_RESOLVE_DURATION(
28+
CoreMetric.ENDPOINT_RESOLVE_DURATION.name(),
29+
Scope.REQUEST,
30+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "endpoint_resolve_duration", "Time it took to resolve the endpoint used for the API call")
31+
),
32+
MARSHALLING_DURATION(
33+
CoreMetric.MARSHALLING_DURATION.name(),
34+
Scope.REQUEST,
35+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "marshalling_duration", "Time it takes to marshall an SDK request to an HTTP request")
36+
),
37+
TOKEN_FETCH_DURATION(
38+
CoreMetric.TOKEN_FETCH_DURATION.name(),
39+
Scope.REQUEST,
40+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "token_fetch_duration", "Time taken to fetch token signing credentials for the request")
41+
),
42+
43+
// per-attempt metrics
44+
BACKOFF_DELAY_DURATION(
45+
CoreMetric.BACKOFF_DELAY_DURATION.name(),
46+
Scope.ATTEMPT,
47+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "backoff_delay_duration", "Duration of time the SDK waited before this API call attempt")
48+
),
49+
READ_THROUGHPUT(
50+
CoreMetric.READ_THROUGHPUT.name(),
51+
Scope.ATTEMPT,
52+
(meter, metricPrefix) -> new DoubleHistogramStrategy(meter, metricPrefix + "read_throughput", "Read throughput of the client in bytes/second")
53+
),
54+
SERVICE_CALL_DURATION(
55+
CoreMetric.SERVICE_CALL_DURATION.name(),
56+
Scope.ATTEMPT,
57+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "service_call_duration", "Time to connect, send the request and receive the HTTP status code and header")
58+
),
59+
SIGNING_DURATION(
60+
CoreMetric.SIGNING_DURATION.name(),
61+
Scope.ATTEMPT,
62+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "signing_duration", "Time it takes to sign the HTTP request")
63+
),
64+
TIME_TO_FIRST_BYTE(
65+
CoreMetric.TIME_TO_FIRST_BYTE.name(),
66+
Scope.ATTEMPT,
67+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "time_to_first_byte", "Elapsed time from sending the HTTP request to receiving the first byte of the headers")
68+
),
69+
TIME_TO_LAST_BYTE(
70+
CoreMetric.TIME_TO_LAST_BYTE.name(),
71+
Scope.ATTEMPT,
72+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "time_to_last_byte", "Elapsed time from sending the HTTP request to receiving the last byte of the response")
73+
),
74+
UNMARSHALLING_DURATION(
75+
CoreMetric.UNMARSHALLING_DURATION.name(),
76+
Scope.ATTEMPT,
77+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "unmarshalling_duration", "Time it takes to unmarshall an HTTP response to an SDK response")
78+
),
79+
80+
// HTTP metrics
81+
AVAILABLE_CONCURRENCY(
82+
HttpMetric.AVAILABLE_CONCURRENCY.name(),
83+
Scope.HTTP,
84+
(meter, metricPrefix) -> new LongHistogramStrategy(meter, metricPrefix + "available_concurrency", "Remaining concurrent requests that can be supported without a new connection")
85+
),
86+
CONCURRENCY_ACQUIRE_DURATION(
87+
HttpMetric.CONCURRENCY_ACQUIRE_DURATION.name(),
88+
Scope.HTTP,
89+
(meter, metricPrefix) -> new DurationStrategy(meter, metricPrefix + "concurrency_acquire_duration", "Time taken to acquire a channel from the connection pool")
90+
),
91+
LEASED_CONCURRENCY(
92+
HttpMetric.LEASED_CONCURRENCY.name(),
93+
Scope.HTTP,
94+
(meter, metricPrefix) -> new LongHistogramStrategy(meter, metricPrefix + "leased_concurrency", "Number of requests currently being executed by the HTTP client")
95+
),
96+
MAX_CONCURRENCY(
97+
HttpMetric.MAX_CONCURRENCY.name(),
98+
Scope.HTTP,
99+
(meter, metricPrefix) -> new LongHistogramStrategy(meter, metricPrefix + "max_concurrency", "Maximum number of concurrent requests supported by the HTTP client")
100+
),
101+
PENDING_CONCURRENCY_ACQUIRES(
102+
HttpMetric.PENDING_CONCURRENCY_ACQUIRES.name(),
103+
Scope.HTTP,
104+
(meter, metricPrefix) -> new LongHistogramStrategy(meter, metricPrefix + "pending_concurrency_acquires", "Number of requests waiting for a connection or stream to be available")
105+
);
106+
107+
private final String sdkMetricName;
108+
private final Scope scope;
109+
private final BiFunction<Meter, String, MetricStrategy> strategyFactory;
110+
111+
MetricSpec(String sdkMetricName, Scope scope, BiFunction<Meter, String, MetricStrategy> strategyFactory) {
112+
this.sdkMetricName = sdkMetricName;
113+
this.scope = scope;
114+
this.strategyFactory = strategyFactory;
115+
}
116+
117+
public String getSdkMetricName() {
118+
return sdkMetricName;
119+
}
120+
121+
public Scope getScope() {
122+
return scope;
123+
}
124+
125+
/** Create a {@link MetricStrategy} for this metric. */
126+
public MetricStrategy create(Meter meter, String metricPrefix) {
127+
return strategyFactory.apply(meter, metricPrefix);
128+
}
129+
130+
/** Denotes where in the AWS-SDK metric hierarchy the metric lives. */
131+
public enum Scope { REQUEST, ATTEMPT, HTTP }
132+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;
2+
3+
import io.opentelemetry.api.common.Attributes;
4+
import software.amazon.awssdk.metrics.MetricRecord;
5+
6+
@FunctionalInterface
7+
public interface MetricStrategy {
8+
void record(MetricRecord<?> metricRecord, Attributes attributes);
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;
2+
3+
import io.opentelemetry.api.common.Attributes;
4+
import software.amazon.awssdk.metrics.MetricRecord;
5+
import java.util.logging.Level;
6+
import java.util.logging.Logger;
7+
8+
/**
9+
* A {@link MetricStrategy} that delegates to another {@link MetricStrategy} and catches any exceptions that occur
10+
* during the delegation. If an exception occurs, it logs a warning and continues.
11+
*/
12+
public class MetricStrategyWithoutErrors implements MetricStrategy {
13+
private static final Logger logger = Logger.getLogger(MetricStrategyWithoutErrors.class.getName());
14+
15+
private final MetricStrategy delegate;
16+
17+
public MetricStrategyWithoutErrors(MetricStrategy delegate) {
18+
this.delegate = delegate;
19+
}
20+
21+
@Override
22+
public void record(MetricRecord<?> metricRecord, Attributes attributes) {
23+
if (metricRecord == null) {
24+
log.warn("Received null metric record");
25+
return;
26+
}
27+
28+
try {
29+
delegate.record(metricRecord, attributes);
30+
} catch (Exception e) {
31+
String metricName = metricRecord.metric() == null ? "null" : metricRecord.metric().name();
32+
logger.log(Level.WARNING, e, () -> String.format("Failed to record metric: %s", metricName));
33+
}
34+
}
35+
}

0 commit comments

Comments
 (0)