Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/telemetry-micrometer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ Refer to the xref:./management-interface-reference.adoc[management interface ref
** Camel Messaging
* https://quarkus.io/guides/stork-reference[`quarkus-smallrye-stork`]
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]

== Configuration Reference

Expand Down
9 changes: 9 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,15 @@ quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false
----

When the Micrometer extension is present, Quarkus can collect metrics for messages, errors and bytes transferred.
If you require a WebSocket metrics, you can enable the metrics like in the example below:

[source, properties]
----
quarkus.websockets-next.server.metrics.enabled=true
quarkus.websockets-next.client.metrics.enabled=true
----

NOTE: Telemetry for the `BasicWebSocketConnector` is currently not supported.

[[websocket-next-configuration-reference]]
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.quarkus.websockets.next.test.utils.WSClient.ReceiverMode.BINARY;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.Set;
import java.util.stream.Collectors;

import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketConnectOptions;

record Connection(URI uri, String[] messagesToSend, boolean binaryMode,
String[] expectedResponses) {

static Connection of(URI uri, boolean binaryMode, String[] sentMessages, String[] expectedResponses) {
return new Connection(uri, sentMessages, binaryMode, expectedResponses);
}

static Connection of(URI uri, String expectedResponse, boolean binaryMode, String... messages) {
return new Connection(uri, messages, binaryMode, new String[] { expectedResponse });
}

void openConnectionThenSend(Vertx vertx) {
final WSClient client = binaryMode() ? new WSClient(vertx, BINARY) : new WSClient(vertx);
try (client) {
client.connect(new WebSocketConnectOptions(), uri());
for (String message : messagesToSend()) {
if (binaryMode()) {
client.sendAndAwait(Buffer.buffer(message));
} else {
client.sendAndAwait(message);
}
}
var expectedResponses = expectedResponses();
if (expectedResponses.length != 0) {
client.waitForMessages(expectedResponses.length);
Set<String> actualResponses = client.getMessages().stream().map(Buffer::toString).collect(Collectors.toSet());

for (String expectedResponse : expectedResponses) {
assertTrue(actualResponses.contains(expectedResponse),
() -> "Expected response '%s' not found, was: %s".formatted(expectedResponse, actualResponses));
}

client.getMessages().clear();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.quarkus.websockets.next.test.telemetry;

import java.util.Arrays;

public interface ExpectedServerEndpointResponse {

String[] NO_RESPONSE = new String[] {};
EchoExpectedServerEndpointResponse ECHO_RESPONSE = new EchoExpectedServerEndpointResponse();
DoubleEchoExpectedServerEndpointResponse DOUBLE_ECHO_RESPONSE = new DoubleEchoExpectedServerEndpointResponse();

/**
* Received message is prefixed with 'echo 0: ' and returned.
*/
final class EchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages).map(msg -> "echo 0: " + msg).toArray(String[]::new);
}

}

/**
* For each received message 'msg' endpoint returns 'echo 0: msg' and 'echo 1: msg'
*/
final class DoubleEchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages)
.mapMulti((msg, consumer) -> {
consumer.accept("echo 0: " + msg);
consumer.accept("echo 1: " + msg);
})
.toArray(String[]::new);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_BYTES;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_CONNECTION_CLOSED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_CONNECTION_OPENED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_COUNT;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_ENDPOINT_COUNT_ERRORS;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_BYTES;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_CONNECTION_CLOSED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_CONNECTION_OPENED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_CONNECTION_OPENED_ERROR;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_COUNT;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_ENDPOINT_COUNT_ERRORS;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.Direction.INBOUND;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.Direction.OUTBOUND;
import static io.quarkus.websockets.next.test.telemetry.AbstractWebSocketsOnMessageTest.getMetrics;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;

import org.awaitility.Awaitility;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;

import io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.Direction;

public final class MetricsAsserter {

int serverSentCount;
int serverReceivedCount;
int serverReceivedCountBytes;
int serverSentCountBytes;
int clientReceivedCount;
int clientSentCount;
int clientSentCountBytes;
int clientReceivedCountBytes;
int clientErrorCount;
int serverErrorCount;
int clientConnectionOpenedCount;
int serverConnectionOpenedCount;

void assertTotalMetricsForAllPaths(int serverErrorsDelta, int clientErrorsDelta, int serverReceivedCountDelta,
int serverReceivedCountBytesDelta, int serverSentCountBytesDelta, int clientSentCountDelta,
int clientSentCountBytesDelta, int clientReceivedCountBytesDelta, int serverSentCountDelta,
int clientReceivedCountDelta) {
serverReceivedCount += serverReceivedCountDelta;
serverReceivedCountBytes += serverReceivedCountBytesDelta;
serverSentCount += serverSentCountDelta;
serverSentCountBytes += serverSentCountBytesDelta;
clientSentCount += clientSentCountDelta;
clientSentCountBytes += clientSentCountBytesDelta;
clientReceivedCount += clientReceivedCountDelta;
clientReceivedCountBytes += clientReceivedCountBytesDelta;
clientErrorCount += clientErrorsDelta;
serverErrorCount += serverErrorsDelta;

Awaitility.await().atMost(Duration.ofSeconds(12)).untilAsserted(() -> getMetrics()
.body(assertServerConnectionOpenedTotal(serverConnectionOpenedCount))
.body(assertClientConnectionOpenedTotal(clientConnectionOpenedCount))
.body(assertServerErrorTotal(serverErrorCount))
.body(assertClientErrorTotal(clientErrorCount))
.body(assertClientMessagesCountReceived(clientReceivedCount))
.body(assertClientMessagesCountBytesSent(clientSentCountBytes))
.body(assertClientMessagesCountBytesReceived(clientReceivedCountBytes))
.body(assertClientMessagesCountSent(clientSentCount))
.body(assertServerMessagesCountBytesReceived(serverReceivedCountBytes))
.body(assertServerMessagesCountBytesSent(serverSentCountBytes))
.body(assertServerMessagesCountReceived(serverReceivedCount))
.body(assertServerMessagesCountSent(serverSentCount)));
}

static Matcher<String> assertClientMessagesCountBytesSent(String path, int clientSentCountBytes) {
return assertTotal(CLIENT_BYTES, clientSentCountBytes, path, OUTBOUND);
}

static Matcher<String> assertClientMessagesCountBytesReceived(String path, int clientReceivedCountBytes) {
return assertTotal(CLIENT_BYTES, clientReceivedCountBytes, path, INBOUND);
}

static Matcher<String> assertClientMessagesCountSent(String path, int clientSentCount) {
return assertTotal(CLIENT_COUNT, clientSentCount, path, OUTBOUND);
}

static Matcher<String> assertClientMessagesCountReceived(int clientSentCount) {
return assertTotal(CLIENT_COUNT, clientSentCount, null, INBOUND);
}

static Matcher<String> assertClientMessagesCountReceived(String path, int clientSentCount) {
return assertTotal(CLIENT_COUNT, clientSentCount, path, INBOUND);
}

static Matcher<String> assertServerMessagesCountSent(int serverReceivedCount) {
return assertServerMessagesCountSent(null, serverReceivedCount);
}

static Matcher<String> assertServerMessagesCountSent(String path, int serverReceivedCount) {
return assertTotal(SERVER_COUNT, serverReceivedCount, path, OUTBOUND);
}

static Matcher<String> assertServerMessagesCountReceived(String path, int serverReceivedCount) {
return assertTotal(SERVER_COUNT, serverReceivedCount, path, INBOUND);
}

static Matcher<String> assertServerMessagesCountBytesSent(String path, int serverSentCountBytes) {
return assertTotal(SERVER_BYTES, serverSentCountBytes, path, OUTBOUND);
}

static Matcher<String> assertServerMessagesCountBytesReceived(String path, int serverReceivedCountBytes) {
return assertTotal(SERVER_BYTES, serverReceivedCountBytes, path, INBOUND);
}

static Matcher<String> assertServerErrorTotal(String path, int serverErrorCount) {
return assertTotal(SERVER_ENDPOINT_COUNT_ERRORS, serverErrorCount, path, null);
}

static Matcher<String> assertClientErrorTotal(String path, int clientErrorCount) {
return assertTotal(CLIENT_ENDPOINT_COUNT_ERRORS, clientErrorCount, path, null);
}

static Matcher<String> assertServerConnectionOpeningFailedTotal(String path, int serverConnectionOpeningFailedCount) {
return assertTotal(SERVER_CONNECTION_OPENED_ERROR, serverConnectionOpeningFailedCount, path, null);
}

static Matcher<String> assertServerConnectionOpenedTotal(int serverConnectionOpenedCount) {
return assertServerConnectionOpenedTotal(null, serverConnectionOpenedCount);
}

static Matcher<String> assertClientConnectionOpenedTotal(int clientConnectionOpenedCount) {
return assertClientConnectionOpenedTotal(null, clientConnectionOpenedCount);
}

static Matcher<String> assertClientMessagesCountBytesSent(int clientSentCountBytes) {
return assertClientMessagesCountBytesSent(null, clientSentCountBytes);
}

static Matcher<String> assertClientMessagesCountBytesReceived(int clientReceivedCountBytes) {
return assertClientMessagesCountBytesReceived(null, clientReceivedCountBytes);
}

static Matcher<String> assertClientMessagesCountSent(int clientSentCount) {
return assertClientMessagesCountSent(null, clientSentCount);
}

static Matcher<String> assertServerMessagesCountReceived(int serverReceivedCount) {
return assertServerMessagesCountReceived(null, serverReceivedCount);
}

static Matcher<String> assertServerMessagesCountBytesSent(int serverSentCountBytes) {
return assertServerMessagesCountBytesSent(null, serverSentCountBytes);
}

static Matcher<String> assertServerMessagesCountBytesReceived(int serverReceivedCountBytes) {
return assertServerMessagesCountBytesReceived(null, serverReceivedCountBytes);
}

static Matcher<String> assertServerErrorTotal(int serverErrorCount) {
return assertServerErrorTotal(null, serverErrorCount);
}

static Matcher<String> assertClientErrorTotal(int clientErrorCount) {
return assertClientErrorTotal(null, clientErrorCount);
}

static Matcher<String> assertServerConnectionOpenedTotal(String path, int serverConnectionOpenedCount) {
return assertTotal(SERVER_CONNECTION_OPENED, serverConnectionOpenedCount, path, null);
}

static Matcher<String> assertClientConnectionOpenedTotal(String path, int clientConnectionOpenedCount) {
return assertTotal(CLIENT_CONNECTION_OPENED, clientConnectionOpenedCount, path, null);
}

static Matcher<String> assertServerConnectionClosedTotal(String path, int serverConnectionClosedCount) {
return assertTotal(SERVER_CONNECTION_CLOSED, serverConnectionClosedCount, path, null);
}

static Matcher<String> assertClientConnectionClosedTotal(String path, int clientConnectionClosedCount) {
return assertTotal(CLIENT_CONNECTION_CLOSED, clientConnectionClosedCount, path, null);
}

private static Matcher<String> assertTotal(String metricKey, int expectedCount, String path, Direction direction) {
var prometheusFormatKey = "%s_total".formatted(toPrometheusFormat(metricKey));
return new BaseMatcher<>() {
@Override
public boolean matches(Object o) {
if (o instanceof String str) {
var sameKeyMultipleTags = str
.lines()
.filter(l -> l.contains(prometheusFormatKey))
.filter(l -> path == null || l.contains(path)) // filter by path
.filter(l -> direction == null || l.contains(direction.toString()))
.map(String::trim)
.toList();
// quarkus_websockets_server_messages_count_received_total{<<some path tag>>} 2.0
// quarkus_websockets_server_messages_count_received_total{<<different path tag>>} 5.0
// = 7
var totalSum = sameKeyMultipleTags
.stream()
.map(l -> l.substring(l.lastIndexOf(" ")).trim())
.map(Double::parseDouble)
.map(Double::intValue)
.reduce(0, Integer::sum);
return totalSum == expectedCount;
}
return false;
}

@Override
public void describeTo(Description description) {
description.appendText(
"Key '%s' with value '%d' and direction '%s'".formatted(prometheusFormatKey, expectedCount, direction));
}
};
}

private static String toPrometheusFormat(String dottedMicrometerFormat) {
return dottedMicrometerFormat.replace(".", "_").replace("-", "_");
}

static int stringToBytes(String... messages) {
return Arrays.stream(messages).map(msg -> msg.getBytes(StandardCharsets.UTF_8)).map(s -> s.length).reduce(0,
Integer::sum);
}
}
Loading
Loading