Skip to content

Commit 4f815b9

Browse files
authored
Merge pull request #46174 from ozangunalp/messaging_context_propagation_take2
Context propagation for Messaging (take 2)
2 parents ecdc140 + 4452e6f commit 4f815b9

File tree

30 files changed

+2074
-29
lines changed

30 files changed

+2074
-29
lines changed

docs/src/main/asciidoc/context-propagation.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ If you also need to customize your instance, you can do so using `@ManagedExecut
317317
ThreadContext sameContext;
318318
----
319319

320+
[[context-propagation-for-cdi]]
320321
== Context Propagation for CDI
321322

322323
In terms of CDI, `@RequestScoped`, `@ApplicationScoped` and `@Singleton` beans get propagated and are available in other threads.

docs/src/main/asciidoc/messaging.adoc

Lines changed: 192 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,9 @@ For more control, using link:{mutiny}[Mutiny] APIs, you can use the `MutinyEmitt
179179
[source, java]
180180
----
181181
import io.smallrye.mutiny.Multi;
182+
import io.smallrye.reactive.messaging.MutinyEmitter;
183+
182184
import org.eclipse.microprofile.reactive.messaging.Channel;
183-
import org.eclipse.microprofile.reactive.messaging.MutinyEmitter;
184185
185186
@ApplicationScoped
186187
@Path("/")
@@ -316,6 +317,7 @@ Some messaging technologies allow consumers to subscribe to a set of topics or q
316317
If you are sure you need to configure and create clients dynamically at runtime, you should consider using the low-level clients directly.
317318
====
318319

320+
[[internal-channels]]
319321
==== Internal Channels
320322

321323
In some use cases, it is convenient to use messaging patterns to transfer messages inside the same application.
@@ -562,6 +564,7 @@ public class StreamProcessor {
562564
}
563565
----
564566

567+
[[execution_model]]
565568
== Execution Model
566569

567570
Quarkus Messaging sits on top of the xref:quarkus-reactive-architecture.adoc#engine[reactive engine] of Quarkus and leverages link:{eclipse-vertx}[Eclipse Vert.x] to dispatch messages for processing.
@@ -634,6 +637,163 @@ Depending on the broker technology, this can be useful to increase the applicati
634637
while still preserving the partial order of messages received in different copies.
635638
This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions.
636639

640+
== Context Propagation
641+
642+
In Quarkus Messaging, the default mechanism for propagating context between different processing stages is the
643+
link:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/message-context[message context].
644+
This provides a consistent way to pass context information along with the message as it flows through different stages.
645+
646+
When integrating with other extensions, notably using Emitters, it relies on the Mutiny context propagation:
647+
648+
=== Interaction with Mutiny and MicroProfile Context Propagation
649+
650+
Mutiny, which is the foundation of reactive programming in Quarkus, is integrated with the MicroProfile Context Propagation.
651+
This integration enables automatic capturing and restoring of context across asynchronous boundaries.
652+
To learn more about context propagation in Quarkus and Mutiny, refer to the xref:context-propagation.adoc[Context Propagation] guide.
653+
654+
To ensure consistent behavior, Quarkus Messaging disables the propagation of any context during message dispatching through inbound or outbound connectors.
655+
This means that context captured through Emitters won't be propagated to the outgoing channel, and incoming channels won't dispatch messages by activating a context (e.g. the request context).
656+
This behaviour can be configured using `quarkus.messaging.connector-context-propagation` configuration property, by listing the context types to propagate.
657+
For example `quarkus.messaging.connector-context-propagation=CDI` will only propagate the CDI context.
658+
659+
<<internal-channels>> however do propagate the context, as they are part of the same application and the context is not lost.
660+
661+
For example, you might want to propagate the caller context from an incoming HTTP request to the message processing stage.
662+
For emitters, it is recommended to use the `MutinyEmitter`, as it exposes methods such as `sendAndAwait` that makes sure to wait until a message processing is terminated.
663+
664+
[WARNING]
665+
====
666+
The execution context to which the RequestScoped context is bound, in the previous example the REST call, controls the lifecycle of the context.
667+
This means that when the REST call is completed the RequestScoped context is destroyed.
668+
Therefore, you need to make sure that your processing or message dispatch is completed before the REST call completes.
669+
670+
For more information check the xref:context-propagation.adoc#context-propagation-for-cdi[Context Propagation] guide.
671+
====
672+
673+
For example, let `RequestScopedBean` a request-scoped bean, `MutinyEmitter` can be used to dispatch messages locally through the internal channel `app`:
674+
675+
[source, java]
676+
----
677+
import jakarta.inject.Inject;
678+
import jakarta.ws.rs.Consumes;
679+
import jakarta.ws.rs.POST;
680+
import jakarta.ws.rs.Path;
681+
import jakarta.ws.rs.core.MediaType;
682+
683+
import org.eclipse.microprofile.reactive.messaging.Channel;
684+
import io.smallrye.reactive.messaging.MutinyEmitter;
685+
686+
import io.quarkus.logging.Log;
687+
688+
import io.smallrye.mutiny.Uni;
689+
import io.vertx.core.Context;
690+
import io.vertx.core.Vertx;
691+
692+
@Path("/")
693+
public class Resource {
694+
695+
@Channel("app")
696+
MutinyEmitter<String> emitter;
697+
698+
@Inject
699+
RequestScopedBean requestScopedBean;
700+
701+
@POST
702+
@Path("/send")
703+
public void send(String message) {
704+
requestScopedBean.setValue("Hello");
705+
emitter.sendAndAwait(message);
706+
}
707+
708+
}
709+
----
710+
711+
Then the request-scoped bean can be accessed in the message processing stage, regardless of the <<execution_model>>:
712+
713+
[source, java]
714+
----
715+
import jakarta.enterprise.context.ApplicationScoped;
716+
import jakarta.inject.Inject;
717+
718+
import org.eclipse.microprofile.reactive.messaging.Incoming;
719+
720+
import io.quarkus.logging.Log;
721+
import io.smallrye.reactive.messaging.annotations.Blocking;
722+
723+
724+
@ApplicationScoped
725+
public class Processor {
726+
727+
@Inject
728+
RequestScopedBean requestScopedBean;
729+
730+
@Incoming("app")
731+
@Blocking
732+
public void process(String message) {
733+
Log.infof("Message %s from request %s", message, requestScopedBean.getValue());
734+
}
735+
736+
}
737+
----
738+
739+
[TIP]
740+
====
741+
You can use the context propagation annotation `@CurrentThreadContext` to configure the contexts that will be propagated from an _emitter_ method.
742+
The annotation configures the contexts that will be captured and propagated from that method,
743+
and needs to be present on the propagator method, i.e. the caller of the emitter, not the processing method.
744+
745+
Because Quarkus Messaging dispatches messages on link:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/message-context[message context],
746+
propagation plans with cleared or not propagated contexts can lead to race conditions using emitters in <<internal-channels,internal channels>>.
747+
It is recommended to use `ContextualEmitter` to ensure the context propagation plan is applied correctly.
748+
749+
The following example shows how to avoid propagating any context to the message processing stage:
750+
751+
[source, java]
752+
----
753+
import jakarta.inject.Inject;
754+
import jakarta.ws.rs.Consumes;
755+
import jakarta.ws.rs.POST;
756+
import jakarta.ws.rs.Path;
757+
import jakarta.ws.rs.core.MediaType;
758+
759+
import org.eclipse.microprofile.reactive.messaging.Channel;
760+
761+
import io.quarkus.logging.Log;
762+
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter;
763+
764+
import io.smallrye.mutiny.Uni;
765+
import io.vertx.core.Context;
766+
import io.vertx.core.Vertx;
767+
768+
@Path("/")
769+
public class Resource {
770+
771+
@Channel("app")
772+
ContextualEmitter<String> emitter;
773+
774+
@Inject
775+
RequestScopedBean requestScopedBean;
776+
777+
@POST
778+
@Path("/send")
779+
@CurrentThreadContext(propagated = {})
780+
public void send(String message) {
781+
requestScopedBean.setValue("Hello");
782+
emitter.sendAndAwait(message);
783+
}
784+
785+
}
786+
----
787+
788+
====
789+
790+
=== Request Context Activation
791+
792+
In some cases, you might need to activate the request context while processing messages consumed from a broker.
793+
While using `@ActivateRequestContext` on the `@Incoming` method is an option, its lifecycle does not follow that of a Quarkus Messaging message.
794+
For incoming channels, you can enable the request scope activation with the build time property `quarkus.messaging.request-scoped.enabled=true`.
795+
This will activate the request context for each message processed by incoming channels, and close the context once the message is processed.
796+
637797
== Health Checks
638798

639799
Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel.
@@ -868,9 +1028,39 @@ The `quarkus-test-vertx` dependency provides the `@io.quarkus.test.vertx.RunOnVe
8681028
8691029
If your tests are dependent on context propagation, you can configure the in-memory connector channels with `run-on-vertx-context` attribute to dispatch events, including messages and acknowledgements, on a Vert.x context.
8701030
Alternatively you can switch this behaviour using the `InMemorySource#runOnVertxContext` method.
871-
8721031
====
8731032

1033+
=== Channel Decorators
1034+
1035+
https://smallrye.io/smallrye-reactive-messaging/latest/concepts/decorators/[Channel decorators] is a way to intercept and decorate the reactive streams corresponding to messaging channels.
1036+
This can be useful for adding custom behavior to the channels, such as logging, metrics, or error handling.
1037+
1038+
It is therefore possible to implement a bean implementing `PublisherDecorator` for incoming channels, and `SubscriberDecorator` for outgoing channels.
1039+
Since two APIs are symmetric, you can implement both interfaces in the same bean.
1040+
These beans are automatically discovered by Quarkus and applied by priority (from the least value to the greatest).
1041+
1042+
Some decorators are included by default by Quarkus extensions.
1043+
1044+
Incoming channels (PublisherDecorator) in the order of priority:
1045+
1046+
- `io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator` (-100): Clears the context propagation for incoming channels
1047+
- `io.smallrye.reactive.messaging.providers.locals.ContextDecorator` (0): Ensures messages are dispatched on the message context
1048+
- `io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator` (100): Handles pausable channels
1049+
- `io.smallrye.reactive.messaging.providers.IncomingInterceptorDecorator` (500): Handles `IncomingInterceptor` beans
1050+
- `io.smallrye.reactive.messaging.providers.metrics.MetricDecorator` (1000): MicroProfile Metrics support, enabled with `quarkus-smallrye-metrics` extension
1051+
- `io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator` (1000): Micrometer Metrics support, enabled with `quarkus-micrometer` extension
1052+
- `io.smallrye.reactive.messaging.providers.extension.ObservationDecorator` (1000): Message observation support for incoming channels
1053+
- `io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator` (1000): Handles pausable channels
1054+
- `io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingIncomingDecorator` (1000): Included with `quarkus-opentelemetry` extension, propagates tracing information
1055+
1056+
Outgoing channels (SubscriberDecorator):
1057+
1058+
- `io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator` (-100): Clears the context propagation for outgoing channels
1059+
- `io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator` (1000): Message observation support for outgoing channels
1060+
- `io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator` (1000): Handles pausable channels
1061+
- `io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingOutgoingDecorator` (1000): Included with `quarkus-opentelemetry` extension, propagates tracing information
1062+
- `io.smallrye.reactive.messaging.providers.OutgoingInterceptorDecorator` (2000): Handles `OutgoingInterceptor` beans
1063+
8741064
== Going further
8751065

8761066
This guide shows the general principles of Quarkus Messaging extensions.

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ final class DotNames {
1515

1616
static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
1717
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
18+
static final DotName CONTEXTUAL_EMITTER = DotName.createSimple(io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter.class.getName());
1819
static final DotName KAFKA_TRANSACTIONS_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
1920
static final DotName KAFKA_REQUEST_REPLY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply.class.getName());
2021

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {
538538
return null;
539539
}
540540

541-
if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType)
541+
if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType) || isContextualEmitter(injectionPointType)
542542
|| isKafkaTransactionsEmitter(injectionPointType)) {
543543
return injectionPointType.asParameterizedType().arguments().get(0);
544544
} else {
@@ -695,6 +695,13 @@ private static boolean isMutinyEmitter(Type type) {
695695
&& type.asParameterizedType().arguments().size() == 1;
696696
}
697697

698+
private static boolean isContextualEmitter(Type type) {
699+
// raw type ContextualEmitter is wrong, must be ContextualEmitter<Something>
700+
return DotNames.CONTEXTUAL_EMITTER.equals(type.name())
701+
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
702+
&& type.asParameterizedType().arguments().size() == 1;
703+
}
704+
698705
private static boolean isKafkaTransactionsEmitter(Type type) {
699706
// raw type KafkaTransactions is wrong, must be KafkaTransactions<Something>
700707
return DotNames.KAFKA_TRANSACTIONS_EMITTER.equals(type.name())

extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ final class DotNames {
1515

1616
static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
1717
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
18+
static final DotName CONTEXTUAL_EMITTER = DotName.createSimple(io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter.class.getName());
1819
static final DotName PULSAR_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions.class.getName());
1920

2021
static final DotName TARGETED = DotName.createSimple(io.smallrye.reactive.messaging.Targeted.class.getName());

extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,8 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {
300300
return null;
301301
}
302302

303-
if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType) || isPulsarEmitter(injectionPointType)) {
303+
if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType)
304+
|| isContextualEmitter(injectionPointType) || isPulsarEmitter(injectionPointType)) {
304305
return injectionPointType.asParameterizedType().arguments().get(0);
305306
} else {
306307
return null;
@@ -467,6 +468,13 @@ private static boolean isMutinyEmitter(Type type) {
467468
&& type.asParameterizedType().arguments().size() == 1;
468469
}
469470

471+
private static boolean isContextualEmitter(Type type) {
472+
// raw type MutinyEmitter is wrong, must be MutinyEmitter<Something>
473+
return DotNames.CONTEXTUAL_EMITTER.equals(type.name())
474+
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
475+
&& type.asParameterizedType().arguments().size() == 1;
476+
}
477+
470478
private static boolean isPulsarEmitter(Type type) {
471479
// raw type PulsarTransactions is wrong, must be PulsarTransactions<Something>
472480
return DotNames.PULSAR_EMITTER.equals(type.name())

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,11 @@ public interface ReactiveMessagingBuildTimeConfig {
2727
@WithName("auto-connector-attachment")
2828
@WithDefault("true")
2929
boolean autoConnectorAttachment();
30+
31+
/**
32+
* Whether to enable the RequestScope context on a message context
33+
*/
34+
@WithName("request-scoped.enabled")
35+
@WithDefault("false")
36+
boolean activateRequestScopeEnabled();
3037
}

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,16 @@
7070
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
7171
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
7272
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
73+
import io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator;
74+
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitterFactory;
7375
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactory;
7476
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor;
7577
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterFilter;
7678
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterInterceptor;
7779
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
7880
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
7981
import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration;
82+
import io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator;
8083
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle;
8184
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder;
8285
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder.SmallRyeReactiveMessagingContext;
@@ -112,11 +115,14 @@ FeatureBuildItem feature() {
112115
}
113116

114117
@BuildStep
115-
AdditionalBeanBuildItem beans() {
118+
void beans(BuildProducer<AdditionalBeanBuildItem> additionalBean, ReactiveMessagingBuildTimeConfig buildTimeConfig) {
116119
// We add the connector and channel qualifiers to make them part of the index.
117-
return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
120+
additionalBean.produce(new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
118121
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class,
119-
QuarkusWorkerPoolRegistry.class);
122+
QuarkusWorkerPoolRegistry.class, ConnectorContextPropagationDecorator.class, ContextualEmitterFactory.class));
123+
if (buildTimeConfig.activateRequestScopeEnabled()) {
124+
additionalBean.produce(new AdditionalBeanBuildItem(RequestScopedDecorator.class));
125+
}
120126
}
121127

122128
@BuildStep
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.quarkus.smallrye.reactivemessaging.runtime;
2+
3+
import java.util.List;
4+
import java.util.Optional;
5+
6+
import jakarta.enterprise.context.ApplicationScoped;
7+
import jakarta.inject.Inject;
8+
9+
import org.eclipse.microprofile.config.inject.ConfigProperty;
10+
import org.eclipse.microprofile.context.ThreadContext;
11+
import org.eclipse.microprofile.reactive.messaging.Message;
12+
13+
import io.smallrye.mutiny.Multi;
14+
import io.smallrye.reactive.messaging.PublisherDecorator;
15+
import io.smallrye.reactive.messaging.SubscriberDecorator;
16+
17+
@ApplicationScoped
18+
public class ConnectorContextPropagationDecorator implements PublisherDecorator, SubscriberDecorator {
19+
20+
private final ThreadContext tc;
21+
22+
@Inject
23+
public ConnectorContextPropagationDecorator(
24+
@ConfigProperty(name = "quarkus.messaging.connector-context-propagation") Optional<List<String>> propagation) {
25+
tc = ThreadContext.builder()
26+
.propagated(propagation.map(l -> l.toArray(String[]::new)).orElse(ThreadContext.NONE))
27+
.cleared(ThreadContext.ALL_REMAINING)
28+
.build();
29+
}
30+
31+
@Override
32+
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, List<String> channelName,
33+
boolean isConnector) {
34+
if (isConnector) {
35+
return publisher.emitOn(tc.currentContextExecutor());
36+
}
37+
return publisher;
38+
}
39+
40+
@Override
41+
public int getPriority() {
42+
// Before the io.smallrye.reactive.messaging.providers.locals.ContextDecorator which has the priority 0
43+
return -100;
44+
}
45+
}

0 commit comments

Comments
 (0)