-
Notifications
You must be signed in to change notification settings - Fork 3k
Context propagation for Messaging (take 2) #46174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Context propagation for Messaging (take 2) #46174
Conversation
This comment has been minimized.
This comment has been minimized.
|
🙈 The PR is closed and the preview is expired. |
This comment has been minimized.
This comment has been minimized.
|
@holly-cummins fyi the fix for #46047 |
688aebe to
b062fd1
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - a few minor comments
...g/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitter.java
Show resolved
Hide resolved
...g/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitter.java
Show resolved
Hide resolved
...g/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitter.java
Show resolved
Hide resolved
...ntime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterImpl.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterImpl.java
Outdated
Show resolved
Hide resolved
...eactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerConsumers.java
Show resolved
Hide resolved
b062fd1 to
4452e6f
Compare
|
Rebased and made changes for comments |
Status for workflow
|
Status for workflow
|
|
@FroMage replying to #46256 (comment) here as it is more relevant :
As I linked in the description, creating an empty request context state for "cleared" is written in the spec: https://github.com/microprofile/microprofile-context-propagation/blob/aed4d96d81675a0502185e340ec42d88b8bf2339/api/src/main/java/org/eclipse/microprofile/context/ThreadContext.java#L326 and Arc request scopes are implemented that way.
It is helpful in the current state of things, and provides consistent behavior when using the |
|
Well the spec doesn't say anything about creating a new context or when it's terminated. That's the bit that is problematic here. |
You beat me to it. |
Indeed there is a specific test for this case : |
|
@FroMage do you think it is ok to merge? |
Take 2 after #45827 has been reverted for flaky tests.
quarkus.messaging.connector-context-propagationruntime property to configure propagated context to connector channels, defaults to NONE.quarkus.messaging.request-scoped.enabledbuild time property to enable request scoped context for consumed incoming messages, destroys state/deactivates the context once the message is (n)acked, defaults to falseContextualEmitter, a Quarkus-specific emitter type for better handling the request-scoped context propagation.Note on previous PR flaky tests and
ContextualEmitter:When context propagation is configured to clear (not propagate) the request-scoped (CDI) context, the currently active contexts are still available to the contextualized call, with their state erased. https://github.com/microprofile/microprofile-context-propagation/blob/aed4d96d81675a0502185e340ec42d88b8bf2339/api/src/main/java/org/eclipse/microprofile/context/ThreadContext.java#L326
Once the contextualized call returns, the intermediate state is destroyed.
When using internal channels with emitters to dispatch messages locally, the message is emitted asynchronously and dispatched on the duplicated message context. This duplicated context references a fresh CDI state.
The race condition is,
Tricks like
@ActivateRequestScopecreate more confusion because sometimes there already is an active context, other times it creates a new one.The
ContextualEmittermakes sure the duplicated context is captured on the contextualized call, and it returns (destroying the state) before the message emit happens.It ensures that, if the caller method doesn't propagate CDI context (ex. is annotated with
@CurrentThreadContext(propagated = {})) the processing of the messaged dispatched by the emitter won't have an active CDI context.WARNING: All this doesn't resolve issues like calls that propagate a request-scoped context to an emitter, that doesn't wait until the end of processing, destroying the context state before or during the locally dispatched message processing.
Fix #46256