Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<smallrye-jwt.version>1.1.1</smallrye-jwt.version>
<smallrye-reactive-streams-operators.version>1.0.3</smallrye-reactive-streams-operators.version>
<smallrye-converter-api.version>1.0.3</smallrye-converter-api.version>
<smallrye-reactive-messaging.version>0.0.5</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>0.0.7</smallrye-reactive-messaging.version>
<smallrye-rest-client.version>1.2.2</smallrye-rest-client.version>
<swagger-ui.version>3.20.9</swagger-ui.version>
<javax.activation.version>1.1.1</javax.activation.version>
Expand Down Expand Up @@ -129,7 +129,7 @@
<reactive-streams.version>1.0.2</reactive-streams.version>
<test-containers.version>1.10.7</test-containers.version>
<jboss-logging.version>3.3.2.Final</jboss-logging.version>
<axle-client.version>0.0.3</axle-client.version>
<axle-client.version>0.0.4</axle-client.version>
<kafka-clients.version>1.1.0</kafka-clients.version>
<kafka2.version>1.1.0</kafka2.version>
<debezium.version>0.9.2.Final</debezium.version>
Expand Down
2 changes: 1 addition & 1 deletion build-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
what we work with by self downloading it: -->
<graal-sdk.version-for-documentation>1.0.0-rc14</graal-sdk.version-for-documentation>
<rest-assured.version>3.3.0</rest-assured.version>
<axle-client.version>0.0.2</axle-client.version>
<axle-client.version>0.0.4</axle-client.version>
<vertx.version>3.6.3</vertx.version>

<!-- Enable APT by default for Eclipse -->
Expand Down
15 changes: 13 additions & 2 deletions docs/src/main/asciidoc/async-message-passing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,19 @@ public class GreetingService {
<1> If not set, the address is the fully qualified name of the bean, for instance, in this snippet it's `org.acme.vertx.GreetingService`.
<2> The method parameter is the message body. If the method returns _something_ it's the message response.

// TODO Revisit this once the blocking PR is integrated - https://github.com/quarkusio/quarkus/pull/1646
IMPORTANT: The code consuming the event must be _non-blocking_, as it's called on the Vert.x event loop.
[IMPORTANT]
====
By default, the code consuming the event must be _non-blocking_, as it's called on the Vert.x event loop.
If your processing is blocking, use the `blocking` arttribute:

[source, java]
----
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
----
====

=== Configuring the address

Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/using-vertx.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ The first approach can be implemented as follows:
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("{name}/streaming")
public Publisher<String> greeting(@PathParam("name") String name) {
return ReactiveStreams.fromPublisher(vertx.periodicStream(2000).toPublisher())
return vertx.periodicStream(2000).toPublisherBuilder()
.map(l -> String.format("Hello %s! (%s)%n", name, new Date()))
.buildRs();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.quarkus.smallrye.reactivemessaging.deployment;

import org.jboss.builder.item.MultiBuildItem;
import org.jboss.jandex.FieldInfo;
import org.jboss.jandex.MethodInfo;

import io.quarkus.arc.processor.BeanInfo;

public final class EmitterBuildItem extends MultiBuildItem {

/**
* The name of the stream the emitter is connected to.
*/
private final String name;

public EmitterBuildItem(String name) {
this.name = name;
}

public String getName() {
return name;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.DotName;
import org.jboss.jandex.MethodInfo;
import org.jboss.logging.Logger;
Expand All @@ -37,13 +38,16 @@
import io.quarkus.arc.processor.AnnotationStore;
import io.quarkus.arc.processor.BeanDeploymentValidator;
import io.quarkus.arc.processor.BeanInfo;
import io.quarkus.arc.processor.InjectionPointInfo;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingTemplate;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;

/**
*
Expand All @@ -55,6 +59,8 @@ public class SmallRyeReactiveMessagingProcessor {

static final DotName NAME_INCOMING = DotName.createSimple(Incoming.class.getName());
static final DotName NAME_OUTGOING = DotName.createSimple(Outgoing.class.getName());
static final DotName NAME_STREAM = DotName.createSimple(Stream.class.getName());
static final DotName NAME_EMITTER = DotName.createSimple(Emitter.class.getName());

@BuildStep
AdditionalBeanBuildItem beans() {
Expand All @@ -63,6 +69,7 @@ AdditionalBeanBuildItem beans() {

@BuildStep
BeanDeploymentValidatorBuildItem beanDeploymentValidator(BuildProducer<MediatorBuildItem> mediatorMethods,
BuildProducer<EmitterBuildItem> emitters,
BuildProducer<FeatureBuildItem> feature) {

feature.produce(new FeatureBuildItem(FeatureBuildItem.SMALLRYE_REACTIVE_MESSAGING));
Expand All @@ -78,10 +85,7 @@ public void validate(ValidationContext validationContext) {
for (BeanInfo bean : validationContext.get(Key.BEANS)) {
if (bean.isClassBean()) {
// TODO: add support for inherited business methods
for (MethodInfo method : bean.getTarget()
.get()
.asClass()
.methods()) {
for (MethodInfo method : bean.getTarget().get().asClass().methods()) {
if (annotationStore.hasAnnotation(method, NAME_INCOMING)
|| annotationStore.hasAnnotation(method, NAME_OUTGOING)) {
// TODO: validate method params and return type?
Expand All @@ -91,6 +95,19 @@ public void validate(ValidationContext validationContext) {
}
}
}

for (InjectionPointInfo injectionPoint : validationContext.get(Key.INJECTION_POINTS)) {
if (injectionPoint.getRequiredType().name().equals(NAME_EMITTER)) {
AnnotationInstance stream = injectionPoint.getRequiredQualifier(NAME_STREAM);
if (stream != null) {
// Stream.value() is mandatory
String name = stream.value().asString();
LOGGER.debugf("Emitter injection point '%s' detected, stream name: '%s'",
injectionPoint.getTargetInfo(), name);
emitters.produce(new EmitterBuildItem(name));
}
}
}
}
});
}
Expand All @@ -105,6 +122,7 @@ public List<UnremovableBeanBuildItem> removalExclusions() {
@Record(STATIC_INIT)
public void build(SmallRyeReactiveMessagingTemplate template, BeanContainerBuildItem beanContainer,
List<MediatorBuildItem> mediatorMethods,
List<EmitterBuildItem> emitterFields,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
/*
* IMPLEMENTATION NOTE/FUTURE IMPROVEMENTS: It would be possible to replace the reflection completely and use Jandex and
Expand All @@ -124,7 +142,8 @@ public void build(SmallRyeReactiveMessagingTemplate template, BeanContainerBuild
.getIdentifier());
}
}
template.registerMediators(beanClassToBeanId, beanContainer.getValue());
template.registerMediators(beanClassToBeanId, beanContainer.getValue(),
emitterFields.stream().map(EmitterBuildItem::getName).collect(Collectors.toList()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.smallrye.reactivemessaging.runtime;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

Expand All @@ -15,9 +16,10 @@
@Template
public class SmallRyeReactiveMessagingTemplate {

public void registerMediators(Map<String, String> beanClassToBeanId, BeanContainer container) {
public void registerMediators(Map<String, String> beanClassToBeanId, BeanContainer container, List<String> emitters) {
// Extract the configuration and register mediators
MediatorManager mediatorManager = container.instance(MediatorManager.class);
mediatorManager.initializeEmitters(emitters);
for (Entry<String, String> entry : beanClassToBeanId.entrySet()) {
try {
Class<?> beanClass = Thread.currentThread()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
import org.jboss.jandex.AnnotationTarget.Kind;
import org.jboss.jandex.DotName;
import org.jboss.jandex.FieldInfo;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.Type;
Expand Down Expand Up @@ -138,6 +139,15 @@ public Set<AnnotationInstance> getRequiredQualifiers() {
return typeAndQualifiers.qualifiers;
}

public AnnotationInstance getRequiredQualifier(DotName name) {
for (AnnotationInstance qualifier : typeAndQualifiers.qualifiers) {
if (qualifier.name().equals(name)) {
return qualifier;
}
}
return null;
}

public boolean hasDefaultedQualifier() {
return hasDefaultedQualifier;
}
Expand Down