Skip to content

Commit 4f52cf2

Browse files
committed
WebSockets Next: use SPI to separate WS from OTel and MP extensions
1 parent c9c75cf commit 4f52cf2

File tree

25 files changed

+629
-337
lines changed

25 files changed

+629
-337
lines changed

bom/application/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2359,6 +2359,11 @@
23592359
<artifactId>quarkus-websockets-next-kotlin</artifactId>
23602360
<version>${project.version}</version>
23612361
</dependency>
2362+
<dependency>
2363+
<groupId>io.quarkus</groupId>
2364+
<artifactId>quarkus-websockets-next-spi</artifactId>
2365+
<version>${project.version}</version>
2366+
</dependency>
23622367
<dependency>
23632368
<groupId>io.quarkus</groupId>
23642369
<artifactId>quarkus-undertow-spi</artifactId>
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.quarkus.micrometer.deployment.binder;
2+
3+
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
4+
import io.quarkus.deployment.Capabilities;
5+
import io.quarkus.deployment.Capability;
6+
import io.quarkus.deployment.annotations.BuildProducer;
7+
import io.quarkus.deployment.annotations.BuildStep;
8+
import io.quarkus.deployment.annotations.BuildSteps;
9+
import io.quarkus.micrometer.deployment.MicrometerProcessor;
10+
import io.quarkus.micrometer.runtime.binder.websockets.WebSocketMetricsInterceptorProducerImpl;
11+
12+
/**
13+
* Add support for WebSockets Next instrumentation.
14+
*/
15+
@BuildSteps(onlyIf = MicrometerProcessor.MicrometerEnabled.class)
16+
public class WebSocketsBinderProcessor {
17+
18+
@BuildStep
19+
void registerWebSocketMetricsInterceptor(BuildProducer<AdditionalBeanBuildItem> additionalBeanProducer,
20+
Capabilities capabilities) {
21+
if (capabilities.isPresent(Capability.WEBSOCKETS_NEXT)) {
22+
additionalBeanProducer
23+
.produce(AdditionalBeanBuildItem.unremovableOf(WebSocketMetricsInterceptorProducerImpl.class));
24+
}
25+
}
26+
27+
}
Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
1-
package io.quarkus.websockets.next.runtime.telemetry;
1+
package io.quarkus.micrometer.runtime.binder.websockets;
22

3-
public final class TelemetryConstants {
3+
public final class WebSocketMetricConstants {
44

5-
private TelemetryConstants() {
5+
private WebSocketMetricConstants() {
66
// class with constants
77
}
88

9-
/**
10-
* OpenTelemetry attributes added to spans created for opened and closed connections.
11-
*/
12-
public static final String CONNECTION_ID_ATTR_KEY = "connection.id";
13-
public static final String CONNECTION_ENDPOINT_ATTR_KEY = "connection.endpoint.id";
14-
public static final String CONNECTION_CLIENT_ATTR_KEY = "connection.client.id";
15-
169
/**
1710
* Counts all the WebSockets client opened connections.
1811
*/
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package io.quarkus.micrometer.runtime.binder.websockets;
2+
3+
import static io.quarkus.micrometer.runtime.binder.websockets.WebSocketMetricConstants.DIRECTION_TAG_KEY;
4+
import static io.quarkus.micrometer.runtime.binder.websockets.WebSocketMetricConstants.Direction.INBOUND;
5+
import static io.quarkus.micrometer.runtime.binder.websockets.WebSocketMetricConstants.Direction.OUTBOUND;
6+
7+
import jakarta.enterprise.context.Dependent;
8+
9+
import io.micrometer.core.instrument.Counter;
10+
import io.micrometer.core.instrument.Meter;
11+
import io.micrometer.core.instrument.MeterRegistry;
12+
import io.quarkus.websockets.next.runtime.spi.telemetry.WebSocketMetricsInterceptorProducer;
13+
14+
@Dependent
15+
public final class WebSocketMetricsInterceptorProducerImpl implements WebSocketMetricsInterceptorProducer {
16+
17+
private static final String URI_TAG_KEY = "uri";
18+
private final MeterRegistry meterRegistry;
19+
20+
WebSocketMetricsInterceptorProducerImpl(MeterRegistry meterRegistry) {
21+
this.meterRegistry = meterRegistry;
22+
}
23+
24+
@Override
25+
public WebSocketMetricsInterceptor createServerMetricsInterceptor() {
26+
final Meter.MeterProvider<Counter> messagesCounter = Counter
27+
.builder(WebSocketMetricConstants.SERVER_COUNT)
28+
.description("Number of messages sent and received by server endpoints.")
29+
.withRegistry(meterRegistry);
30+
final Meter.MeterProvider<Counter> bytesCounter = Counter
31+
.builder(WebSocketMetricConstants.SERVER_BYTES)
32+
.description("Number of bytes sent and received by server endpoints.")
33+
.withRegistry(meterRegistry);
34+
final Meter.MeterProvider<Counter> closedConnectionCounter = Counter
35+
.builder(WebSocketMetricConstants.SERVER_CONNECTION_CLOSED)
36+
.description("Number of closed server WebSocket connections.")
37+
.withRegistry(meterRegistry);
38+
final Meter.MeterProvider<Counter> serverErrorsCounter = Counter
39+
.builder(WebSocketMetricConstants.SERVER_ENDPOINT_COUNT_ERRORS)
40+
.description("Counts all the WebSockets server endpoint errors.")
41+
.withRegistry(meterRegistry);
42+
final Meter.MeterProvider<Counter> connectionOpenCounter = Counter
43+
.builder(WebSocketMetricConstants.SERVER_CONNECTION_OPENED)
44+
.description("Number of opened server connections.")
45+
.withRegistry(meterRegistry);
46+
final Meter.MeterProvider<Counter> connectionOpeningFailedCounter = Counter
47+
.builder(WebSocketMetricConstants.SERVER_CONNECTION_ON_OPEN_ERROR)
48+
.description("Number of failures occurred when opening server connection failed.")
49+
.withRegistry(meterRegistry);
50+
return new WebSocketMetricsInterceptorImpl(messagesCounter, bytesCounter, closedConnectionCounter, serverErrorsCounter,
51+
connectionOpenCounter, connectionOpeningFailedCounter);
52+
}
53+
54+
@Override
55+
public WebSocketMetricsInterceptor createClientMetricsInterceptor() {
56+
final Meter.MeterProvider<Counter> messagesCounter = Counter
57+
.builder(WebSocketMetricConstants.CLIENT_COUNT)
58+
.description("Number of messages sent and received by client endpoints.")
59+
.withRegistry(meterRegistry);
60+
final Meter.MeterProvider<Counter> bytesCounter = Counter
61+
.builder(WebSocketMetricConstants.CLIENT_BYTES)
62+
.description("Number of bytes sent and received by client endpoints.")
63+
.withRegistry(meterRegistry);
64+
final Meter.MeterProvider<Counter> closedConnectionCounter = Counter
65+
.builder(WebSocketMetricConstants.CLIENT_CONNECTION_CLOSED)
66+
.description("Number of closed client WebSocket connections.")
67+
.withRegistry(meterRegistry);
68+
final Meter.MeterProvider<Counter> clientErrorsCounter = Counter
69+
.builder(WebSocketMetricConstants.CLIENT_ENDPOINT_COUNT_ERRORS)
70+
.description("Counts all the WebSockets client endpoint errors.")
71+
.withRegistry(meterRegistry);
72+
final Meter.MeterProvider<Counter> connectionOpenCounter = Counter
73+
.builder(WebSocketMetricConstants.CLIENT_CONNECTION_OPENED)
74+
.description("Number of opened client connections.")
75+
.withRegistry(meterRegistry);
76+
final Meter.MeterProvider<Counter> connectionOpeningFailedCounter = Counter
77+
.builder(WebSocketMetricConstants.CLIENT_CONNECTION_OPENED_ERROR)
78+
.description("Number of failures occurred when opening client connection failed.")
79+
.withRegistry(meterRegistry);
80+
return new WebSocketMetricsInterceptorImpl(messagesCounter, bytesCounter, closedConnectionCounter, clientErrorsCounter,
81+
connectionOpenCounter, connectionOpeningFailedCounter);
82+
}
83+
84+
private static final class WebSocketMetricsInterceptorImpl implements WebSocketMetricsInterceptor {
85+
86+
private final Meter.MeterProvider<Counter> messagesCounter;
87+
private final Meter.MeterProvider<Counter> bytesCounter;
88+
private final Meter.MeterProvider<Counter> closedConnectionCounter;
89+
private final Meter.MeterProvider<Counter> errorsCounter;
90+
private final Meter.MeterProvider<Counter> connectionOpenCounter;
91+
private final Meter.MeterProvider<Counter> connectionOpeningFailedCounter;
92+
93+
private WebSocketMetricsInterceptorImpl(Meter.MeterProvider<Counter> messagesCounter,
94+
Meter.MeterProvider<Counter> bytesCounter, Meter.MeterProvider<Counter> closedConnectionCounter,
95+
Meter.MeterProvider<Counter> errorsCounter, Meter.MeterProvider<Counter> connectionOpenCounter,
96+
Meter.MeterProvider<Counter> connectionOpeningFailedCounter) {
97+
this.messagesCounter = messagesCounter;
98+
this.bytesCounter = bytesCounter;
99+
this.closedConnectionCounter = closedConnectionCounter;
100+
this.errorsCounter = errorsCounter;
101+
this.connectionOpenCounter = connectionOpenCounter;
102+
this.connectionOpeningFailedCounter = connectionOpeningFailedCounter;
103+
}
104+
105+
@Override
106+
public void onError(String path) {
107+
errorsCounter.withTag(URI_TAG_KEY, path).increment();
108+
}
109+
110+
@Override
111+
public void onMessageSent(byte[] data, String path) {
112+
messagesCounter.withTags(URI_TAG_KEY, path, DIRECTION_TAG_KEY, OUTBOUND.toString()).increment();
113+
bytesCounter.withTags(URI_TAG_KEY, path, DIRECTION_TAG_KEY, OUTBOUND.toString()).increment(data.length);
114+
}
115+
116+
@Override
117+
public void onMessageReceived(byte[] data, String path) {
118+
messagesCounter.withTags(URI_TAG_KEY, path, DIRECTION_TAG_KEY, INBOUND.toString()).increment();
119+
bytesCounter.withTags(URI_TAG_KEY, path, DIRECTION_TAG_KEY, INBOUND.toString()).increment(data.length);
120+
}
121+
122+
@Override
123+
public void onConnectionOpened(String path) {
124+
connectionOpenCounter.withTag(URI_TAG_KEY, path).increment();
125+
}
126+
127+
@Override
128+
public void onConnectionOpeningFailed(String path) {
129+
connectionOpeningFailedCounter.withTag(URI_TAG_KEY, path).increment();
130+
}
131+
132+
@Override
133+
public void onConnectionClosed(String path) {
134+
closedConnectionCounter.withTag(URI_TAG_KEY, path).increment();
135+
}
136+
}
137+
}

extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/TracerProcessor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig.SecurityEvents.SecurityEventType;
5050
import io.quarkus.opentelemetry.runtime.tracing.TracerRecorder;
5151
import io.quarkus.opentelemetry.runtime.tracing.cdi.TracerProducer;
52+
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.websockets.WebSocketTracesInterceptorImpl;
5253
import io.quarkus.opentelemetry.runtime.tracing.security.EndUserSpanProcessor;
5354
import io.quarkus.opentelemetry.runtime.tracing.security.SecurityEventUtil;
5455
import io.quarkus.vertx.http.deployment.spi.FrameworkEndpointsBuildItem;
@@ -220,6 +221,14 @@ void registerEndUserAttributesEventObserver(Capabilities capabilities,
220221
}
221222
}
222223

224+
@BuildStep
225+
void registerWebSocketTracesInterceptor(BuildProducer<AdditionalBeanBuildItem> additionalBeanProducer,
226+
Capabilities capabilities) {
227+
if (capabilities.isPresent(Capability.WEBSOCKETS_NEXT)) {
228+
additionalBeanProducer.produce(AdditionalBeanBuildItem.unremovableOf(WebSocketTracesInterceptorImpl.class));
229+
}
230+
}
231+
223232
private static ObserverConfiguratorBuildItem createEventObserver(
224233
ObserverRegistrationPhaseBuildItem observerRegistrationPhase, SecurityEventType eventType, String utilMethodName) {
225234
return new ObserverConfiguratorBuildItem(observerRegistrationPhase.getContext()

extensions/opentelemetry/runtime/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@
5252
<artifactId>nativeimage</artifactId>
5353
<scope>provided</scope>
5454
</dependency>
55+
<dependency>
56+
<groupId>io.quarkus</groupId>
57+
<artifactId>quarkus-websockets-next-spi</artifactId>
58+
</dependency>
5559

5660
<!-- Optional Dependencies -->
5761
<dependency>
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.websockets;
2+
3+
import java.util.Map;
4+
5+
import jakarta.enterprise.context.Dependent;
6+
7+
import io.opentelemetry.api.trace.Span;
8+
import io.opentelemetry.api.trace.SpanContext;
9+
import io.opentelemetry.api.trace.SpanKind;
10+
import io.opentelemetry.api.trace.Tracer;
11+
import io.opentelemetry.semconv.UrlAttributes;
12+
import io.quarkus.websockets.next.runtime.spi.telemetry.EndpointKind;
13+
import io.quarkus.websockets.next.runtime.spi.telemetry.WebSocketEndpointContext;
14+
import io.quarkus.websockets.next.runtime.spi.telemetry.WebSocketTracesInterceptor;
15+
16+
@Dependent
17+
public final class WebSocketTracesInterceptorImpl implements WebSocketTracesInterceptor {
18+
19+
/**
20+
* OpenTelemetry attributes added to spans created for opened and closed connections.
21+
*/
22+
public static final String CONNECTION_ID_ATTR_KEY = "connection.id";
23+
public static final String CONNECTION_ENDPOINT_ATTR_KEY = "connection.endpoint.id";
24+
public static final String CONNECTION_CLIENT_ATTR_KEY = "connection.client.id";
25+
private static final String CONNECTION_OPENED_SPAN_CTX = "io.quarkus.opentelemetry.ws.connection-opened-span-ctx";
26+
27+
private final Tracer tracer;
28+
29+
WebSocketTracesInterceptorImpl(Tracer tracer) {
30+
this.tracer = tracer;
31+
}
32+
33+
@Override
34+
public Map<String, Object> onConnectionOpened(String path, EndpointKind endpointKind) {
35+
var span = tracer.spanBuilder("OPEN " + path)
36+
.setSpanKind(getSpanKind(endpointKind))
37+
.addLink(previousSpanContext())
38+
.setAttribute(UrlAttributes.URL_PATH, path)
39+
.startSpan();
40+
try (var ignored = span.makeCurrent()) {
41+
return Map.of(CONNECTION_OPENED_SPAN_CTX, span.getSpanContext());
42+
} finally {
43+
span.end();
44+
}
45+
}
46+
47+
@Override
48+
public void onConnectionOpeningFailed(Throwable cause, String path, EndpointKind endpointKind,
49+
Map<String, Object> connectionOpenedContext) {
50+
tracer.spanBuilder("OPEN " + path)
51+
.setSpanKind(getSpanKind(endpointKind))
52+
.addLink(getOnOpenSpanContext(connectionOpenedContext))
53+
.setAttribute(UrlAttributes.URL_PATH, path)
54+
.startSpan()
55+
.recordException(cause)
56+
.end();
57+
}
58+
59+
@Override
60+
public void onConnectionClosed(WebSocketEndpointContext ctx) {
61+
tracer.spanBuilder("CLOSE " + ctx.path())
62+
.setSpanKind(getSpanKind(ctx))
63+
.addLink(getOnOpenSpanContext(ctx))
64+
.setAttribute(CONNECTION_ID_ATTR_KEY, ctx.connectionId())
65+
.setAttribute(UrlAttributes.URL_PATH, ctx.path())
66+
.setAttribute(getTargetIdKey(ctx), ctx.targetId())
67+
.startSpan()
68+
.end();
69+
}
70+
71+
@Override
72+
public void onConnectionClosingFailed(Throwable throwable, WebSocketEndpointContext ctx) {
73+
tracer.spanBuilder("CLOSE " + ctx.path())
74+
.setSpanKind(getSpanKind(ctx))
75+
.addLink(getOnOpenSpanContext(ctx))
76+
.setAttribute(CONNECTION_ID_ATTR_KEY, ctx.connectionId())
77+
.setAttribute(UrlAttributes.URL_PATH, ctx.path())
78+
.setAttribute(getTargetIdKey(ctx), ctx.targetId())
79+
.startSpan()
80+
.recordException(throwable)
81+
.end();
82+
}
83+
84+
private static SpanContext getOnOpenSpanContext(WebSocketEndpointContext ctx) {
85+
return getOnOpenSpanContext(ctx.connectionContextStorage());
86+
}
87+
88+
private static SpanContext getOnOpenSpanContext(Map<String, Object> connectionContextStorage) {
89+
if (connectionContextStorage == null) {
90+
return null;
91+
}
92+
return (SpanContext) connectionContextStorage.get(CONNECTION_OPENED_SPAN_CTX);
93+
}
94+
95+
private static String getTargetIdKey(WebSocketEndpointContext ctx) {
96+
return getTargetIdKey(ctx.endpointKind());
97+
}
98+
99+
private static String getTargetIdKey(EndpointKind endpointKind) {
100+
return switch (endpointKind) {
101+
case CLIENT -> CONNECTION_CLIENT_ATTR_KEY;
102+
case SERVER -> CONNECTION_ENDPOINT_ATTR_KEY;
103+
};
104+
}
105+
106+
private static SpanKind getSpanKind(WebSocketEndpointContext context) {
107+
return getSpanKind(context.endpointKind());
108+
}
109+
110+
private static SpanKind getSpanKind(EndpointKind endpointKind) {
111+
return switch (endpointKind) {
112+
case CLIENT -> SpanKind.CLIENT;
113+
case SERVER -> SpanKind.SERVER;
114+
};
115+
}
116+
117+
private static SpanContext previousSpanContext() {
118+
var span = Span.current();
119+
if (span.getSpanContext().isValid()) {
120+
return span.getSpanContext();
121+
}
122+
return null;
123+
}
124+
}

0 commit comments

Comments
 (0)