Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
:quarkus-version: 3.21.3
:quarkus-version: 3.22.3
:project-version: 2.5.0
:maven-version: 3.8.1+

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<jandex-plugin.version>3.3.0</jandex-plugin.version>
<quarkus.version>3.21.3</quarkus.version>
<quarkus.version>3.22.3</quarkus.version>
<assertj.version>3.27.3</assertj.version>
<mutiny-zero.version>1.1.1</mutiny-zero.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ void handle(RoutingContext event) {
}

private void addProcessor(ConfigType streamConfig) {
StrictQueueSizeGuard guard = new StrictQueueSizeGuard(streamConfig.bufferSize);
// bufferSize is “how many to keep in the buffer” *beyond* the one actively in-flight
StrictQueueSizeGuard guard = new StrictQueueSizeGuard(streamConfig.bufferSize + 1);
Bundle<MessageType> bundle = new Bundle<>(guard);

Multi<MessageType> processor = Multi.createFrom()
// emitter with an unbounded queue, we control the size ourselves, with the guard
.<MessageType> emitter(bundle::setEmitter, BackPressureStrategy.BUFFER)
.onItem().invoke(guard::dequeue);
.<MessageType> emitter(bundle::setEmitter, BackPressureStrategy.BUFFER);
bundle.setProcessor(processor);
bundle.setPath(streamConfig.path);
bundle.setDeserializerName(streamConfig.deserializerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,26 @@ protected void handleRequest(RoutingContext event, MultiEmitter<? super HttpMess
if (emitter == null) {
onUnexpectedError(event, null,
"No consumer subscribed for messages sent to Reactive Messaging HTTP endpoint on path: " + path);
} else if (guard.prepareToEmit()) {
try {
emitter.emit(new HttpMessage<>(
deserializerFactory.getDeserializer(deserializerName)
.map(d -> d.deserialize(event.body().buffer()))
.orElse(event.body().buffer()),
new IncomingHttpMetadata(event),
() -> {
if (!event.response().ended()) {
event.response().setStatusCode(202).end();
}
},
error -> onUnexpectedError(event, error, "Failed to process message.")));
} catch (Exception any) {
guard.dequeue();
onUnexpectedError(event, any, "Emitting message failed");
}
return;
}

Object payload = deserializerFactory.getDeserializer(deserializerName)
.map(d -> d.deserialize(event.body().buffer()))
.orElse(event.body().buffer());
HttpMessage<?> msg = new HttpMessage<>(payload,
new IncomingHttpMetadata(event),
() -> {
if (!event.response().ended()) {
guard.dequeue();
event.response().setStatusCode(202).end();
}
},
error -> {
guard.dequeue();
onUnexpectedError(event, error, "Failed to process message.");
});
if (guard.prepareToEmit()) {
emitter.emit(msg);
} else {
event.response().setStatusCode(503).end();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,15 @@ protected void handleRequest(RoutingContext event, MultiEmitter<? super WebSocke
deserializerFactory.getDeserializer(deserializerName)
.map(d -> d.deserialize(b)).orElse(b),
new RequestMetadata(event),
() -> serverWebSocket.write(Buffer.buffer("ACK")),
error -> onUnexpectedError(serverWebSocket, error,
"Failed to process incoming web socket message.")));
() -> {
guard.dequeue();
serverWebSocket.write(Buffer.buffer("ACK"));
},
error -> {
guard.dequeue();
onUnexpectedError(serverWebSocket, error,
"Failed to process incoming web socket message.");
}));
} catch (Exception error) {
guard.dequeue();
onUnexpectedError(serverWebSocket, error, "Emitting message failed");
Expand Down