Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context propagation for Messaging (take 2) #46174

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/context-propagation.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ If you also need to customize your instance, you can do so using `@ManagedExecut
ThreadContext sameContext;
----

[[context-propagation-for-cdi]]
== Context Propagation for CDI

In terms of CDI, `@RequestScoped`, `@ApplicationScoped` and `@Singleton` beans get propagated and are available in other threads.
Expand Down
192 changes: 190 additions & 2 deletions docs/src/main/asciidoc/messaging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ For more control, using link:{mutiny}[Mutiny] APIs, you can use the `MutinyEmitt
[source, java]
----
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.MutinyEmitter;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.MutinyEmitter;

@ApplicationScoped
@Path("/")
Expand Down Expand Up @@ -562,6 +563,7 @@ public class StreamProcessor {
}
----

[[execution_model]]
== Execution Model

Quarkus Messaging sits on top of the xref:quarkus-reactive-architecture.adoc#engine[reactive engine] of Quarkus and leverages link:{eclipse-vertx}[Eclipse Vert.x] to dispatch messages for processing.
Expand Down Expand Up @@ -634,6 +636,162 @@ Depending on the broker technology, this can be useful to increase the applicati
while still preserving the partial order of messages received in different copies.
This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions.

== Context Propagation

In Quarkus Messaging, the default mechanism for propagating context between different processing stages is the
link:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/message-context[message context].
This provides a consistent way to pass context information along with the message as it flows through different stages.

When integrating with other extensions, notably using Emitters, it relies on the Mutiny context propagation:

=== Interaction with Mutiny and MicroProfile Context Propagation

Mutiny, which is the foundation of reactive programming in Quarkus, is integrated with the MicroProfile Context Propagation.
This integration enables automatic capturing and restoring of context across asynchronous boundaries.
To learn more about context propagation in Quarkus and Mutiny, refer to the xref:context-propagation.adoc[Context Propagation] guide.

To ensure consistent behavior, Quarkus Messaging disables the propagation of any context during message dispatching through inbound or outbound connectors.
This means that context captured through Emitters won't be propagated to the outgoing channel, and incoming channels won't dispatch messages by activating a context (e.g. the request context).
This behaviour can be configured using `quarkus.messaging.connector-context-propagation` configuration property, by listing the context types to propagate.
For example `quarkus.messaging.connector-context-propagation=CDI` will only propagate the CDI context.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason why it's called connector-context-propagation and not context-propagation? Do we expect other settings for non-connectors?


Internal channels however do propagate the context, as they are part of the same application and the context is not lost.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth turning "internal channels" into a link to the section explaining what they are.


For example, you might want to propagate the caller context from an incoming HTTP request to the message processing stage.
For emitters, it is recommended to use the `MutinyEmitter`, as it exposes methods such as `sendAndAwait` that makes sure to wait until a message processing is terminated.

[WARNING]
====
The execution context to which the RequestScoped context is bound, in the previous example the REST call, controls the lifecycle of the context.
This means that when the REST call is completed the RequestScoped context is destroyed.
Therefore, you need to make sure that your processing or message dispatch is completed before the REST call completes.

For more information check the xref:context-propagation.adoc#context-propagation-for-cdi[Context Propagation] guide.
====

For example, let `RequestScopedBean` a request-scoped bean, `MutinyEmitter` can be used to dispatch messages locally through the internal channel `app`:

[source, java]
----
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.reactive.messaging.MutinyEmitter;

import io.quarkus.logging.Log;

import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

@Path("/")
public class Resource {

@Channel("app")
MutinyEmitter<String> emitter;

@Inject
RequestScopedBean requestScopedBean;

@POST
@Path("/send")
public void send(String message) {
requestScopedBean.setValue("Hello");
emitter.sendAndAwait(message);
}

}
----

Then the request-scoped bean can be accessed in the message processing stage, regardless of the <<execution_model>>:

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.logging.Log;
import io.smallrye.reactive.messaging.annotations.Blocking;


@ApplicationScoped
public class Processor {

@Inject
RequestScopedBean requestScopedBean;

@Incoming("app")
@Blocking
public void process(String message) {
Log.infof("Message %s from request %s", message, requestScopedBean.getValue());
}

}
----

[TIP]
====
You can use the context propagation annotation `@CurrentThreadContext` to configure the context propagation for a specific method.
Using an emitter this annotation needs to be present on the propagator method, i.e. the caller, not the processing method.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a very clear sentence. I suppose what you're trying to explain is that this configures the contexts that will be propagated "from" an emitter method, and as such, the annotation needs to be put on the emitter/emitting method, and configures the contexts that will be captured and propagated from that method.


The asynchronous nature of the message dispatching in Quarkus Messaging, and the way context propagation works,
can lead to unexpected results using emitters in internal channels.
It is recommended to use `ContextualEmitter` to ensure the context propagation plan is applied correctly.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not tell me why I should use ContextualEmitter instead of MutinyEmitter. I can't guess.


The following example shows how to avoid propagating any context to the message processing stage:

[source, java]
----
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.logging.Log;
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter;

import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

@Path("/")
public class Resource {

@Channel("app")
ContextualEmitter<String> emitter;

@Inject
RequestScopedBean requestScopedBean;

@POST
@Path("/send")
@CurrentThreadContext(propagated = {})
public void send(String message) {
requestScopedBean.setValue("Hello");
emitter.sendAndAwait(message);
}

}
----

====

=== Request Context Activation

In some cases, you might need to activate the request context while processing messages consumed from a broker.
While using `@ActivateRequestContext` on the `@Incoming` method is an option, it's lifecycle does not follow that of a Quarkus Messaging message.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
While using `@ActivateRequestContext` on the `@Incoming` method is an option, it's lifecycle does not follow that of a Quarkus Messaging message.
While using `@ActivateRequestContext` on the `@Incoming` method is an option, its lifecycle does not follow that of a Quarkus Messaging message.

For incoming channels, you can enable the request scope activation with the build time property `quarkus.messaging.request-scoped.enabled=true`.
This will activate the request context for each message processed by incoming channels, and close the context once the message is processed.

== Health Checks

Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel.
Expand Down Expand Up @@ -868,9 +1026,39 @@ The `quarkus-test-vertx` dependency provides the `@io.quarkus.test.vertx.RunOnVe

If your tests are dependent on context propagation, you can configure the in-memory connector channels with `run-on-vertx-context` attribute to dispatch events, including messages and acknowledgements, on a Vert.x context.
Alternatively you can switch this behaviour using the `InMemorySource#runOnVertxContext` method.

====

=== Channel Decorators

https://smallrye.io/smallrye-reactive-messaging/latest/concepts/decorators/[Channel decorators] is a way to intercept and decorate the reactive streams corresponding to messaging channels.
This can be useful for adding custom behavior to the channels, such as logging, metrics, or error handling.

It is therefore possible to implement a bean implementing `PublisherDecorator` for incoming channels, and `SubscriberDecorator` for outgoing channels.
Since two APIs are symmetric, you can implement both interfaces in the same bean.
These beans are automatically discovered by Quarkus and applied by priority (from the least value to the greatest).

Some decorators are included by default by Quarkus extensions.

Incoming channels (PublisherDecorator) in the order of priority:

- `io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator` (-100): Clears the context propagation for incoming channels
- `io.smallrye.reactive.messaging.providers.locals.ContextDecorator` (0): Ensures messages are dispatched on the message context
- `io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator` (100): Handles pausable channels
- `io.smallrye.reactive.messaging.providers.IncomingInterceptorDecorator` (500): Handles `IncomingInterceptor` beans
- `io.smallrye.reactive.messaging.providers.metrics.MetricDecorator` (1000): MicroProfile Metrics support, enabled with `quarkus-smallrye-metrics` extension
- `io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator` (1000): Micrometer Metrics support, enabled with `quarkus-micrometer` extension
- `io.smallrye.reactive.messaging.providers.extension.ObservationDecorator` (1000): Message observation support for incoming channels
- `io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator` (1000): Handles pausable channels
- `io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingIncomingDecorator` (1000): Included with `quarkus-opentelemetry` extension, propagates tracing information

Outgoing channels (SubscriberDecorator):

- `io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator` (-100): Clears the context propagation for outgoing channels
- `io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator` (1000): Message observation support for outgoing channels
- `io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator` (1000): Handles pausable channels
- `io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingOutgoingDecorator` (1000): Included with `quarkus-opentelemetry` extension, propagates tracing information
- `io.smallrye.reactive.messaging.providers.OutgoingInterceptorDecorator` (2000): Handles `OutgoingInterceptor` beans

== Going further

This guide shows the general principles of Quarkus Messaging extensions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ final class DotNames {

static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
static final DotName CONTEXTUAL_EMITTER = DotName.createSimple(io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter.class.getName());
static final DotName KAFKA_TRANSACTIONS_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
static final DotName KAFKA_REQUEST_REPLY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply.class.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {
return null;
}

if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType)
if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType) || isContextualEmitter(injectionPointType)
|| isKafkaTransactionsEmitter(injectionPointType)) {
return injectionPointType.asParameterizedType().arguments().get(0);
} else {
Expand Down Expand Up @@ -695,6 +695,13 @@ private static boolean isMutinyEmitter(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isContextualEmitter(Type type) {
// raw type ContextualEmitter is wrong, must be ContextualEmitter<Something>
return DotNames.CONTEXTUAL_EMITTER.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isKafkaTransactionsEmitter(Type type) {
// raw type KafkaTransactions is wrong, must be KafkaTransactions<Something>
return DotNames.KAFKA_TRANSACTIONS_EMITTER.equals(type.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ final class DotNames {

static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
static final DotName CONTEXTUAL_EMITTER = DotName.createSimple(io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter.class.getName());
static final DotName PULSAR_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions.class.getName());

static final DotName TARGETED = DotName.createSimple(io.smallrye.reactive.messaging.Targeted.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {
return null;
}

if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType) || isPulsarEmitter(injectionPointType)) {
if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType)
|| isContextualEmitter(injectionPointType) || isPulsarEmitter(injectionPointType)) {
return injectionPointType.asParameterizedType().arguments().get(0);
} else {
return null;
Expand Down Expand Up @@ -467,6 +468,13 @@ private static boolean isMutinyEmitter(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isContextualEmitter(Type type) {
// raw type MutinyEmitter is wrong, must be MutinyEmitter<Something>
return DotNames.CONTEXTUAL_EMITTER.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isPulsarEmitter(Type type) {
// raw type PulsarTransactions is wrong, must be PulsarTransactions<Something>
return DotNames.PULSAR_EMITTER.equals(type.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,11 @@ public interface ReactiveMessagingBuildTimeConfig {
@WithName("auto-connector-attachment")
@WithDefault("true")
boolean autoConnectorAttachment();

/**
* Whether to enable the RequestScope context on a message context
*/
@WithName("request-scoped.enabled")
@WithDefault("false")
boolean activateRequestScopeEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
import io.quarkus.smallrye.reactivemessaging.runtime.ConnectorContextPropagationDecorator;
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitterFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor;
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterFilter;
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterInterceptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder.SmallRyeReactiveMessagingContext;
Expand Down Expand Up @@ -112,11 +115,14 @@ FeatureBuildItem feature() {
}

@BuildStep
AdditionalBeanBuildItem beans() {
void beans(BuildProducer<AdditionalBeanBuildItem> additionalBean, ReactiveMessagingBuildTimeConfig buildTimeConfig) {
// We add the connector and channel qualifiers to make them part of the index.
return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
additionalBean.produce(new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class,
QuarkusWorkerPoolRegistry.class);
QuarkusWorkerPoolRegistry.class, ConnectorContextPropagationDecorator.class, ContextualEmitterFactory.class));
if (buildTimeConfig.activateRequestScopeEnabled()) {
additionalBean.produce(new AdditionalBeanBuildItem(RequestScopedDecorator.class));
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.quarkus.smallrye.reactivemessaging.runtime;

import java.util.List;
import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.context.ThreadContext;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.SubscriberDecorator;

@ApplicationScoped
public class ConnectorContextPropagationDecorator implements PublisherDecorator, SubscriberDecorator {

private final ThreadContext tc;

@Inject
public ConnectorContextPropagationDecorator(
@ConfigProperty(name = "quarkus.messaging.connector-context-propagation") Optional<List<String>> propagation) {
tc = ThreadContext.builder()
.propagated(propagation.map(l -> l.toArray(String[]::new)).orElse(ThreadContext.NONE))
.cleared(ThreadContext.ALL_REMAINING)
.build();
}

@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, List<String> channelName,
boolean isConnector) {
if (isConnector) {
return publisher.emitOn(tc.currentContextExecutor());
}
return publisher;
}

@Override
public int getPriority() {
// Before the io.smallrye.reactive.messaging.providers.locals.ContextDecorator which has the priority 0
return -100;
}
}
Loading
Loading