Skip to content

Commit bd33558

Browse files
committed
Preserve active request on subscriber when using messaging-kotlin
1 parent 829f805 commit bd33558

File tree

4 files changed

+330
-11
lines changed

4 files changed

+330
-11
lines changed

extensions/smallrye-reactive-messaging/deployment/pom.xml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@
9191
<artifactId>quarkus-resteasy-deployment</artifactId>
9292
<scope>test</scope>
9393
</dependency>
94+
95+
96+
<dependency>
97+
<groupId>org.jetbrains.kotlinx</groupId>
98+
<artifactId>kotlinx-coroutines-test</artifactId>
99+
<scope>test</scope>
100+
</dependency>
94101
</dependencies>
95102

96103
<build>
@@ -112,6 +119,33 @@
112119
</execution>
113120
</executions>
114121
</plugin>
122+
<plugin>
123+
<groupId>org.jetbrains.kotlin</groupId>
124+
<artifactId>kotlin-maven-plugin</artifactId>
125+
<version>${kotlin.version}</version>
126+
<executions>
127+
<execution>
128+
<id>compile</id>
129+
<goals>
130+
<goal>compile</goal>
131+
</goals>
132+
</execution>
133+
<execution>
134+
<id>test-compile</id>
135+
<goals>
136+
<goal>test-compile</goal>
137+
</goals>
138+
<configuration>
139+
<sourceDirs>
140+
<source>src/test/kotlin</source>
141+
</sourceDirs>
142+
</configuration>
143+
</execution>
144+
</executions>
145+
<configuration>
146+
<jvmTarget>${maven.compiler.target}</jvmTarget>
147+
</configuration>
148+
</plugin>
115149
</plugins>
116150
</build>
117151
</project>
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
package io.quarkus.smallrye.reactivemessaging
2+
3+
import io.quarkus.arc.Arc
4+
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter
5+
import io.quarkus.test.QuarkusUnitTest
6+
import io.smallrye.common.vertx.VertxContext
7+
import io.vertx.core.Vertx
8+
import jakarta.enterprise.context.ApplicationScoped
9+
import jakarta.enterprise.context.RequestScoped
10+
import jakarta.inject.Inject
11+
import kotlinx.coroutines.delay
12+
import org.eclipse.microprofile.reactive.messaging.Channel
13+
import org.eclipse.microprofile.reactive.messaging.Emitter
14+
import org.eclipse.microprofile.reactive.messaging.Incoming
15+
import org.jboss.shrinkwrap.api.spec.JavaArchive
16+
import org.junit.jupiter.api.AfterEach
17+
import org.junit.jupiter.api.Assertions.assertEquals
18+
import org.junit.jupiter.api.Test
19+
import org.junit.jupiter.api.extension.RegisterExtension
20+
import java.io.File
21+
import java.util.concurrent.CompletableFuture
22+
import java.util.concurrent.LinkedBlockingQueue
23+
import java.util.concurrent.TimeUnit
24+
import java.util.function.Consumer
25+
26+
class KotlinSubscriberTest {
27+
28+
companion object {
29+
@RegisterExtension
30+
@JvmStatic
31+
val config: QuarkusUnitTest? = QuarkusUnitTest()
32+
.withApplicationRoot(Consumer { jar: JavaArchive? ->
33+
jar!!
34+
.addClasses(
35+
KotlinSubscriber::class.java
36+
)
37+
.addAsResource(
38+
File("src/test/resources/config/worker-config.properties"),
39+
"application.properties"
40+
)
41+
})
42+
}
43+
44+
@Inject
45+
lateinit var requestData: RequestData
46+
47+
@Inject
48+
lateinit var kotlinSubscriber: KotlinSubscriber
49+
50+
@Inject
51+
@Channel("contextual-in")
52+
lateinit var contextualEmitter: ContextualEmitter<String>
53+
54+
@Inject
55+
@Channel("non-contextual-in")
56+
lateinit var nonContextualEmitter: Emitter<String>
57+
58+
@Inject
59+
lateinit var vertx: Vertx
60+
61+
@AfterEach
62+
fun cleanup() {
63+
kotlinSubscriber.reset()
64+
Arc.container().requestContext().terminate()
65+
}
66+
67+
@Test
68+
fun `preserve request context on suspend when publisher doesn't have a request context`() {
69+
// GIVEN No request context is active
70+
Arc.container().requestContext().terminate()
71+
72+
// AND a message
73+
val message = "preserve this context when no context on publisher"
74+
75+
// AND a publisher's context (duplicated vertx context)
76+
val publisherContext = VertxContext.createNewDuplicatedContext(vertx.orCreateContext)
77+
78+
// WHEN we publish a message through the emitter
79+
publisherContext.runOnContext {
80+
nonContextualEmitter.send(message)
81+
}
82+
83+
// THEN the subscriber receives the message and preserves the request context
84+
kotlinSubscriber.verifyReceivedDataIsPreservedWhenSuspendingAndResuming(message)
85+
86+
}
87+
88+
@Test
89+
fun `preserve request context on suspend when publisher does have a request context`() {
90+
// GIVEN that request context is active
91+
val requestContext = Arc.container().requestContext()
92+
val state = requestContext.activate()
93+
94+
// AND a message
95+
val message = "preserve this context when publisher has a request context"
96+
97+
// AND a publisher's context (duplicated vertx context)
98+
val publisherContext = VertxContext.createNewDuplicatedContext(vertx.orCreateContext)
99+
publisherContext.runOnContext {
100+
requestContext.activate(state)
101+
}
102+
103+
// WHEN we publish a message through the emitter
104+
publisherContext.runOnContext {
105+
contextualEmitter.send(message).subscribe().with { }
106+
}
107+
108+
// THEN the subscriber receives the message and preserves the request context
109+
kotlinSubscriber.verifyReceivedDataIsPreservedWhenSuspendingAndResuming(message)
110+
111+
// AND the request data is preserved in the test's context
112+
assertEquals(message, requestData.messageValue)
113+
114+
// AND the request data is preserved in the publisher's context
115+
val messageInContext = CompletableFuture<String>()
116+
publisherContext.runOnContext {
117+
messageInContext.complete(requestData.messageValue)
118+
}
119+
assertEquals(message, messageInContext.get(30, TimeUnit.SECONDS))
120+
}
121+
122+
@Test
123+
fun `preserve request context on suspend when publisher publishing to two request contexts`() {
124+
// GIVEN that request context is active
125+
val requestContext = Arc.container().requestContext()
126+
val states = listOf(requestContext.activate(), requestContext.activate())
127+
128+
// AND a message
129+
val messages = listOf(
130+
"preserve this context when publisher has a request context 1",
131+
"preserve this context when publisher has a request context 2",
132+
)
133+
134+
// AND a publisher's context (duplicated vertx context)
135+
val vertxContext = vertx.orCreateContext
136+
val publisherContexts = listOf(
137+
VertxContext.createNewDuplicatedContext(vertxContext),
138+
VertxContext.createNewDuplicatedContext(vertxContext),
139+
)
140+
states.forEachIndexed { index, state ->
141+
val publisherContext = publisherContexts[index]
142+
publisherContext.runOnContext {
143+
requestContext.activate(state)
144+
}
145+
}
146+
147+
// WHEN we publish a message through the emitter on both contexts
148+
states.forEachIndexed { index, state ->
149+
val message = messages[index]
150+
val publisherContext = publisherContexts[index]
151+
publisherContext.runOnContext {
152+
contextualEmitter.send(message).subscribe().with { }
153+
}
154+
}
155+
156+
states.forEachIndexed { index, state ->
157+
val message = messages[index]
158+
val publisherContext = publisherContexts[index]
159+
requestContext.activate(state)
160+
161+
162+
// THEN the subscriber receives the message and preserves the request context
163+
kotlinSubscriber.verifyReceivedDataIsPreservedWhenSuspendingAndResuming(message)
164+
165+
// AND the request data is preserved in the test's context
166+
assertEquals(message, requestData.messageValue)
167+
168+
// AND the request data is preserved in the publisher's context
169+
val messageInContext = CompletableFuture<String>()
170+
publisherContext.runOnContext {
171+
requestContext.activate(state)
172+
messageInContext.complete(requestData.messageValue)
173+
}
174+
assertEquals(message, messageInContext.get(30, TimeUnit.SECONDS))
175+
}
176+
177+
// cleanup
178+
states.forEach { state ->
179+
requestContext.activate(state)
180+
requestContext.terminate()
181+
}
182+
}
183+
184+
@RequestScoped
185+
class RequestData {
186+
var messageValue: String = ""
187+
188+
suspend fun sleep() {
189+
delay(10)
190+
}
191+
}
192+
193+
@RequestScoped
194+
class ConsumedRequestData {
195+
val messageValue = CompletableFuture<String>()
196+
}
197+
198+
@ApplicationScoped
199+
class KotlinSubscriber @Inject constructor(
200+
private val requestData: RequestData,
201+
private val consumedRequestData: ConsumedRequestData
202+
) {
203+
204+
private val preSuspendReceivedData = LinkedBlockingQueue<String>()
205+
private val postSuspendReceivedData = LinkedBlockingQueue<String>()
206+
private val requestContext = Arc.container().requestContext()
207+
208+
@Incoming("contextual-in")
209+
suspend fun consumeFromContextual(message: String) {
210+
return consume(message)
211+
}
212+
213+
@Incoming("non-contextual-in")
214+
suspend fun consumeFromNonContextual(message: String) {
215+
return consume(message)
216+
}
217+
218+
private suspend fun consume(message: String) {
219+
220+
requestData.messageValue = message
221+
222+
// capture the data before suspension
223+
preSuspendReceivedData.offer(requestData.messageValue)
224+
225+
// force a suspension
226+
delay(10)
227+
228+
// capture the data after suspension
229+
if (requestContext.isActive) {
230+
consumedRequestData.messageValue.complete(message)
231+
}
232+
postSuspendReceivedData.offer(requestData.messageValue)
233+
234+
}
235+
236+
fun reset() {
237+
preSuspendReceivedData.clear()
238+
postSuspendReceivedData.clear()
239+
}
240+
241+
fun verifyReceivedDataIsPreservedWhenSuspendingAndResuming(
242+
expectedMessage: String
243+
) {
244+
245+
val preMessage = preSuspendReceivedData.poll(30, TimeUnit.SECONDS)
246+
assertEquals(expectedMessage, preMessage)
247+
248+
val postMessage = postSuspendReceivedData.poll(30, TimeUnit.SECONDS)
249+
assertEquals(expectedMessage, postMessage)
250+
251+
if (requestContext.isActive) {
252+
val consumedMessage = consumedRequestData.messageValue.get(30, TimeUnit.SECONDS)
253+
assertEquals(expectedMessage, consumedMessage)
254+
}
255+
}
256+
}
257+
}

extensions/smallrye-reactive-messaging/kotlin/src/main/kotlin/io/quarkus/smallrye/reactivemessaging/runtime/kotlin/AbstractSubscribingCoroutineInvoker.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,27 @@ import io.quarkus.arc.Arc
44
import io.smallrye.reactive.messaging.Invoker
55
import io.vertx.core.Vertx
66
import java.util.concurrent.CompletableFuture
7-
import kotlinx.coroutines.CoroutineDispatcher
87
import kotlinx.coroutines.async
98
import kotlinx.coroutines.future.asCompletableFuture
109

1110
abstract class AbstractSubscribingCoroutineInvoker(private val beanInstance: Any) : Invoker {
1211

1312
override fun invoke(vararg args: Any?): CompletableFuture<Any?> {
1413
val coroutineScope = Arc.container().instance(ApplicationCoroutineScope::class.java).get()
15-
val dispatcher: CoroutineDispatcher =
14+
val dispatcher =
1615
Vertx.currentContext()?.let(::VertxDispatcher)
1716
?: throw IllegalStateException(
1817
"No Vertx context found. Consider using @NonBlocking on the caller method, or make sure the upstream emits items on the Vert.x context"
1918
)
2019

2120
return coroutineScope
22-
.async(context = dispatcher) { invokeBean(beanInstance, args) }
21+
.async(context = dispatcher) {
22+
try {
23+
invokeBean(beanInstance, args)
24+
} finally {
25+
dispatcher.cleanup()
26+
}
27+
}
2328
.asCompletableFuture()
2429
}
2530

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,47 @@
11
package io.quarkus.smallrye.reactivemessaging.runtime.kotlin
22

33
import io.quarkus.arc.Arc
4+
import io.quarkus.arc.InjectableContext
45
import io.vertx.core.Context
56
import kotlin.coroutines.CoroutineContext
67
import kotlinx.coroutines.CoroutineDispatcher
78

89
class VertxDispatcher(private val vertxContext: Context) : CoroutineDispatcher() {
10+
private val requestContext = Arc.container().requestContext()
11+
private val state: InjectableContext.ContextState
12+
private val destroyState: Boolean
13+
14+
init {
15+
if (requestContext.isActive) {
16+
state = requestContext.state
17+
destroyState = false
18+
} else {
19+
destroyState = true
20+
state = requestContext.activate()
21+
requestContext.deactivate()
22+
}
23+
}
24+
925
override fun dispatch(context: CoroutineContext, block: Runnable) {
10-
val requestContext = Arc.container().requestContext()
1126
vertxContext.runOnContext {
12-
if (requestContext.isActive) {
27+
val previousState = requestContext.stateIfActive
28+
requestContext.activate(state)
29+
try {
1330
block.run()
14-
} else {
15-
try {
16-
requestContext.activate()
17-
block.run()
18-
} finally {
19-
requestContext.terminate()
31+
} finally {
32+
if (previousState != null) {
33+
requestContext.activate(previousState)
34+
} else {
35+
requestContext.deactivate()
2036
}
2137
}
2238
}
2339
}
40+
41+
fun cleanup() {
42+
if (destroyState) {
43+
requestContext.activate(state)
44+
requestContext.destroy()
45+
}
46+
}
2447
}

0 commit comments

Comments
 (0)