Skip to content

Commit 956912d

Browse files
authored
OTLP HTTP trace exporter (#3418)
* Draft of OTLP HTTP trace exporter * Cleanup some TODOs, shade okhttp dependency * Get all tests working * Drop support for otlp/http json format * Split OtlpHttpSpanExporter to its own module. * drop @NotNull annotations * Switch to armeria mock server, respond to PR feedback. * Remove :exporter:otlp-http:trace shadowJar and other PR feedback. * Extract constant for application/x-protobuf media type
1 parent a35b286 commit 956912d

File tree

9 files changed

+686
-1
lines changed

9 files changed

+686
-1
lines changed

dependencyManagement/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ val DEPENDENCY_BOMS = listOf(
1919
"com.google.protobuf:protobuf-bom:3.17.2",
2020
"com.fasterxml.jackson:jackson-bom:2.12.3",
2121
"org.junit:junit-bom:5.7.2",
22-
"io.zipkin.reporter2:zipkin-reporter-bom:2.16.3"
22+
"io.zipkin.reporter2:zipkin-reporter-bom:2.16.3",
23+
"com.squareup.okhttp3:okhttp-bom:4.9.0"
2324
)
2425

2526
val DEPENDENCY_SETS = listOf(
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
subprojects {
2+
val proj = this
3+
plugins.withId("java") {
4+
configure<BasePluginConvention> {
5+
archivesBaseName = "opentelemetry-exporter-otlp-http-${proj.name}"
6+
}
7+
}
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# OpenTelemetry - OTLP Trace Exporter - HTTP
2+
3+
[![Javadocs][javadoc-image]][javadoc-url]
4+
5+
This is the OpenTelemetry exporter, sending span data to OpenTelemetry collector via HTTP without gRPC.
6+
7+
[javadoc-image]: https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporters-otlp.svg
8+
[javadoc-url]: https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporters-otlp
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
plugins {
2+
id("otel.java-conventions")
3+
// TODO: uncomment once ready to publish
4+
// id("otel.publish-conventions")
5+
6+
id("otel.animalsniffer-conventions")
7+
}
8+
9+
description = "OpenTelemetry Protocol HTTP Trace Exporter"
10+
otelJava.moduleName.set("io.opentelemetry.exporter.otlp.http.trace")
11+
12+
dependencies {
13+
api(project(":sdk:trace"))
14+
15+
implementation(project(":exporters:otlp:common"))
16+
17+
implementation("com.squareup.okhttp3:okhttp")
18+
implementation("com.squareup.okhttp3:okhttp-tls")
19+
implementation("com.squareup.okio:okio")
20+
21+
testImplementation(project(":sdk:testing"))
22+
23+
testImplementation("com.linecorp.armeria:armeria-junit5")
24+
}
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.exporter.otlp.http.trace;
7+
8+
import com.google.rpc.Code;
9+
import com.google.rpc.Status;
10+
import io.opentelemetry.api.metrics.BoundLongCounter;
11+
import io.opentelemetry.api.metrics.GlobalMeterProvider;
12+
import io.opentelemetry.api.metrics.LongCounter;
13+
import io.opentelemetry.api.metrics.Meter;
14+
import io.opentelemetry.api.metrics.common.Labels;
15+
import io.opentelemetry.exporter.otlp.internal.SpanAdapter;
16+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
17+
import io.opentelemetry.sdk.common.CompletableResultCode;
18+
import io.opentelemetry.sdk.internal.ThrottlingLogger;
19+
import io.opentelemetry.sdk.trace.data.SpanData;
20+
import io.opentelemetry.sdk.trace.export.SpanExporter;
21+
import java.io.IOException;
22+
import java.util.Collection;
23+
import java.util.logging.Level;
24+
import java.util.logging.Logger;
25+
import javax.annotation.concurrent.ThreadSafe;
26+
import okhttp3.Call;
27+
import okhttp3.Callback;
28+
import okhttp3.Headers;
29+
import okhttp3.MediaType;
30+
import okhttp3.OkHttpClient;
31+
import okhttp3.Request;
32+
import okhttp3.RequestBody;
33+
import okhttp3.Response;
34+
import okhttp3.ResponseBody;
35+
import okio.BufferedSink;
36+
import okio.GzipSink;
37+
import okio.Okio;
38+
39+
/** Exports spans using OTLP via HTTP, using OpenTelemetry's protobuf model. */
40+
@ThreadSafe
41+
public final class OtlpHttpSpanExporter implements SpanExporter {
42+
43+
private static final String EXPORTER_NAME = OtlpHttpSpanExporter.class.getSimpleName();
44+
private static final Labels EXPORTER_NAME_LABELS = Labels.of("exporter", EXPORTER_NAME);
45+
private static final Labels EXPORT_SUCCESS_LABELS =
46+
Labels.of("exporter", EXPORTER_NAME, "success", "true");
47+
private static final Labels EXPORT_FAILURE_LABELS =
48+
Labels.of("exporter", EXPORTER_NAME, "success", "false");
49+
50+
private static final MediaType PROTOBUF_MEDIA_TYPE = MediaType.parse("application/x-protobuf");
51+
52+
private final ThrottlingLogger logger =
53+
new ThrottlingLogger(Logger.getLogger(OtlpHttpSpanExporter.class.getName()));
54+
55+
private final BoundLongCounter spansSeen;
56+
private final BoundLongCounter spansExportedSuccess;
57+
private final BoundLongCounter spansExportedFailure;
58+
59+
private final OkHttpClient client;
60+
private final String endpoint;
61+
private final Headers headers;
62+
private final boolean isCompressionEnabled;
63+
64+
OtlpHttpSpanExporter(
65+
OkHttpClient client, String endpoint, Headers headers, boolean isCompressionEnabled) {
66+
Meter meter = GlobalMeterProvider.getMeter("io.opentelemetry.exporters.otlp-http");
67+
this.spansSeen =
68+
meter.longCounterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_LABELS);
69+
LongCounter spansExportedCounter = meter.longCounterBuilder("spansExportedByExporter").build();
70+
this.spansExportedSuccess = spansExportedCounter.bind(EXPORT_SUCCESS_LABELS);
71+
this.spansExportedFailure = spansExportedCounter.bind(EXPORT_FAILURE_LABELS);
72+
73+
this.client = client;
74+
this.endpoint = endpoint;
75+
this.headers = headers;
76+
this.isCompressionEnabled = isCompressionEnabled;
77+
}
78+
79+
/**
80+
* Submits all the given spans in a single batch to the OpenTelemetry collector.
81+
*
82+
* @param spans the list of sampled Spans to be exported.
83+
* @return the result of the operation
84+
*/
85+
@Override
86+
public CompletableResultCode export(Collection<SpanData> spans) {
87+
spansSeen.add(spans.size());
88+
ExportTraceServiceRequest exportTraceServiceRequest =
89+
ExportTraceServiceRequest.newBuilder()
90+
.addAllResourceSpans(SpanAdapter.toProtoResourceSpans(spans))
91+
.build();
92+
93+
Request.Builder requestBuilder = new Request.Builder().url(endpoint);
94+
if (headers != null) {
95+
requestBuilder.headers(headers);
96+
}
97+
RequestBody requestBody =
98+
RequestBody.create(exportTraceServiceRequest.toByteArray(), PROTOBUF_MEDIA_TYPE);
99+
if (isCompressionEnabled) {
100+
requestBuilder.addHeader("Content-Encoding", "gzip");
101+
requestBuilder.post(gzipRequestBody(requestBody));
102+
} else {
103+
requestBuilder.post(requestBody);
104+
}
105+
106+
CompletableResultCode result = new CompletableResultCode();
107+
108+
client
109+
.newCall(requestBuilder.build())
110+
.enqueue(
111+
new Callback() {
112+
@Override
113+
public void onFailure(Call call, IOException e) {
114+
spansExportedFailure.add(spans.size());
115+
result.fail();
116+
logger.log(
117+
Level.SEVERE,
118+
"Failed to export spans. The request could not be executed. Full error message: "
119+
+ e.getMessage());
120+
}
121+
122+
@Override
123+
public void onResponse(Call call, Response response) {
124+
if (response.isSuccessful()) {
125+
spansExportedSuccess.add(spans.size());
126+
result.succeed();
127+
return;
128+
}
129+
130+
spansExportedFailure.add(spans.size());
131+
int code = response.code();
132+
133+
Status status = extractErrorStatus(response);
134+
135+
logger.log(
136+
Level.WARNING,
137+
"Failed to export spans. Server responded with HTTP status code "
138+
+ code
139+
+ ". Error message: "
140+
+ status.getMessage());
141+
result.fail();
142+
}
143+
});
144+
145+
return result;
146+
}
147+
148+
private static RequestBody gzipRequestBody(RequestBody requestBody) {
149+
return new RequestBody() {
150+
@Override
151+
public MediaType contentType() {
152+
return requestBody.contentType();
153+
}
154+
155+
@Override
156+
public long contentLength() {
157+
return -1;
158+
}
159+
160+
@Override
161+
public void writeTo(BufferedSink bufferedSink) throws IOException {
162+
BufferedSink gzipSink = Okio.buffer(new GzipSink(bufferedSink));
163+
requestBody.writeTo(gzipSink);
164+
gzipSink.close();
165+
}
166+
};
167+
}
168+
169+
private static Status extractErrorStatus(Response response) {
170+
ResponseBody responseBody = response.body();
171+
if (responseBody == null) {
172+
return Status.newBuilder()
173+
.setMessage("Response body missing, HTTP status message: " + response.message())
174+
.setCode(Code.UNKNOWN.getNumber())
175+
.build();
176+
}
177+
try {
178+
return Status.parseFrom(responseBody.bytes());
179+
} catch (IOException e) {
180+
return Status.newBuilder()
181+
.setMessage("Unable to parse response body, HTTP status message: " + response.message())
182+
.setCode(Code.UNKNOWN.getNumber())
183+
.build();
184+
}
185+
}
186+
187+
/**
188+
* The OTLP exporter does not batch spans, so this method will immediately return with success.
189+
*
190+
* @return always Success
191+
*/
192+
@Override
193+
public CompletableResultCode flush() {
194+
return CompletableResultCode.ofSuccess();
195+
}
196+
197+
/**
198+
* Returns a new builder instance for this exporter.
199+
*
200+
* @return a new builder instance for this exporter.
201+
*/
202+
public static OtlpHttpSpanExporterBuilder builder() {
203+
return new OtlpHttpSpanExporterBuilder();
204+
}
205+
206+
/**
207+
* Returns a new {@link OtlpHttpSpanExporter} using the default values.
208+
*
209+
* @return a new {@link OtlpHttpSpanExporter} instance.
210+
*/
211+
public static OtlpHttpSpanExporter getDefault() {
212+
return builder().build();
213+
}
214+
215+
/** Shutdown the exporter. */
216+
@Override
217+
public CompletableResultCode shutdown() {
218+
final CompletableResultCode result = CompletableResultCode.ofSuccess();
219+
client.dispatcher().cancelAll();
220+
this.spansSeen.unbind();
221+
this.spansExportedSuccess.unbind();
222+
this.spansExportedFailure.unbind();
223+
return result;
224+
}
225+
}

0 commit comments

Comments
 (0)