22
33import java .util .List ;
44import java .util .Optional ;
5+ import java .util .concurrent .Executor ;
56
67import jakarta .enterprise .context .ApplicationScoped ;
78import jakarta .inject .Inject ;
1112import org .eclipse .microprofile .reactive .messaging .Message ;
1213
1314import io .smallrye .mutiny .Multi ;
15+ import io .smallrye .mutiny .helpers .ParameterValidation ;
16+ import io .smallrye .mutiny .operators .multi .AbstractMultiOperator ;
17+ import io .smallrye .mutiny .operators .multi .MultiOperatorProcessor ;
18+ import io .smallrye .mutiny .subscription .MultiSubscriber ;
1419import io .smallrye .reactive .messaging .PublisherDecorator ;
1520import io .smallrye .reactive .messaging .SubscriberDecorator ;
1621
@@ -32,7 +37,7 @@ public ConnectorContextPropagationDecorator(
3237 public Multi <? extends Message <?>> decorate (Multi <? extends Message <?>> publisher , List <String > channelName ,
3338 boolean isConnector ) {
3439 if (isConnector ) {
35- return publisher . emitOn ( tc . currentContextExecutor () );
40+ return new ContextPropagationOperator <>( publisher , tc );
3641 }
3742 return publisher ;
3843 }
@@ -42,4 +47,42 @@ public int getPriority() {
4247 // Before the io.smallrye.reactive.messaging.providers.locals.ContextDecorator which has the priority 0
4348 return -100 ;
4449 }
50+
51+ public static class ContextPropagationOperator <T > extends AbstractMultiOperator <T , T > {
52+
53+ private final ThreadContext tc ;
54+
55+ /**
56+ * Creates a new {@link AbstractMultiOperator} with the passed {@link Multi} as upstream.
57+ *
58+ * @param upstream the upstream, must not be {@code null}
59+ */
60+ public ContextPropagationOperator (Multi <? extends T > upstream , ThreadContext tc ) {
61+ super (upstream );
62+ this .tc = tc ;
63+ }
64+
65+ @ Override
66+ public void subscribe (MultiSubscriber <? super T > downstream ) {
67+ ParameterValidation .nonNullNpe (downstream , "subscriber" );
68+ upstream .subscribe ().withSubscriber (new ContextPropagationProcessor <>(downstream , tc ));
69+ }
70+
71+ static final class ContextPropagationProcessor <T > extends MultiOperatorProcessor <T , T > {
72+
73+ private final Executor tcExecutor ;
74+
75+ public ContextPropagationProcessor (MultiSubscriber <? super T > downstream , ThreadContext tc ) {
76+ super (downstream );
77+ this .tcExecutor = tc .currentContextExecutor ();
78+ }
79+
80+ @ Override
81+ public void onItem (T item ) {
82+ // Even though the executor is called, this is a synchronous call
83+ tcExecutor .execute (() -> super .onItem (item ));
84+ }
85+
86+ }
87+ }
4588}
0 commit comments