Skip to content

Reactive messaging fails when using PublisherBuilder or ProcessorBuilder #4587

@misl

Description

@misl

When switching from Quarkus 0.23.2 to 0.24.0 my application resulted in the following error:

2019-10-15 14:58:43,386 ERROR [it.tra.dis.mes.dat.DataPointProcessor#process] (vert.x-eventloop-thread-1) The method it.traeck.dispatch.messaging.data.DataPointProcessor#process has thrown an exception: java.lang.ClassCastException: class [B cannot be cast to class io.smallrye.reactive.messaging.mqtt.MqttMessage ([B is in module java.base of loader 'bootstrap'; io.smallrye.reactive.messaging.mqtt.MqttMessage is in unnamed module of loader 'app')
	at it.traeck.dispatch.messaging.data.DataPointProcessor_SmallryeMessagingInvoker_process_ea6c045679c66cce80caed88218a4aec51b70518.invoke(DataPointProcessor_SmallryeMessagingInvoker_process_ea6c045679c66cce80caed88218a4aec51b70518.zig:46)
	at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:61)
	at io.smallrye.reactive.messaging.ProcessorMediator.lambda$processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads$6(ProcessorMediator.java:201)
	at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
	at io.smallrye.reactive.streams.stages.FlatMapStageFactory$FlatMapStage.lambda$apply$1(FlatMapStageFactory.java:43)
	at io.reactivex.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.drain(FlowableConcatMap.java:284)
	at io.reactivex.internal.operators.flowable.FlowableConcatMap$BaseConcatMapSubscriber.onNext(FlowableConcatMap.java:159)
	at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
	at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
	at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
	...

While looking around with the debugger I noticed ProcessorMediator.processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads() is being called. This message tried to invoke the handler with just the payload instead of the surrounding Message object. The reason for this Production.STREAM_OF_PAYLOAD is being configured instead of STREAM_OF_MESSAGE.

My handler looks like this:

@Incoming("device-data")
@Outgoing("entity-data")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public PublisherBuilder<MqttMessage<String>> process( final MqttMessage<byte[]> message ) {
    return ReactiveStreams.of( message )
        .filter( Objects::nonNull )
        .peek( messageLogger::log )
        .map( wrapper( this::consume ) )
        .map( wrapper( this::transform ) )
        .onErrorResumeWith( e -> {
          if ( e instanceof FailedMessageException ) {
            exceptionLogger.log( (FailedMessageException) e );
          } else {
            LOGGER.error( "Message failed:", e );
          }
          return ReactiveStreams.empty();
        } )
        ;
}

Initially I thought this was an issue somewhere in SmallRye messaging. That's why I created an issue there. However further investigation lead to the Quarkus reactive messaging extension.

It appears that QuarkusMediatorConfigurationUtil.JandexGenericTypeAssignable.check() results in NotAssignable where Assignable is expected. Especially the following part: argumentClass.isAssignableFrom(target). This basicaly results in AmqpMessage.isAssignalbleFrom(Message) or MqttMessage.isAssignalbleFrom(Message)

Should it not be the other way around? target.isAssignableFrom(argumentClass)

I ran into it using Mqtt but I think it applies to Amqp as well.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions